You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by co...@apache.org on 2010/06/21 21:02:51 UTC

svn commit: r956666 [3/4] - in /hadoop/mapreduce/trunk: ./ ivy/ src/test/aop/build/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/testjar/ src/test/system/ src/test/system/aop/ src/test/system/aop/org/ src/test/system/aop/org/apache/ src/te...

Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheModifiedFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheModifiedFile.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheModifiedFile.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheModifiedFile.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.net.URI;
+import java.util.Collection;
+import java.util.ArrayList;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.UtilsForTests;
+
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.SleepJob;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+/**
+ * Verify the Distributed Cache functionality. This test scenario is for a
+ * distributed cache file behaviour when it is modified before and after being
+ * accessed by maximum two jobs. Once a job uses a distributed cache file that
+ * file is stored in the mapred.local.dir. If the next job uses the same file,
+ * but with differnt timestamp, then that file is stored again. So, if two jobs
+ * choose the same tasktracker for their job execution then, the distributed
+ * cache file should be found twice.
+ * 
+ * This testcase runs a job with a distributed cache file. All the tasks'
+ * corresponding tasktracker's handle is got and checked for the presence of
+ * distributed cache with proper permissions in the proper directory. Next when
+ * job runs again and if any of its tasks hits the same tasktracker, which ran
+ * one of the task of the previous job, then that file should be uploaded again
+ * and task should not use the old file. This is verified.
+ */
+
+public class TestDistributedCacheModifiedFile {
+
+  private static MRCluster cluster = null;
+  private static FileSystem dfs = null;
+  private static FileSystem ttFs = null;
+  private static JobClient client = null;
+  private static FsPermission permission = new FsPermission((short) 00777);
+
+  private static String uriPath = "hdfs:///tmp/test.txt";
+  private static final Path URIPATH = new Path(uriPath);
+  private String distributedFileName = "test.txt";
+
+  static final Log LOG =
+      LogFactory.getLog(TestDistributedCacheModifiedFile.class);
+
+  public TestDistributedCacheModifiedFile() throws Exception {
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setUp();
+    client = cluster.getJTClient().getClient();
+    dfs = client.getFs();
+    // Deleting the file if it already exists
+    dfs.delete(URIPATH, true);
+
+    Collection<TTClient> tts = cluster.getTTClients();
+    // Stopping all TTs
+    for (TTClient tt : tts) {
+      tt.kill();
+    }
+    // Starting all TTs
+    for (TTClient tt : tts) {
+      tt.start();
+    }
+    // Waiting for 5 seconds to make sure tasktrackers are ready
+    Thread.sleep(5000);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    cluster.tearDown();
+    dfs.delete(URIPATH, true);
+
+    Collection<TTClient> tts = cluster.getTTClients();
+    // Stopping all TTs
+    for (TTClient tt : tts) {
+      tt.kill();
+    }
+    // Starting all TTs
+    for (TTClient tt : tts) {
+      tt.start();
+    }
+  }
+
+  @Test
+  /**
+   * This tests Distributed Cache for modified file
+   * @param none
+   * @return void
+   */
+  public void testDistributedCache() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
+
+    // This counter will check for count of a loop,
+    // which might become infinite.
+    int count = 0;
+    // This boolean will decide whether to run job again
+    boolean continueLoop = true;
+    // counter for job Loop
+    int countLoop = 0;
+    // This counter increases with all the tasktrackers in which tasks ran
+    int taskTrackerCounter = 0;
+    // This will store all the tasktrackers in which tasks ran
+    ArrayList<String> taskTrackerCollection = new ArrayList<String>();
+    // This boolean tells if two tasks ran onteh same tasktracker or not
+    boolean taskTrackerFound = false;
+
+    do {
+      SleepJob job = new SleepJob();
+      job.setConf(conf);
+      Job slpJob = job.createJob(5, 1, 1000, 1000, 100, 100);
+
+      // Before starting, Modify the file
+      String input = "This will be the content of\n" + "distributed cache\n";
+      // Creating the path with the file
+      DataOutputStream file =
+          UtilsForTests.createTmpFileDFS(dfs, URIPATH, permission, input);
+
+      DistributedCache.createSymlink(conf);
+      URI uri = URI.create(uriPath);
+      DistributedCache.addCacheFile(uri, conf);
+      JobConf jconf = new JobConf(conf);
+
+      // Controls the job till all verification is done
+      FinishTaskControlAction.configureControlActionForJob(conf);
+
+      slpJob.submit();
+      // Submitting the job
+      RunningJob rJob =
+          cluster.getJTClient().getClient().getJob(
+              org.apache.hadoop.mapred.JobID.downgrade(slpJob.getJobID()));
+
+      // counter for job Loop
+      countLoop++;
+
+      TTClient tClient = null;
+      JobInfo jInfo = wovenClient.getJobInfo(rJob.getID());
+      LOG.info("jInfo is :" + jInfo);
+
+      // Assert if jobInfo is null
+      Assert.assertNotNull("jobInfo is null", jInfo);
+
+      // Wait for the job to start running.
+      count = 0;
+      while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
+        UtilsForTests.waitFor(10000);
+        count++;
+        jInfo = wovenClient.getJobInfo(rJob.getID());
+        // If the count goes beyond a point, then break; This is to avoid
+        // infinite loop under unforeseen circumstances. Testcase will anyway
+        // fail later.
+        if (count > 10) {
+          Assert.fail("job has not reached running state for more than"
+              + "100 seconds. Failing at this point");
+        }
+      }
+
+      LOG.info("job id is :" + rJob.getID().toString());
+
+      TaskInfo[] taskInfos =
+          cluster.getJTClient().getProxy().getTaskInfo(rJob.getID());
+
+      boolean distCacheFileIsFound;
+
+      for (TaskInfo taskInfo : taskInfos) {
+        distCacheFileIsFound = false;
+        String[] taskTrackers = taskInfo.getTaskTrackers();
+        for (String taskTracker : taskTrackers) {
+          // Formatting tasktracker to get just its FQDN
+          taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
+          LOG.info("taskTracker is :" + taskTracker);
+
+          // The tasktrackerFound variable is initialized
+          taskTrackerFound = false;
+
+          // This will be entered from the second job onwards
+          if (countLoop > 1) {
+            if (taskTracker != null) {
+              continueLoop = taskTrackerCollection.contains(taskTracker);
+            }
+            if (continueLoop) {
+              taskTrackerFound = true;
+            }
+          }
+          // Collecting the tasktrackers
+          if (taskTracker != null)
+            taskTrackerCollection.add(taskTracker);
+
+          // we have loopped through two times to look for task
+          // getting submitted on same tasktrackers.The same tasktracker
+          // for subsequent jobs was not hit maybe because of many number
+          // of tasktrackers. So, testcase has to stop here.
+          if (countLoop > 1) {
+            continueLoop = false;
+          }
+
+          tClient = cluster.getTTClient(taskTracker);
+
+          // tClient maybe null because the task is already dead. Ex: setup
+          if (tClient == null) {
+            continue;
+          }
+
+          String[] localDirs = tClient.getMapredLocalDirs();
+          int distributedFileCount = 0;
+          // Go to every single path
+          for (String localDir : localDirs) {
+            // Public Distributed cache will always be stored under
+            // mapre.local.dir/tasktracker/archive
+            localDir =
+                localDir
+                    + Path.SEPARATOR
+                    + TaskTracker.getPublicDistributedCacheDir();
+            LOG.info("localDir is : " + localDir);
+
+            // Get file status of all the directories
+            // and files under that path.
+            FileStatus[] fileStatuses =
+                tClient.listStatus(localDir, true, true);
+            for (FileStatus fileStatus : fileStatuses) {
+              Path path = fileStatus.getPath();
+              LOG.info("path is :" + path.toString());
+              // Checking if the received path ends with
+              // the distributed filename
+              distCacheFileIsFound =
+                  (path.toString()).endsWith(distributedFileName);
+              // If file is found, check for its permission.
+              // Since the file is found break out of loop
+              if (distCacheFileIsFound) {
+                LOG.info("PATH found is :" + path.toString());
+                distributedFileCount++;
+                String filename = path.getName();
+                FsPermission fsPerm = fileStatus.getPermission();
+                Assert.assertTrue("File Permission is not 777", fsPerm
+                    .equals(new FsPermission("777")));
+              }
+            }
+          }
+
+          LOG.debug("The distributed FileCount is :" + distributedFileCount);
+          LOG.debug("The taskTrackerFound is :" + taskTrackerFound);
+
+          // If distributed cache is modified in dfs
+          // between two job runs, it can be present more than once
+          // in any of the task tracker, in which job ran.
+          if (distributedFileCount != 2 && taskTrackerFound) {
+            Assert.fail("The distributed cache file has to be two. "
+                + "But found was " + distributedFileCount);
+          } else if (distributedFileCount > 1 && !taskTrackerFound) {
+            Assert.fail("The distributed cache file cannot more than one."
+                + " But found was " + distributedFileCount);
+          } else if (distributedFileCount < 1)
+            Assert.fail("The distributed cache file is less than one. "
+                + "But found was " + distributedFileCount);
+          if (!distCacheFileIsFound) {
+            Assert.assertEquals(
+                "The distributed cache file does not exist",
+                distCacheFileIsFound, false);
+          }
+        }
+      }
+      // Allow the job to continue through MR control job.
+      for (TaskInfo taskInfoRemaining : taskInfos) {
+        FinishTaskControlAction action =
+            new FinishTaskControlAction(TaskID.downgrade(taskInfoRemaining
+                .getTaskID()));
+        Collection<TTClient> tts = cluster.getTTClients();
+        for (TTClient cli : tts) {
+          cli.getProxy().sendAction(action);
+        }
+      }
+
+      // Killing the job because all the verification needed
+      // for this testcase is completed.
+      rJob.killJob();
+
+      // Waiting for 3 seconds for cleanup to start
+      Thread.sleep(3000);
+
+      // Getting the last cleanup task's tasktracker also, as
+      // distributed cache gets uploaded even during cleanup.
+      TaskInfo[] myTaskInfos = wovenClient.getTaskInfo(rJob.getID());
+      if (myTaskInfos != null) {
+        for (TaskInfo info : myTaskInfos) {
+          if (info.isSetupOrCleanup()) {
+            String[] taskTrackers = info.getTaskTrackers();
+            for (String taskTracker : taskTrackers) {
+              // Formatting tasktracker to get just its FQDN
+              taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
+              LOG.info("taskTracker is :" + taskTracker);
+              // Collecting the tasktrackers
+              if (taskTracker != null)
+                taskTrackerCollection.add(taskTracker);
+            }
+          }
+        }
+      }
+
+      // Making sure that the job is complete.
+      while (jInfo != null && !jInfo.getStatus().isJobComplete()) {
+        Thread.sleep(10000);
+        jInfo = wovenClient.getJobInfo(rJob.getID());
+      }
+
+    } while (continueLoop);
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCachePrivateFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCachePrivateFile.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCachePrivateFile.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCachePrivateFile.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.net.URI;
+import java.util.Collection;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.UtilsForTests;
+
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.SleepJob;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+/**
+ * Verify the Distributed Cache functionality.
+ * This test scenario is for a distributed cache file behaviour
+ * when the file is private. Once a job uses a distributed 
+ * cache file with private permissions that file is stored in the
+ * mapred.local.dir, under the directory which has the same name 
+ * as job submitter's username. The directory has 700 permission 
+ * and the file under it, should have 777 permissions. 
+*/
+
+public class TestDistributedCachePrivateFile {
+
+  private static MRCluster cluster = null;
+  private static FileSystem dfs = null;
+  private static JobClient client = null;
+  private static FsPermission permission = new FsPermission((short)00770);
+
+  private static String uriPath = "hdfs:///tmp/test.txt";
+  private static final Path URIPATH = new Path(uriPath);
+  private String distributedFileName = "test.txt";
+
+  static final Log LOG = LogFactory.
+                           getLog(TestDistributedCachePrivateFile.class);
+
+  public TestDistributedCachePrivateFile() throws Exception {
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setUp();
+    client = cluster.getJTClient().getClient();
+    dfs = client.getFs();
+    //Deleting the file if it already exists
+    dfs.delete(URIPATH, true);
+
+    Collection<TTClient> tts = cluster.getTTClients();
+    //Stopping all TTs
+    for (TTClient tt : tts) {
+      tt.kill();
+    }
+    //Starting all TTs
+    for (TTClient tt : tts) {
+      tt.start();
+    }
+
+    String input = "This will be the content of\n" + "distributed cache\n";
+    //Creating the path with the file
+    DataOutputStream file = 
+        UtilsForTests.createTmpFileDFS(dfs, URIPATH, permission, input);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    cluster.tearDown();
+    dfs.delete(URIPATH, true);
+    
+    Collection<TTClient> tts = cluster.getTTClients();
+    //Stopping all TTs
+    for (TTClient tt : tts) {
+      tt.kill();
+    }
+    //Starting all TTs
+    for (TTClient tt : tts) {
+      tt.start();
+    }
+  }
+
+  @Test
+  /**
+   * This tests Distributed Cache for private file
+   * @param none
+   * @return void
+   */
+  public void testDistributedCache() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
+
+    //This counter will check for count of a loop,
+    //which might become infinite.
+    int count = 0;
+
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    Job slpJob = job.createJob(5, 1, 1000, 1000, 100, 100);
+
+    DistributedCache.createSymlink(conf);
+    URI uri = URI.create(uriPath);
+    DistributedCache.addCacheFile(uri, conf);
+    JobConf jconf = new JobConf(conf);
+
+    //Controls the job till all verification is done 
+    FinishTaskControlAction.configureControlActionForJob(conf);
+
+    //Submitting the job
+    slpJob.submit();
+    RunningJob rJob =
+        cluster.getJTClient().getClient().getJob(org.apache.hadoop.mapred.JobID.downgrade(slpJob.getJobID()));
+
+    JobStatus[] jobStatus = client.getAllJobs();
+    String userName = jobStatus[0].getUsername();
+
+    TTClient tClient = null;
+    JobInfo jInfo = wovenClient.getJobInfo(rJob.getID());
+    LOG.info("jInfo is :" + jInfo);
+
+    //Assert if jobInfo is null
+    Assert.assertNotNull("jobInfo is null", jInfo);
+
+    //Wait for the job to start running.
+    count = 0;
+    while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
+      UtilsForTests.waitFor(10000);
+      count++;
+      jInfo = wovenClient.getJobInfo(rJob.getID());
+      //If the count goes beyond a point, then Assert; This is to avoid
+      //infinite loop under unforeseen circumstances.
+      if (count > 10) {
+        Assert.fail("job has not reached running state for more than" +
+            "100 seconds. Failing at this point");
+      }
+    }
+
+    LOG.info("job id is :" + rJob.getID().toString());
+
+    TaskInfo[] taskInfos = cluster.getJTClient().getProxy()
+           .getTaskInfo(rJob.getID());
+
+    boolean distCacheFileIsFound;
+
+    for (TaskInfo taskInfo : taskInfos) {
+      distCacheFileIsFound = false;
+      String[] taskTrackers = taskInfo.getTaskTrackers();
+
+      for(String taskTracker : taskTrackers) {
+        //Getting the exact FQDN of the tasktracker from
+        //the tasktracker string.
+        taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
+        tClient =  cluster.getTTClient(taskTracker);
+        String[] localDirs = tClient.getMapredLocalDirs();
+        int distributedFileCount = 0;
+        String localDirOnly = null;
+
+        boolean FileNotPresentForThisDirectoryPath = false;
+
+        //Go to every single path
+        for (String localDir : localDirs) {
+          FileNotPresentForThisDirectoryPath = false;
+          localDirOnly = localDir;
+
+          //Public Distributed cache will always be stored under
+          //mapred.local.dir/tasktracker/archive
+          localDirOnly = localDir + Path.SEPARATOR + TaskTracker.SUBDIR + 
+              Path.SEPARATOR +  userName;
+
+          //Private Distributed cache will always be stored under
+          //mapre.local.dir/taskTracker/<username>/distcache
+          //Checking for username directory to check if it has the
+          //proper permissions
+          localDir = localDir + Path.SEPARATOR +
+                  TaskTracker.getPrivateDistributedCacheDir(userName);
+
+          FileStatus fileStatusMapredLocalDirUserName = null;
+
+          try {
+            fileStatusMapredLocalDirUserName = tClient.
+                            getFileStatus(localDirOnly, true);
+          } catch (Exception e) {
+            LOG.info("LocalDirOnly :" + localDirOnly + " not found");
+            FileNotPresentForThisDirectoryPath = true;
+          }
+
+          //File will only be stored under one of the mapred.lcoal.dir
+          //If other paths were hit, just continue  
+          if (FileNotPresentForThisDirectoryPath)
+            continue;
+
+          Path pathMapredLocalDirUserName = 
+              fileStatusMapredLocalDirUserName.getPath();
+          FsPermission fsPermMapredLocalDirUserName =
+              fileStatusMapredLocalDirUserName.getPermission();
+          Assert.assertTrue("Directory Permission is not 700",
+            fsPermMapredLocalDirUserName.equals(new FsPermission("700")));
+
+          //Get file status of all the directories 
+          //and files under that path.
+          FileStatus[] fileStatuses = tClient.listStatus(localDir, 
+              true, true);
+          for (FileStatus  fileStatus : fileStatuses) {
+            Path path = fileStatus.getPath();
+            LOG.info("path is :" + path.toString());
+            //Checking if the received path ends with 
+            //the distributed filename
+            distCacheFileIsFound = (path.toString()).
+                endsWith(distributedFileName);
+            //If file is found, check for its permission. 
+            //Since the file is found break out of loop
+            if (distCacheFileIsFound){
+              LOG.info("PATH found is :" + path.toString());
+              distributedFileCount++;
+              String filename = path.getName();
+              FsPermission fsPerm = fileStatus.getPermission();
+              Assert.assertTrue("File Permission is not 777",
+                fsPerm.equals(new FsPermission("777")));
+            }
+          }
+        }
+
+        LOG.info("Distributed File count is :" + distributedFileCount);
+
+        if (distributedFileCount > 1) {
+          Assert.fail("The distributed cache file is more than one");
+        } else if (distributedFileCount < 1)
+          Assert.fail("The distributed cache file is less than one");
+        if (!distCacheFileIsFound) {
+          Assert.assertEquals("The distributed cache file does not exist", 
+              distCacheFileIsFound, false);
+        }
+      }
+
+      //Allow the job to continue through MR control job.
+      for (TaskInfo taskInfoRemaining : taskInfos) {
+        FinishTaskControlAction action = new FinishTaskControlAction(TaskID
+           .downgrade(taskInfoRemaining.getTaskID()));
+        Collection<TTClient> tts = cluster.getTTClients();
+        for (TTClient cli : tts) {
+          cli.getProxy().sendAction(action);
+        }
+      }
+
+      //Killing the job because all the verification needed
+      //for this testcase is completed.
+      rJob.killJob();
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheUnModifiedFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheUnModifiedFile.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheUnModifiedFile.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheUnModifiedFile.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.net.URI;
+import java.util.Collection;
+import java.util.ArrayList;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.UtilsForTests;
+
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.SleepJob;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+/**
+ * Verify the Distributed Cache functionality. This test scenario is for a
+ * distributed cache file behaviour when it is not modified before and after
+ * being accessed by maximum two jobs. Once a job uses a distributed cache file
+ * that file is stored in the mapred.local.dir. If the next job uses the same
+ * file, then that is not stored again. So, if two jobs choose the same
+ * tasktracker for their job execution then, the distributed cache file should
+ * not be found twice.
+ * 
+ * This testcase runs a job with a distributed cache file. All the tasks'
+ * corresponding tasktracker's handle is got and checked for the presence of
+ * distributed cache with proper permissions in the proper directory. Next when
+ * job runs again and if any of its tasks hits the same tasktracker, which ran
+ * one of the task of the previous job, then that file should not be uploaded
+ * again and task use the old file. This is verified.
+ */
+
+public class TestDistributedCacheUnModifiedFile {
+
+  private static MRCluster cluster = null;
+  private static FileSystem dfs = null;
+  private static FileSystem ttFs = null;
+  private static JobClient client = null;
+  private static FsPermission permission = new FsPermission((short) 00777);
+
+  private static String uriPath = "hdfs:///tmp/test.txt";
+  private static final Path URIPATH = new Path(uriPath);
+  private String distributedFileName = "test.txt";
+
+  static final Log LOG =
+      LogFactory.getLog(TestDistributedCacheUnModifiedFile.class);
+
+  public TestDistributedCacheUnModifiedFile() throws Exception {
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setUp();
+    client = cluster.getJTClient().getClient();
+    dfs = client.getFs();
+    // Deleting the file if it already exists
+    dfs.delete(URIPATH, true);
+
+    Collection<TTClient> tts = cluster.getTTClients();
+    // Stopping all TTs
+    for (TTClient tt : tts) {
+      tt.kill();
+    }
+    // Starting all TTs
+    for (TTClient tt : tts) {
+      tt.start();
+    }
+
+    // Waiting for 5 seconds to make sure tasktrackers are ready
+    Thread.sleep(5000);
+
+    String input = "This will be the content of\n" + "distributed cache\n";
+    // Creating the path with the file
+    DataOutputStream file =
+        UtilsForTests.createTmpFileDFS(dfs, URIPATH, permission, input);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    cluster.tearDown();
+    dfs.delete(URIPATH, true);
+
+    Collection<TTClient> tts = cluster.getTTClients();
+    // Stopping all TTs
+    for (TTClient tt : tts) {
+      tt.kill();
+    }
+    // Starting all TTs
+    for (TTClient tt : tts) {
+      tt.start();
+    }
+  }
+
+  @Test
+  /**
+   * This tests Distributed Cache for unmodified file
+   * @param none
+   * @return void
+   */
+  public void testDistributedCache() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
+
+    // This counter will check for count of a loop,
+    // which might become infinite.
+    int count = 0;
+    // This boolean will decide whether to run job again
+    boolean continueLoop = true;
+    // counter for job Loop
+    int countLoop = 0;
+    // This counter incerases with all the tasktrackers in which tasks ran
+    int taskTrackerCounter = 0;
+    // This will store all the tasktrackers in which tasks ran
+    ArrayList<String> taskTrackerCollection = new ArrayList<String>();
+
+    do {
+      SleepJob job = new SleepJob();
+      job.setConf(conf);
+      Job slpJob = job.createJob(5, 1, 1000, 1000, 100, 100);
+
+      DistributedCache.createSymlink(conf);
+      URI uri = URI.create(uriPath);
+      DistributedCache.addCacheFile(uri, conf);
+      JobConf jconf = new JobConf(conf);
+
+      // Controls the job till all verification is done
+      FinishTaskControlAction.configureControlActionForJob(conf);
+
+      // Submitting the job
+      slpJob.submit();
+      RunningJob rJob =
+          cluster.getJTClient().getClient().getJob(
+              org.apache.hadoop.mapred.JobID.downgrade(slpJob.getJobID()));
+
+      // counter for job Loop
+      countLoop++;
+
+      TTClient tClient = null;
+      JobInfo jInfo = wovenClient.getJobInfo(rJob.getID());
+      LOG.info("jInfo is :" + jInfo);
+
+      // Assert if jobInfo is null
+      Assert.assertNotNull("jobInfo is null", jInfo);
+
+      // Wait for the job to start running.
+      count = 0;
+      while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
+        UtilsForTests.waitFor(10000);
+        count++;
+        jInfo = wovenClient.getJobInfo(rJob.getID());
+        // If the count goes beyond a point, then break; This is to avoid
+        // infinite loop under unforeseen circumstances. Testcase will anyway
+        // fail later.
+        if (count > 10) {
+          Assert.fail("job has not reached running state for more than"
+              + "100 seconds. Failing at this point");
+        }
+      }
+
+      LOG.info("job id is :" + rJob.getID().toString());
+
+      TaskInfo[] taskInfos =
+          cluster.getJTClient().getProxy().getTaskInfo(rJob.getID());
+
+      boolean distCacheFileIsFound;
+
+      for (TaskInfo taskInfo : taskInfos) {
+        distCacheFileIsFound = false;
+        String[] taskTrackers = taskInfo.getTaskTrackers();
+        for (String taskTracker : taskTrackers) {
+          // Formatting tasktracker to get just its FQDN
+          taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
+          LOG.info("taskTracker is :" + taskTracker);
+
+          // This will be entered from the second job onwards
+          if (countLoop > 1) {
+            if (taskTracker != null) {
+              continueLoop = taskTrackerCollection.contains(taskTracker);
+            }
+            if (!continueLoop) {
+              break;
+            }
+          }
+
+          // Collecting the tasktrackers
+          if (taskTracker != null)
+            taskTrackerCollection.add(taskTracker);
+
+          // we have loopped through enough number of times to look for task
+          // getting submitted on same tasktrackers.The same tasktracker
+          // for subsequent jobs was not hit maybe because of many number
+          // of tasktrackers. So, testcase has to stop here.
+          if (countLoop > 2) {
+            continueLoop = false;
+          }
+
+          tClient = cluster.getTTClient(taskTracker);
+
+          // tClient maybe null because the task is already dead. Ex: setup
+          if (tClient == null) {
+            continue;
+          }
+
+          String[] localDirs = tClient.getMapredLocalDirs();
+          int distributedFileCount = 0;
+          // Go to every single path
+          for (String localDir : localDirs) {
+            // Public Distributed cache will always be stored under
+            // mapre.local.dir/tasktracker/archive
+            localDir =
+                localDir
+                    + Path.SEPARATOR
+                    + TaskTracker.getPublicDistributedCacheDir();
+            LOG.info("localDir is : " + localDir);
+
+            // Get file status of all the directories
+            // and files under that path.
+            FileStatus[] fileStatuses =
+                tClient.listStatus(localDir, true, true);
+            for (FileStatus fileStatus : fileStatuses) {
+              Path path = fileStatus.getPath();
+              LOG.info("path is :" + path.toString());
+              // Checking if the received path ends with
+              // the distributed filename
+              distCacheFileIsFound =
+                  (path.toString()).endsWith(distributedFileName);
+              // If file is found, check for its permission.
+              // Since the file is found break out of loop
+              if (distCacheFileIsFound) {
+                LOG.info("PATH found is :" + path.toString());
+                distributedFileCount++;
+                String filename = path.getName();
+                FsPermission fsPerm = fileStatus.getPermission();
+                Assert.assertTrue("File Permission is not 777", fsPerm
+                    .equals(new FsPermission("777")));
+              }
+            }
+          }
+
+          // Since distributed cache is unmodified in dfs
+          // between two job runs, it should not be present more than once
+          // in any of the task tracker, in which job ran.
+          if (distributedFileCount > 1) {
+            Assert.fail("The distributed cache file is more than one");
+          } else if (distributedFileCount < 1)
+            Assert.fail("The distributed cache file is less than one");
+          if (!distCacheFileIsFound) {
+            Assert.assertEquals(
+                "The distributed cache file does not exist",
+                distCacheFileIsFound, false);
+          }
+        }
+      }
+      // Allow the job to continue through MR control job.
+      for (TaskInfo taskInfoRemaining : taskInfos) {
+        FinishTaskControlAction action =
+            new FinishTaskControlAction(TaskID.downgrade(taskInfoRemaining
+                .getTaskID()));
+        Collection<TTClient> tts = cluster.getTTClients();
+        for (TTClient cli : tts) {
+          cli.getProxy().sendAction(action);
+        }
+      }
+
+      // Killing the job because all the verification needed
+      // for this testcase is completed.
+      rJob.killJob();
+    } while (continueLoop);
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestFileOwner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestFileOwner.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestFileOwner.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestFileOwner.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.TTInfo;
+import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFileOwner {
+  public static MRCluster cluster;
+
+  private StringBuffer jobIdDir = new StringBuffer();
+  private JTProtocol wovenClient = null;
+  private static final Log LOG = LogFactory.getLog(TestFileOwner.class);
+  private String taskController = null;
+  private final FsPermission PERM_777 = new FsPermission("777");
+  private final FsPermission PERM_755 = new FsPermission("755");
+  private final FsPermission PERM_644 = new FsPermission("644");
+
+  @BeforeClass
+  public static void setUp() throws java.lang.Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setUp();
+  }
+
+  /*
+   * The test is used to check the file permission of local files in
+   * mapred.local.dir. The job control is used which will make the tasks wait
+   * for completion until it is signaled
+   * 
+   * @throws Exception in case of test errors
+   */
+  @Test
+  public void testFilePermission() throws Exception {
+    wovenClient = cluster.getJTClient().getProxy();
+    Configuration conf = new Configuration(cluster.getConf());
+    FinishTaskControlAction.configureControlActionForJob(conf);
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    Job slpJob = job.createJob(1, 0, 100, 100, 100, 100);
+    JobConf jconf = new JobConf(conf);
+    slpJob.submit();
+    RunningJob rJob =
+        cluster.getJTClient().getClient().getJob(
+            org.apache.hadoop.mapred.JobID.downgrade(slpJob.getJobID()));
+    taskController = conf.get(TTConfig.TT_TASK_CONTROLLER);
+    // get the job info so we can get the env variables from the daemon.
+    // Now wait for the task to be in the running state, only then the
+    // directories will be created
+    JobInfo info = wovenClient.getJobInfo(rJob.getID());
+    Assert.assertNotNull("JobInfo is null", info);
+    JobID id = rJob.getID();
+    while (info.runningMaps() != 1) {
+      Thread.sleep(1000);
+      info = wovenClient.getJobInfo(id);
+    }
+    TaskInfo[] myTaskInfos = wovenClient.getTaskInfo(id);
+    for (TaskInfo tInfo : myTaskInfos) {
+      if (!tInfo.isSetupOrCleanup()) {
+        String[] taskTrackers = tInfo.getTaskTrackers();
+        for (String taskTracker : taskTrackers) {
+          TTInfo ttInfo = wovenClient.getTTInfo(taskTracker);
+          TTClient ttCli = cluster.getTTClient(ttInfo.getStatus().getHost());
+          Assert.assertNotNull("TTClient instance is null", ttCli);
+          TTTaskInfo ttTaskInfo = ttCli.getProxy().getTask(tInfo.getTaskID());
+          Assert.assertNotNull("TTTaskInfo is null", ttTaskInfo);
+          while (ttTaskInfo.getTaskStatus().getRunState() != TaskStatus.State.RUNNING) {
+            Thread.sleep(100);
+            ttTaskInfo = ttCli.getProxy().getTask(tInfo.getTaskID());
+          }
+          testPermissionWithTaskController(ttCli, conf, info);
+          FinishTaskControlAction action =
+              new FinishTaskControlAction(TaskID.downgrade(tInfo.getTaskID()));
+          for (TTClient cli : cluster.getTTClients()) {
+            cli.getProxy().sendAction(action);
+          }
+        }
+      }
+    }
+    JobInfo jInfo = wovenClient.getJobInfo(id);
+    jInfo = cluster.getJTClient().getProxy().getJobInfo(id);
+    while (!jInfo.getStatus().isJobComplete()) {
+      Thread.sleep(100);
+      jInfo = cluster.getJTClient().getProxy().getJobInfo(id);
+    }
+  }
+
+  private void testPermissionWithTaskController(
+      TTClient tClient, Configuration conf, JobInfo info) {
+    Assert.assertNotNull("TTclient is null", tClient);
+    FsPermission fsPerm = null;
+    String[] pathInfo = conf.getStrings(MRConfig.LOCAL_DIR);
+    for (int i = 0; i < pathInfo.length; i++) {
+      // First verify the jobid directory exists
+      jobIdDir = new StringBuffer();
+      String userName = null;
+      try {
+        JobStatus[] jobStatus = cluster.getJTClient().getClient().getAllJobs();
+        userName = jobStatus[0].getUsername();
+      } catch (Exception ex) {
+        LOG.error("Failed to get user name");
+        boolean status = false;
+        Assert.assertTrue("Failed to get the userName", status);
+      }
+      jobIdDir.append(pathInfo[i]).append(Path.SEPARATOR);
+      jobIdDir.append(TaskTracker.getLocalJobDir(userName, info
+          .getID().toString()));
+      FileStatus[] fs = null;
+      try {
+        fs = tClient.listStatus(jobIdDir.toString(), true);
+      } catch (Exception ex) {
+        LOG.error("Failed to get the jobIdDir files " + ex);
+      }
+      Assert.assertEquals("Filestatus length is zero", fs.length != 0, true);
+      for (FileStatus file : fs) {
+        try {
+          String filename = file.getPath().getName();
+          if (filename.equals(TaskTracker.JOBFILE)) {
+            if (taskController == DefaultTaskController.class.getName()) {
+              fsPerm = file.getPermission();
+              Assert.assertTrue("FilePermission failed for " + filename, fsPerm
+                  .equals(PERM_777));
+            }
+          }
+          if (filename.startsWith("attempt")) {
+            StringBuffer attemptDir = new StringBuffer(jobIdDir);
+            attemptDir.append(Path.SEPARATOR).append(filename);
+            if (tClient.getFileStatus(attemptDir.toString(), true) != null) {
+              FileStatus[] attemptFs =
+                  tClient.listStatus(attemptDir.toString(), true, true);
+              for (FileStatus attemptfz : attemptFs) {
+                Assert.assertNotNull("FileStatus is null", attemptfz);
+                fsPerm = attemptfz.getPermission();
+                Assert.assertNotNull("FsPermission is null", fsPerm);
+                if (taskController == DefaultTaskController.class.getName()) {
+                  if (!attemptfz.isDir()) {
+                    Assert.assertTrue(
+                        "FilePermission failed for " + filename, fsPerm
+                            .equals(PERM_777));
+                  } else {
+                    Assert.assertTrue(
+                        "FilePermission failed for " + filename, fsPerm
+                            .equals(PERM_755));
+                  }
+                }
+              }
+            }
+          }
+          if (filename.equals(TaskTracker.TASKJARDIR)) {
+            StringBuffer jarsDir = new StringBuffer(jobIdDir);
+            jarsDir.append(Path.SEPARATOR).append(filename);
+            FileStatus[] jarsFs =
+                tClient.listStatus(jarsDir.toString(), true, true);
+            for (FileStatus jarsfz : jarsFs) {
+              Assert.assertNotNull("FileStatus is null", jarsfz);
+              fsPerm = jarsfz.getPermission();
+              Assert.assertNotNull("File permission is null", fsPerm);
+              if (taskController == DefaultTaskController.class.getName()) {
+                if (!jarsfz.isDir()) {
+                  if (jarsfz.getPath().getName().equals("job.jar")) {
+                    Assert.assertTrue(
+                        "FilePermission failed for " + filename, fsPerm
+                            .equals(PERM_777));
+                  } else {
+                    Assert.assertTrue(
+                        "FilePermission failed for " + filename, fsPerm
+                            .equals(PERM_644));
+                  }
+                } else {
+                  Assert.assertTrue(
+                      "FilePermission failed for " + filename, fsPerm
+                          .equals(PERM_755));
+                }
+              }
+            }
+          }
+        } catch (Exception ex) {
+          LOG.error("The exception occurred while searching for nonexsistent"
+              + "file, ignoring and continuing. " + ex);
+        }
+      }// for loop ends
+    }// for loop ends
+  }
+
+  @AfterClass
+  public static void tearDown() throws java.lang.Exception {
+    cluster.tearDown();
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestJobKill.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestJobKill.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestJobKill.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestJobKill.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import testjar.JobKillCommitter;
+
+public class TestJobKill {
+  private static final Log LOG = LogFactory.getLog(TestJobKill.class);
+  private JTProtocol wovenClient = null;
+  private static Path outDir = new Path("output");
+  private static Path inDir = new Path("input");
+  private static FileSystem fs = null;
+  private static MRCluster cluster;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setUp();
+    fs = inDir.getFileSystem(cluster.getJTClient().getConf());
+    if(!fs.exists(inDir)){
+      fs.create(inDir);
+    }
+    if (fs.exists(outDir)) {
+      fs.delete(outDir,true);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if(fs.exists(inDir)) {
+      fs.delete(inDir,true);
+    }    
+    if (fs.exists(outDir)) {
+      fs.delete(outDir,true);
+    }
+    cluster.tearDown();
+  }
+
+  /*
+   * The test case intention is to test the job failure due to system
+   * exceptions, so the exceptions are thrown intentionally and the job is
+   * verified for failure. At the end of the test, the verification is made
+   * that the success file is not present in the hdfs location. This is because
+   * the success file only should exist if the actual job had succeeded. 
+   * 
+   * @throws Exception in a case of test errors
+   */
+  @Test
+  public void testSystemJobKill() throws Exception {
+    wovenClient = cluster.getJTClient().getProxy();
+    Configuration conf = new Configuration(cluster.getConf());
+    conf.set(MRJobConfig.MAP_MAX_ATTEMPTS, "1");
+    conf.set(MRJobConfig.REDUCE_MAX_ATTEMPTS, "1");
+    // fail the mapper job
+    failJob(conf, JobKillCommitter.CommitterWithNoError.class, "JobMapperFail",
+        JobKillCommitter.MapperFail.class, JobKillCommitter.ReducerPass.class,
+        false);
+    // fail the reducer job
+    failJob(conf, JobKillCommitter.CommitterWithNoError.class,
+        "JobReducerFail", JobKillCommitter.MapperPass.class,
+        JobKillCommitter.ReducerFail.class,false);
+    // fail the set up job
+    failJob(conf, JobKillCommitter.CommitterWithFailSetup.class,
+        "JobSetupFail", JobKillCommitter.MapperPass.class,
+        JobKillCommitter.ReducerPass.class,false);
+    // fail the clean up job
+    failJob(conf, JobKillCommitter.CommitterWithFailCleanup.class,
+        "JobCleanupFail", JobKillCommitter.MapperPass.class,
+        JobKillCommitter.ReducerPass.class,false);
+  }
+
+  private void failJob(Configuration conf,
+      Class<? extends OutputCommitter> theClass, String confName,
+      Class<? extends Mapper> mapClass, Class<? extends Reducer> redClass,
+      boolean isUserKill)
+      throws Exception {
+    Job job = new Job(conf, confName);
+    job.setJarByClass(JobKillCommitter.class);
+    job.setMapperClass(mapClass);
+    job.setCombinerClass(redClass);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setReducerClass(redClass);
+    job.setNumReduceTasks(1);
+    FileInputFormat.addInputPath(job, inDir);
+    FileOutputFormat.setOutputPath(job, outDir);
+    JobConf jconf = new JobConf(job.getConfiguration(), JobKillCommitter.class);
+    jconf.setOutputCommitter(theClass);
+    if(!isUserKill)
+    {  
+      RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf);
+      JobID id = rJob.getID();
+      JobInfo jInfo = wovenClient.getJobInfo(id);
+      Assert.assertTrue("Job is not in PREP state",
+          jInfo.getStatus().getRunState() == JobStatus.PREP);
+    }
+    else
+    {
+      //user kill job
+      RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf);
+      JobInfo info = wovenClient.getJobInfo(rJob.getID());
+      Assert.assertNotNull("Job Info is null",info);
+      JobID id = rJob.getID();
+      while (info.runningMaps() != 1) {
+        Thread.sleep(1000);
+        info = wovenClient.getJobInfo(id);
+      }
+      rJob.killJob();
+    }
+    checkCleanup(jconf);
+    deleteOutputDir();
+  }
+  
+  /**
+   * This test is used to kill the job by explicity calling the kill api
+   * and making sure the clean up happens
+   * @throws Exception
+   */
+  @Test
+  public void testUserJobKill() throws Exception{
+    wovenClient = cluster.getJTClient().getProxy();
+    Configuration conf = new Configuration(cluster.getConf());
+    conf.set(MRJobConfig.MAP_MAX_ATTEMPTS, "1");
+    conf.set(MRJobConfig.REDUCE_MAX_ATTEMPTS, "1");
+    // fail the mapper job
+    failJob(conf, JobKillCommitter.CommitterWithNoError.class, "JobUserKill",
+        JobKillCommitter.MapperPassSleep.class, 
+        JobKillCommitter.ReducerPass.class,true);    
+  }
+
+  private void checkCleanup(JobConf conf) throws Exception {
+    if (outDir != null) {
+      if (fs.exists(outDir)) {
+        Path filePath = new Path(outDir,
+            FileOutputCommitter.SUCCEEDED_FILE_NAME);
+        // check to make sure the success file is not there since the job
+        // failed.
+        Assert.assertTrue("The success file is present when the job failed",
+            !fs.exists(filePath));
+      }
+    }
+  }
+
+  private void deleteOutputDir() throws Exception {
+    if (fs != null) {
+      fs.delete(outDir, true);
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestPushConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestPushConfig.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestPushConfig.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestPushConfig.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+import java.io.File;
+import java.io.FileOutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.test.system.AbstractDaemonClient;
+import org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestPushConfig {
+  private static MRCluster cluster;
+  private String localConfDir = "localconf";
+  private static final Log LOG = LogFactory.getLog(
+      TestPushConfig.class.getName());
+  
+  @BeforeClass
+  public static void before() throws Exception {
+    String [] expExcludeList = new String[2];
+    expExcludeList[0] = "java.net.ConnectException";
+    expExcludeList[1] = "java.io.IOException";
+    
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setExcludeExpList(expExcludeList);
+    cluster.setUp();
+  }
+
+  @AfterClass
+  public static void after() throws Exception {
+    cluster.tearDown();
+  }
+  
+  /**
+   * This test about testing the pushConfig feature. The pushConfig functionality
+   * available as part of the cluster process manager. The functionality takes
+   * in local input directory and pushes all the files from the local to the 
+   * remote conf directory. This functionality is required is change the config
+   * on the fly and restart the cluster which will be used by other test cases
+   * @throws Exception 
+   */
+  @Test
+  public void testPushConfig() throws Exception {
+    final String DUMMY_CONFIG_STRING = "mapreduce.newdummy.conf";
+    final String DUMMY_CONFIG_STRING_VALUE = "HerriotTestRules";
+    Configuration origconf = new Configuration(cluster.getConf());
+    origconf.set(DUMMY_CONFIG_STRING, DUMMY_CONFIG_STRING_VALUE);
+    String localDir = HadoopDaemonRemoteCluster.getDeployedHadoopConfDir() + 
+        File.separator + localConfDir;
+    File lFile = new File(localDir);
+    if(!lFile.exists()){
+      lFile.mkdir();
+    }
+    String mapredConf = localDir + File.separator + "mapred-site.xml";
+    File file = new File(mapredConf);
+    origconf.writeXml(new FileOutputStream(file));    
+    Configuration daemonConf =  cluster.getJTClient().getProxy().getDaemonConf();
+    Assert.assertTrue("Dummy varialble is expected to be null before restart.",
+        daemonConf.get(DUMMY_CONFIG_STRING) == null);
+    String newDir = cluster.getClusterManager().pushConfig(localDir);
+    cluster.stop();
+    AbstractDaemonClient cli = cluster.getJTClient();
+    waitForClusterStop(cli);
+    // make sure the cluster has actually stopped
+    cluster.getClusterManager().start(newDir);
+    cli = cluster.getJTClient();
+    waitForClusterStart(cli);
+    // make sure the cluster has actually started
+    Configuration newconf = cluster.getJTClient().getProxy().getDaemonConf();
+    Assert.assertTrue("Extra varialble is expected to be set",
+        newconf.get(DUMMY_CONFIG_STRING).equals(DUMMY_CONFIG_STRING_VALUE));
+    cluster.getClusterManager().stop(newDir);
+    cli = cluster.getJTClient();
+    // make sure the cluster has actually stopped
+    waitForClusterStop(cli);
+    // start the daemons with original conf dir
+    cluster.getClusterManager().start();
+    cli = cluster.getJTClient();    
+    waitForClusterStart(cli);  
+    daemonConf =  cluster.getJTClient().getProxy().getDaemonConf();
+    Assert.assertTrue("Dummy variable is expected to be null after restart.",
+        daemonConf.get(DUMMY_CONFIG_STRING) == null);
+    lFile.delete();
+  }
+  
+  private void waitForClusterStop(AbstractDaemonClient cli) throws Exception {
+    int i=1;
+    while (i < 40) {
+      try {
+        cli.ping();
+        Thread.sleep(1000);
+        i++;
+      } catch (Exception e) {
+        break;
+      }
+    }
+    for (AbstractDaemonClient tcli : cluster.getTTClients()) {
+      i = 1;
+      while (i < 40) {
+        try {
+          tcli.ping();
+          Thread.sleep(1000);
+          i++;
+        } catch (Exception e) {
+          break;
+        }
+      }
+      if (i >= 40) {
+        Assert.fail("TT on " + tcli.getHostName() + " Should have been down.");
+      }
+    }
+  }
+  
+  private void waitForClusterStart(AbstractDaemonClient cli) throws Exception {
+    int i=1;
+    while (i < 40) {
+      try {
+        cli.ping();
+        break;
+      } catch (Exception e) {
+        i++;
+        Thread.sleep(1000);
+        LOG.info("Waiting for Jobtracker on host : "
+            + cli.getHostName() + " to come up.");
+      }
+    }
+    for (AbstractDaemonClient tcli : cluster.getTTClients()) {
+      i = 1;
+      while (i < 40) {
+        try {
+          tcli.ping();
+          break;
+        } catch (Exception e) {
+          i++;
+          Thread.sleep(1000);
+          LOG.info("Waiting for Tasktracker on host : "
+              + tcli.getHostName() + " to come up.");
+        }
+      }
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestSortValidate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestSortValidate.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestSortValidate.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestSortValidate.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.examples.RandomWriter;
+import org.apache.hadoop.examples.Sort;
+
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A System test to test the Map-Reduce framework's sort 
+ * with a real Map-Reduce Cluster.
+ */
+public class TestSortValidate {
+  // Input/Output paths for sort
+  private static final Path SORT_INPUT_PATH = new Path("inputDirectory");
+  private static final Path SORT_OUTPUT_PATH = new Path("outputDirectory");
+
+  // make it big enough to cause a spill in the map
+  private static final int RW_BYTES_PER_MAP = 3 * 1024 * 1024;
+  private static final int RW_MAPS_PER_HOST = 2;
+
+  private MRCluster cluster = null;
+  private FileSystem dfs = null;
+  private JobClient client = null;
+
+  private static final Log LOG = LogFactory.getLog(TestSortValidate.class);
+
+  public TestSortValidate()
+  throws Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+  }
+
+  @Before
+  public void setUp() throws java.lang.Exception {
+    cluster.setUp();
+    client = cluster.getJTClient().getClient();
+
+    dfs = client.getFs();
+    dfs.delete(SORT_INPUT_PATH, true);
+    dfs.delete(SORT_OUTPUT_PATH, true);
+  }
+
+  @After
+  public void after() throws Exception {
+    cluster.tearDown();
+    dfs.delete(SORT_INPUT_PATH, true);
+    dfs.delete(SORT_OUTPUT_PATH, true);
+  }
+
+  public void runRandomWriter(Configuration job, Path sortInput) 
+  throws Exception {
+    // Scale down the default settings for RandomWriter for the test-case
+    // Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP
+    job.setInt("test.randomwrite.bytes_per_map", RW_BYTES_PER_MAP);
+    job.setInt("test.randomwriter.maps_per_host", RW_MAPS_PER_HOST);
+    String[] rwArgs = {sortInput.toString()};
+ 
+    runAndVerify(job,new RandomWriter(), rwArgs);
+  }
+
+  private void runAndVerify(Configuration job, Tool tool, String[] args)
+    throws Exception {
+
+    // This calculates the previous number fo jobs submitted before a new
+    // job gets submitted.
+    int prevJobsNum = 0;
+
+    // JTProtocol wovenClient
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
+
+    // JobStatus
+    JobStatus[] jobStatus = null;
+
+    // JobID
+    JobID id = null;
+
+    // RunningJob rJob;
+    RunningJob rJob = null;
+
+    // JobInfo jInfo;
+    JobInfo jInfo = null;
+
+    //Getting the previous job numbers that are submitted.
+    jobStatus = client.getAllJobs();
+    prevJobsNum = jobStatus.length;
+
+    // Run RandomWriter
+    Assert.assertEquals(ToolRunner.run(job, tool, args), 0);
+
+    //Waiting for the job to appear in the jobstatus
+    jobStatus = client.getAllJobs();
+
+    while (jobStatus.length - prevJobsNum == 0) {
+      LOG.info("Waiting for the job to appear in the jobStatus");
+      Thread.sleep(1000);
+      jobStatus = client.getAllJobs();
+    }
+
+    //Getting the jobId of the just submitted job
+    //The just submitted job is always added in the first slot of jobstatus
+    id = jobStatus[0].getJobID();
+
+    rJob = client.getJob(id);
+
+    jInfo = wovenClient.getJobInfo(id);
+
+    //Making sure that the job is complete.
+    while (jInfo != null && !jInfo.getStatus().isJobComplete()) {
+      Thread.sleep(10000);
+      jInfo = wovenClient.getJobInfo(id);
+    }
+
+    cluster.getJTClient().verifyCompletedJob(id);
+  }
+  
+  private void runSort(Configuration job, Path sortInput, Path sortOutput) 
+  throws Exception {
+
+    job.setInt("io.sort.mb", 1);
+
+    // Setup command-line arguments to 'sort'
+    String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
+    
+    runAndVerify(job,new Sort(), sortArgs);
+
+  }
+  
+  private void runSortValidator(Configuration job, 
+                                       Path sortInput, Path sortOutput) 
+  throws Exception {
+    String[] svArgs = {"-sortInput", sortInput.toString(), 
+                       "-sortOutput", sortOutput.toString()};
+
+    runAndVerify(job,new SortValidator(), svArgs);
+
+  }
+ 
+  @Test 
+  public void testMapReduceSort() throws Exception {
+    // Run randomwriter to generate input for 'sort'
+    runRandomWriter(cluster.getConf(), SORT_INPUT_PATH);
+
+    // Run sort
+    runSort(cluster.getConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
+
+    // Run sort-validator to check if sort worked correctly
+    runSortValidator(cluster.getConf(), SORT_INPUT_PATH, 
+                     SORT_OUTPUT_PATH);
+  }
+}