Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/05/11 18:36:22 UTC

svn commit: r773621 - in /hadoop/core/trunk: ./ src/contrib/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/pipes/

Author: yhemanth
Date: Mon May 11 16:36:22 2009
New Revision: 773621

URL: http://svn.apache.org/viewvc?rev=773621&view=rev
Log:
HADOOP-5771. Implements unit tests for LinuxTaskController. Contributed by Sreekanth Ramakrishnan and Vinod Kumar Vavilapalli.

Added:
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipesAsDifferentUser.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/build.xml
    hadoop/core/trunk/src/contrib/build-contrib.xml
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=773621&r1=773620&r2=773621&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon May 11 16:36:22 2009
@@ -334,6 +334,9 @@
     run-test-mapred, run-test-hdfs and run-test-hdfs-with-mr.
     (Sharad Agarwal via ddas)
 
+    HADOOP-5771. Implements unit tests for LinuxTaskController.
+    (Sreekanth Ramakrishnan and Vinod Kumar Vavilapalli via yhemanth)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/build.xml?rev=773621&r1=773620&r2=773621&view=diff
==============================================================================
--- hadoop/core/trunk/build.xml (original)
+++ hadoop/core/trunk/build.xml Mon May 11 16:36:22 2009
@@ -949,6 +949,8 @@
       <sysproperty key="test.debug.data" value="${test.debug.data}"/>
       <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
       <sysproperty key="test.src.dir" value="${test.src.dir}"/>
+      <sysproperty key="taskcontroller-path" value="${taskcontroller-path}"/>
+      <sysproperty key="taskcontroller-user" value="${taskcontroller-user}"/>
       <sysproperty key="test.build.extraconf" value="${test.build.extraconf}" />
       <sysproperty key="hadoop.policy.file" value="hadoop-policy.xml"/>
       <sysproperty key="java.library.path"

Modified: hadoop/core/trunk/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/build-contrib.xml?rev=773621&r1=773620&r2=773621&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/build-contrib.xml (original)
+++ hadoop/core/trunk/src/contrib/build-contrib.xml Mon May 11 16:36:22 2009
@@ -235,6 +235,8 @@
       <sysproperty key="fs.default.name" value="${fs.default.name}"/>
       <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
       <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/> 
+      <sysproperty key="taskcontroller-path" value="${taskcontroller-path}"/>
+      <sysproperty key="taskcontroller-user" value="${taskcontroller-user}"/>
       <classpath refid="test.classpath"/>
       <formatter type="${test.junit.output.format}" />
       <batchtest todir="${build.test}" unless="testcase">
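
Both build files forward the new properties as JVM system properties, so values passed to ant as -D flags become visible to the tests via System.getProperty. A minimal invocation sketch (both values are placeholders):

    ant test -Dtaskcontroller-path=/path/to/taskcontroller/bin \
        -Dtaskcontroller-user=testuser,testgroup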

Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java?rev=773621&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java Mon May 11 16:36:22 2009
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClusterWithLinuxTaskController;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Test Streaming with LinuxTaskController running the jobs as a user different
+ * from the user running the cluster. See {@link ClusterWithLinuxTaskController}
+ */
+public class TestStreamingAsDifferentUser extends
+    ClusterWithLinuxTaskController {
+
+  private Path inputPath = new Path("input");
+  private Path outputPath = new Path("output");
+  private String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
+  private String map =
+      StreamUtil.makeJavaCommand(TrApp.class, new String[] { ".", "\\n" });
+  private String reduce =
+      StreamUtil.makeJavaCommand(UniqApp.class, new String[] { "R" });
+
+  public void testStreaming()
+      throws Exception {
+    if (!shouldRun()) {
+      return;
+    }
+    startCluster();
+    JobConf myConf = getClusterConf();
+    FileSystem inFs = inputPath.getFileSystem(myConf);
+    FileSystem outFs = outputPath.getFileSystem(myConf);
+    outFs.delete(outputPath, true);
+    if (!inFs.mkdirs(inputPath)) {
+      throw new IOException("Mkdirs failed to create " + inputPath.toString());
+    }
+    DataOutputStream file = inFs.create(new Path(inputPath, "part-0"));
+    file.writeBytes(input);
+    file.close();
+    String[] args =
+        new String[] { "-input", inputPath.makeQualified(inFs).toString(),
+            "-output", outputPath.makeQualified(outFs).toString(), "-mapper",
+            map, "-reducer", reduce, "-jobconf",
+            "keep.failed.task.files=true", "-jobconf",
+            "stream.tmpdir=" + System.getProperty("test.build.data", "/tmp") };
+    StreamJob streamJob = new StreamJob(args, true);
+    streamJob.setConf(myConf);
+    streamJob.go();
+    assertOwnerShip(outputPath);
+  }
+}

Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?rev=773621&r1=773620&r2=773621&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Mon May 11 16:36:22 2009
@@ -41,8 +41,6 @@
     // test that some JobConf properties are exposed as expected     
     // Note the dots translated to underscore: 
     // property names have been escaped in PipeMapRed.safeEnvVarName()
-    expect("mapred_job_tracker", "local");
-    //expect("mapred_local_dir", "build/test/mapred/local");
     expectDefined("mapred_local_dir");
     expect("mapred_output_format_class", "org.apache.hadoop.mapred.TextOutputFormat");
     expect("mapred_output_key_class", "org.apache.hadoop.io.Text");
@@ -52,7 +50,6 @@
     expectDefined("mapred_task_id");
 
     expectDefined("map_input_file");
-    expect("map_input_start", "0");
     expectDefined("map_input_length");
 
     expectDefined("io_sort_factor");

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=773621&r1=773620&r2=773621&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Mon May 11 16:36:22 2009
@@ -289,7 +289,7 @@
                                           List<String> cmdArgs, JvmEnv env) 
                                     throws IOException {
     String[] taskControllerCmd = new String[3 + cmdArgs.size()];
-    taskControllerCmd[0] = taskControllerExe;
+    taskControllerCmd[0] = getTaskControllerExecutablePath();
     taskControllerCmd[1] = userName;
     taskControllerCmd[2] = String.valueOf(command.ordinal());
     int i = 3;
@@ -419,6 +419,13 @@
           jobDirectory.getPath(), FILE_PERMISSIONS, true);
     }
   }
-  
+
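+  /**
+   * Returns the path to the task-controller executable. Tests override this
+   * to point the TaskTracker at a custom binary location.
+   */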
+  protected String getTaskControllerExecutablePath() {
+    return taskControllerExe;
+  }  
 }
 

Added: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=773621&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (added)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Mon May 11 16:36:22 2009
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import junit.framework.TestCase;
+
+/**
+ * The base class which starts up a cluster with LinuxTaskController as the task
+ * controller.
+ * 
+ * To run test cases that use LinuxTaskController, follow
+ * these steps:
+ * <ol>
+ * <li>Build LinuxTaskController without passing any
+ * <code>-Dhadoop.conf.dir</code></li>
+ * <li>Make the built binary setuid executable</li>
+ * <li>Execute the following target:
+ * <code>ant test -Dcompile.c++=true -Dtaskcontroller-path=<em>path to built binary</em> 
+ * -Dtaskcontroller-user=<em>user,group</em></code></li>
+ * </ol>
+ * 
+ */
+public class ClusterWithLinuxTaskController extends TestCase {
+  private static final Log LOG =
+      LogFactory.getLog(ClusterWithLinuxTaskController.class);
+
+  /**
+   * A wrapper around LinuxTaskController that allows overriding the path to
+   * the task-controller binary used for task management.
+   * 
+   **/
+  public static class MyLinuxTaskController extends LinuxTaskController {
+    String taskControllerExePath;
+
+    @Override
+    protected String getTaskControllerExecutablePath() {
+      return taskControllerExePath;
+    }
+
+    void setTaskControllerExe(String execPath) {
+      this.taskControllerExePath = execPath;
+    }
+  }
+
+  // Cluster instances which subclasses can use
+  protected MiniMRCluster mrCluster = null;
+  protected MiniDFSCluster dfsCluster = null;
+
+  private JobConf clusterConf = null;
+  protected Path homeDirectory;
+
+  private static final int NUMBER_OF_NODES = 1;
+
+  private File configurationFile = null;
+
+  private UserGroupInformation taskControllerUser;
+
+  /*
+   * Utility method which subclasses use to start and configure the MR Cluster
+   * so they can directly submit a job.
+   */
+  protected void startCluster()
+      throws IOException {
+    JobConf conf = new JobConf();
+    dfsCluster = new MiniDFSCluster(conf, NUMBER_OF_NODES, true, null);
+    conf.set("mapred.task.tracker.task-controller",
+        MyLinuxTaskController.class.getName());
+    mrCluster =
+        new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri()
+            .toString(), 1, null, null, conf);
+
+    // Get the configured taskcontroller-path
+    String path = System.getProperty("taskcontroller-path");
+    createTaskControllerConf(path);
+    String execPath = path + "/task-controller";
+    TaskTracker tracker = mrCluster.getTaskTrackerRunner(0).tt;
+    // Cast the TaskController to our subclass, since we know that is the
+    // instance configured in the TaskTracker.
+    ((MyLinuxTaskController) tracker.getTaskController())
+        .setTaskControllerExe(execPath);
+    String ugi = System.getProperty("taskcontroller-user");
+    clusterConf = mrCluster.createJobConf();
+    String[] splits = ugi.split(",");
+    taskControllerUser = new UnixUserGroupInformation(splits);
+    clusterConf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
+    createHomeDirectory(clusterConf);
+  }
+
+  private void createHomeDirectory(JobConf conf)
+      throws IOException {
+    FileSystem fs = dfsCluster.getFileSystem();
+    String path = "/user/" + taskControllerUser.getUserName();
+    homeDirectory = new Path(path);
+    LOG.info("Creating Home directory : " + homeDirectory);
+    fs.mkdirs(homeDirectory);
+    changePermission(conf, homeDirectory);
+  }
+
+  private void changePermission(JobConf conf, Path p)
+      throws IOException {
+    FileSystem fs = dfsCluster.getFileSystem();
+    fs.setOwner(p, taskControllerUser.getUserName(),
+        taskControllerUser.getGroupNames()[0]);
+  }
+
+  private void createTaskControllerConf(String path)
+      throws IOException {
+    File confDirectory = new File(path, "../conf");
+    if (!confDirectory.exists()) {
+      confDirectory.mkdirs();
+    }
+    configurationFile = new File(confDirectory, "taskcontroller.cfg");
+    PrintWriter writer =
+        new PrintWriter(new FileOutputStream(configurationFile));
+
+    writer.println(String.format("mapred.local.dir=%s", mrCluster
+        .getTaskTrackerLocalDir(0)));
+
+    writer.flush();
+    writer.close();
+  }
+
+  /**
+   * Can we run the tests with LinuxTaskController?
+   * 
+   * @return true if both taskcontroller-path and taskcontroller-user are set
+   */
+  protected boolean shouldRun() {
+    return isTaskExecPathPassed() && isUserPassed();
+  }
+
+  private boolean isTaskExecPathPassed() {
+    String path = System.getProperty("taskcontroller-path");
+    if (path == null || path.isEmpty()
+        || path.equals("${taskcontroller-path}")) {
+      return false;
+    }
+    return true;
+  }
+
+  private boolean isUserPassed() {
+    String ugi = System.getProperty("taskcontroller-user");
+    if (ugi != null && !(ugi.equals("${taskcontroller-user}"))
+        && !ugi.isEmpty()) {
+      if (ugi.indexOf(",") > 1) {
+        return true;
+      }
+      return false;
+    }
+    return false;
+  }
+
+  protected JobConf getClusterConf() {
+    return new JobConf(clusterConf);
+  }
+
+  @Override
+  protected void tearDown()
+      throws Exception {
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+    }
+
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+
+    if (configurationFile != null) {
+      configurationFile.delete();
+    }
+
+    super.tearDown();
+  }
+
+  /**
+   * Assert that the job was actually run by the specified user by verifying
+   * the ownership of the output part-files.
+   * 
+   * @param outDir
+   * @throws IOException
+   */
+  protected void assertOwnerShip(Path outDir)
+      throws IOException {
+    FileSystem fs = outDir.getFileSystem(clusterConf);
+    assertOwnerShip(outDir, fs);
+  }
+
+  /**
+   * Assert that the job was actually run by the specified user by verifying
+   * the ownership of the output part-files.
+   * 
+   * @param outDir
+   * @param fs
+   * @throws IOException
+   */
+  protected void assertOwnerShip(Path outDir, FileSystem fs)
+      throws IOException {
+    for (FileStatus status : fs.listStatus(outDir, new OutputLogFilter())) {
+      String owner = status.getOwner();
+      String group = status.getGroup();
+      LOG.info("Ownership of the file " + status.getPath() + " is " + owner
+          + "," + group);
+      assertTrue("Output part-file's owner is not correct. Expected : "
+          + taskControllerUser.getUserName() + " Found : " + owner, owner
+          .equals(taskControllerUser.getUserName()));
+      assertTrue("Output part-file's group is not correct. Expected : "
+          + taskControllerUser.getGroupNames()[0] + " Found : " + group, group
+          .equals(taskControllerUser.getGroupNames()[0]));
+    }
+  }
+}
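
The class Javadoc above covers building the binary and the ant invocation; the setuid step in between might look like the sketch below. The owner, group, and permission bits are illustrative assumptions, not values fixed by this patch:

    # Make the task-controller binary setuid so it can switch users;
    # it lives under the directory passed as taskcontroller-path.
    sudo chown root:testgroup /path/to/taskcontroller/bin/task-controller
    sudo chmod 4750 /path/to/taskcontroller/bin/task-controller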

Added: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=773621&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (added)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Mon May 11 16:36:22 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Test a java-based mapred job with LinuxTaskController running the jobs as a
+ * user different from the user running the cluster. See
+ * {@link ClusterWithLinuxTaskController}
+ */
+public class TestJobExecutionAsDifferentUser extends
+    ClusterWithLinuxTaskController {
+
+  public void testJobExecution()
+      throws Exception {
+    if (!shouldRun()) {
+      return;
+    }
+    startCluster();
+    Path inDir = new Path("input");
+    Path outDir = new Path("output");
+    RunningJob job =
+        UtilsForTests.runJobSucceed(getClusterConf(), inDir, outDir);
+    assertTrue("Job failed", job.isSuccessful());
+    assertOwnerShip(outDir);
+  }
+}
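
Because the <batchtest> elements run only when -Dtestcase is unset, this test can also be run in isolation; a sketch with placeholder values:

    ant test -Dtestcase=TestJobExecutionAsDifferentUser \
        -Dtaskcontroller-path=/path/to/taskcontroller/bin \
        -Dtaskcontroller-user=testuser,testgroup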

Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=773621&r1=773620&r2=773621&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipes.java Mon May 11 16:36:22 2009
@@ -30,6 +30,8 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -46,7 +48,18 @@
 public class TestPipes extends TestCase {
   private static final Log LOG =
     LogFactory.getLog(TestPipes.class.getName());
-
+  
+  private static Path cppExamples = 
+    new Path(System.getProperty("install.c++.examples"));
+  static Path wordCountSimple = 
+    new Path(cppExamples, "bin/wordcount-simple");
+  static Path wordCountPart = 
+    new Path(cppExamples, "bin/wordcount-part");
+  static Path wordCountNoPipes = 
+    new Path(cppExamples,"bin/wordcount-nopipe");
+  
+  static Path nonPipedOutDir;
+  
   static void cleanup(FileSystem fs, Path p) throws IOException {
     fs.delete(p, true);
     assertFalse("output not cleaned up", fs.exists(p));
@@ -59,7 +72,6 @@
     }
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
-    Path cppExamples = new Path(System.getProperty("install.c++.examples"));
     Path inputPath = new Path("/testing/in");
     Path outputPath = new Path("/testing/out");
     try {
@@ -68,17 +80,15 @@
       dfs = new MiniDFSCluster(conf, numSlaves, true, null);
       mr = new MiniMRCluster(numSlaves, dfs.getFileSystem().getUri().toString(), 1);
       writeInputFile(dfs.getFileSystem(), inputPath);
-      runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"), 
-                 inputPath, outputPath, 3, 2, twoSplitOutput);
+      runProgram(mr, dfs, wordCountSimple, 
+                 inputPath, outputPath, 3, 2, twoSplitOutput, null);
       cleanup(dfs.getFileSystem(), outputPath);
-
-      runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"), 
-                 inputPath, outputPath, 3, 0, noSortOutput);
+      runProgram(mr, dfs, wordCountSimple, 
+                 inputPath, outputPath, 3, 0, noSortOutput, null);
       cleanup(dfs.getFileSystem(), outputPath);
-
-      runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-part"),
-                 inputPath, outputPath, 3, 2, fixedPartitionOutput);
-      runNonPipedProgram(mr, dfs, new Path(cppExamples,"bin/wordcount-nopipe"));
+      runProgram(mr, dfs, wordCountPart,
+                 inputPath, outputPath, 3, 2, fixedPartitionOutput, null);
+      runNonPipedProgram(mr, dfs, wordCountNoPipes, null);
       mr.waitUntilIdle();
     } finally {
       mr.shutdown();
@@ -86,6 +96,7 @@
     }
   }
 
+
   final static String[] twoSplitOutput = new String[] {
     "`and\t1\na\t1\nand\t1\nbeginning\t1\nbook\t1\nbut\t1\nby\t1\n" +
     "conversation?'\t1\ndo:\t1\nhad\t2\nhaving\t1\nher\t2\nin\t1\nit\t1\n"+
@@ -123,7 +134,7 @@
     ""                                                   
   };
   
-  private void writeInputFile(FileSystem fs, Path dir) throws IOException {
+  static void writeInputFile(FileSystem fs, Path dir) throws IOException {
     DataOutputStream out = fs.create(new Path(dir, "part0"));
     out.writeBytes("Alice was beginning to get very tired of sitting by her\n");
     out.writeBytes("sister on the bank, and of having nothing to do: once\n");
@@ -134,12 +145,18 @@
     out.close();
   }
 
-  private void runProgram(MiniMRCluster mr, MiniDFSCluster dfs, 
+  static void runProgram(MiniMRCluster mr, MiniDFSCluster dfs, 
                           Path program, Path inputPath, Path outputPath,
-                          int numMaps, int numReduces, String[] expectedResults
+                          int numMaps, int numReduces, String[] expectedResults,
+                          JobConf conf
                          ) throws IOException {
     Path wordExec = new Path("/testing/bin/application");
-    JobConf job = mr.createJobConf();
+    JobConf job = null;
+    if (conf == null) {
+      job = mr.createJobConf();
+    } else {
+      job = new JobConf(conf);
+    }
     job.setNumMapTasks(numMaps);
     job.setNumReduceTasks(numReduces);
     {
@@ -198,15 +215,21 @@
    * @param program the program to run
    * @throws IOException
    */
-  private void runNonPipedProgram(MiniMRCluster mr, MiniDFSCluster dfs,
-                                  Path program) throws IOException {
-    JobConf job = mr.createJobConf();
+  static void runNonPipedProgram(MiniMRCluster mr, MiniDFSCluster dfs,
+                                  Path program, JobConf conf) throws IOException {
+    JobConf job;
+    if (conf == null) {
+      job = mr.createJobConf();
+    } else {
+      job = new JobConf(conf);
+    }
+    
     job.setInputFormat(WordCountInputFormat.class);
     FileSystem local = FileSystem.getLocal(job);
     Path testDir = new Path("file:" + System.getProperty("test.build.data"), 
                             "pipes");
     Path inDir = new Path(testDir, "input");
-    Path outDir = new Path(testDir, "output");
+    nonPipedOutDir = new Path(testDir, "output");
     Path wordExec = new Path("/testing/bin/application");
     Path jobXml = new Path(testDir, "job.xml");
     {
@@ -228,19 +251,20 @@
     out.writeBytes("all dogs bow wow\n");
     out.writeBytes("hello drink java\n");
     out.close();
-    local.delete(outDir, true);
-    local.mkdirs(outDir);
+    local.delete(nonPipedOutDir, true);
+    local.mkdirs(nonPipedOutDir, new FsPermission(FsAction.ALL, FsAction.ALL,
+        FsAction.ALL));
     out = local.create(jobXml);
     job.writeXml(out);
     out.close();
     System.err.println("About to run: Submitter -conf " + jobXml + 
-                       " -input " + inDir + " -output " + outDir + 
+                       " -input " + inDir + " -output " + nonPipedOutDir + 
                        " -program " + 
                        dfs.getFileSystem().makeQualified(wordExec));
     try {
       Submitter.main(new String[]{"-conf", jobXml.toString(),
                                   "-input", inDir.toString(),
-                                  "-output", outDir.toString(),
+                                  "-output", nonPipedOutDir.toString(),
                                   "-program", 
                         dfs.getFileSystem().makeQualified(wordExec).toString(),
                                   "-reduces", "2"});

Added: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipesAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipesAsDifferentUser.java?rev=773621&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipesAsDifferentUser.java (added)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/pipes/TestPipesAsDifferentUser.java Mon May 11 16:36:22 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClusterWithLinuxTaskController;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Test Pipes jobs with LinuxTaskController running the jobs as a user different
+ * from the user running the cluster. See {@link ClusterWithLinuxTaskController}
+ */
+public class TestPipesAsDifferentUser extends ClusterWithLinuxTaskController {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestPipesAsDifferentUser.class);
+
+  public void testPipes()
+      throws Exception {
+    if (System.getProperty("compile.c++") == null) {
+      LOG.info("compile.c++ is not defined, so skipping TestPipesAsDifferentUser");
+      return;
+    }
+
+    if (!shouldRun()) {
+      return;
+    }
+
+    super.startCluster();
+    JobConf clusterConf = getClusterConf();
+    Path inputPath = new Path(homeDirectory, "in");
+    Path outputPath = new Path(homeDirectory, "out");
+
+    TestPipes.writeInputFile(FileSystem.get(clusterConf), inputPath);
+    TestPipes.runProgram(mrCluster, dfsCluster, TestPipes.wordCountSimple,
+        inputPath, outputPath, 3, 2, TestPipes.twoSplitOutput, clusterConf);
+    assertOwnerShip(outputPath);
+    TestPipes.cleanup(dfsCluster.getFileSystem(), outputPath);
+
+    TestPipes.runProgram(mrCluster, dfsCluster, TestPipes.wordCountSimple,
+        inputPath, outputPath, 3, 0, TestPipes.noSortOutput, clusterConf);
+    assertOwnerShip(outputPath);
+    TestPipes.cleanup(dfsCluster.getFileSystem(), outputPath);
+
+    TestPipes.runProgram(mrCluster, dfsCluster, TestPipes.wordCountPart,
+        inputPath, outputPath, 3, 2, TestPipes.fixedPartitionOutput,
+        clusterConf);
+    assertOwnerShip(outputPath);
+    TestPipes.cleanup(dfsCluster.getFileSystem(), outputPath);
+
+    TestPipes.runNonPipedProgram(mrCluster, dfsCluster,
+        TestPipes.wordCountNoPipes, clusterConf);
+    assertOwnerShip(TestPipes.nonPipedOutDir, FileSystem.getLocal(clusterConf));
+  }
+}