You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:28:52 UTC

svn commit: r1077557 - /hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobHistoryLocation.java

Author: omalley
Date: Fri Mar  4 04:28:52 2011
New Revision: 1077557

URL: http://svn.apache.org/viewvc?rev=1077557&view=rev
Log:
commit 0984dd5c56ec93fd5c9247d178722174f61dea28
Author: Iyappan Srinivasan <iy...@yahoo-inc.com>
Date:   Tue Jul 20 04:34:56 2010 +0000

    MAPREDUCE-1741 from https://issues.apache.org/jira/secure/attachment/12448855/TestJobHistoryLocation-ydist-security-patch.txt

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobHistoryLocation.java

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobHistoryLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobHistoryLocation.java?rev=1077557&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobHistoryLocation.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobHistoryLocation.java Fri Mar  4 04:28:52 2011
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.UtilsForTests;
+
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.examples.SleepJob;
+import java.io.DataOutputStream;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+/**
+ * System test that verifies the location of retired job history files.
+ * After a job (successful or killed) is retired by the JobTracker, its job
+ * history file and its conf file are expected in the directory configured by
+ * mapred.job.tracker.history.completed.location.
+ */
+
+public class TestJobHistoryLocation {
+
+  private static MRCluster cluster = null;
+  private static FileSystem dfs = null;
+  private static JobClient jobClient = null;
+  static final Log LOG = LogFactory.
+      getLog(TestJobHistoryLocation.class);
+
+  /** Interval, in milliseconds, between polls of the JobTracker. */
+  private static final int POLL_INTERVAL_MS = 10000;
+
+  /**
+   * Fallback for mapred.jobtracker.retirejob.check (milliseconds) when the
+   * job configuration does not set it.
+   */
+  private static final int DEFAULT_RETIRE_JOB_CHECK_MS = 60000;
+
+  /** Polls allowed for a single job to reach the awaited state. */
+  private static final int MAX_POLLS_SINGLE_JOB = 10;
+
+  /** Polls allowed per job when several jobs share the cluster. */
+  private static final int MAX_POLLS_MULTIPLE_JOBS = 20;
+
+  /** Number of jobs submitted by testRetiredMultipleJobsHistoryLocation. */
+  private static final int NUM_JOBS = 4;
+
+  /** Files expected per retired job: the job history file and conf file. */
+  private static final int EXPECTED_FILES_PER_JOB = 2;
+
+  public TestJobHistoryLocation() throws Exception {
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setUp();
+    jobClient = cluster.getJTClient().getClient();
+    dfs = jobClient.getFs();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    // Guard so that a failure inside setUp() is not masked by an NPE here.
+    if (cluster != null) {
+      cluster.tearDown();
+    }
+  }
+
+  /**
+   * Runs a sleep job twice -- the first run is allowed to succeed, the
+   * second is killed -- and verifies after each run that, once the job is
+   * retired, its history and conf files are present in
+   * mapred.job.tracker.history.completed.location.
+   */
+  @Test
+  public void testRetiredJobsHistoryLocation() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+    JTProtocol remoteJTClient = cluster.getJTClient().getProxy();
+
+    for (int testIterationLoop = 1; testIterationLoop <= 2;
+        testIterationLoop++) {
+      SleepJob job = new SleepJob();
+      job.setConf(conf);
+      conf = job.setupJobConf(5, 1, 100, 100, 100, 100);
+      int retiredJobInterval = getRetiredJobInterval(conf);
+
+      conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
+      JobConf jconf = new JobConf(conf);
+      String jobHistoryDonePathString = getJobHistoryDonePath(jconf);
+      String jobHistoryPathString = jconf.get("hadoop.job.history.location");
+
+      // Hold the job's tasks until verification is done. This must be
+      // applied to jconf -- the configuration actually submitted -- because
+      // jconf is a copy taken from conf above.
+      FinishTaskControlAction.configureControlActionForJob(jconf);
+      RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf);
+
+      JobID jobID = rJob.getID();
+      JobInfo jInfo = remoteJTClient.getJobInfo(jobID);
+      String jobIDString = jobID.toString();
+      LOG.info("jobIDString is :" + jobIDString);
+      Assert.assertNotNull("jobInfo is null", jInfo);
+
+      waitTillRunState(jInfo, jobID, remoteJTClient);
+
+      // While the job runs, its history file should be in the live history
+      // location, when that location is configured on the client side.
+      if (jobHistoryPathString != null) {
+        FileStatus[] jobHistoryFileStatuses =
+            listStatusOrEmpty(new Path(jobHistoryPathString));
+        Assert.assertTrue("jobFileFound is false",
+            checkJobHistoryFileInformation(jobHistoryFileStatuses,
+                jobIDString) > 0);
+      }
+
+      // Allow the held tasks to continue.
+      releaseTasks(remoteJTClient.getTaskInfo(jobID));
+
+      // Only the second iteration kills the job, so that both a successful
+      // and a killed job are verified.
+      if (testIterationLoop == 2) {
+        rJob.killJob();
+      }
+
+      waitTillJobComplete(jobID, remoteJTClient, MAX_POLLS_SINGLE_JOB);
+
+      // Give the JobTracker four retire-check intervals to retire the job.
+      UtilsForTests.waitFor(retiredJobInterval * 4);
+
+      assertJobFilesInDoneDir(
+          listStatusOrEmpty(new Path(jobHistoryDonePathString)), jobIDString);
+    }
+  }
+
+  /**
+   * Submits several sleep jobs, kills two of them, and verifies that once
+   * all of them are retired, each job's history and conf files are present
+   * in mapred.job.tracker.history.completed.location.
+   */
+  @Test
+  public void testRetiredMultipleJobsHistoryLocation() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+    JTProtocol remoteJTClient = cluster.getJTClient().getProxy();
+    RunningJob[] rJobCollection = new RunningJob[NUM_JOBS];
+    JobID[] rJobIDCollection = new JobID[NUM_JOBS];
+    String jobHistoryDonePathString = null;
+    int retiredJobInterval = DEFAULT_RETIRE_JOB_CHECK_MS;
+
+    for (int jobIndex = 0; jobIndex < NUM_JOBS; jobIndex++) {
+      SleepJob job = new SleepJob();
+      job.setConf(conf);
+      conf = job.setupJobConf(5, 1, 100, 100, 100, 100);
+      // Read the retire interval here as well, so this test does not depend
+      // on state left behind by another test method (order is not fixed).
+      retiredJobInterval = getRetiredJobInterval(conf);
+      conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens",
+          false);
+      JobConf jconf = new JobConf(conf);
+      jobHistoryDonePathString = getJobHistoryDonePath(jconf);
+
+      RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf);
+      JobID jobID = rJob.getID();
+      rJobCollection[jobIndex] = rJob;
+      rJobIDCollection[jobIndex] = jobID;
+
+      LOG.info("jobIDString is :" + jobID.toString());
+      Assert.assertNotNull("jobInfo is null",
+          remoteJTClient.getJobInfo(jobID));
+    }
+
+    // Wait for each job to start, checking that job's own current status.
+    for (JobID jobID : rJobIDCollection) {
+      waitTillRunState(remoteJTClient.getJobInfo(jobID), jobID,
+          remoteJTClient);
+    }
+
+    // Kill two of the jobs so that both killed and successful jobs are
+    // covered.
+    rJobCollection[0].killJob();
+    rJobCollection[NUM_JOBS - 1].killJob();
+
+    for (JobID jobID : rJobIDCollection) {
+      waitTillJobComplete(jobID, remoteJTClient, MAX_POLLS_MULTIPLE_JOBS);
+    }
+
+    // Give the JobTracker four retire-check intervals to retire the jobs.
+    UtilsForTests.waitFor(retiredJobInterval * 4);
+
+    FileStatus[] jobHistoryDoneFileStatuses =
+        listStatusOrEmpty(new Path(jobHistoryDonePathString));
+    for (JobID jobID : rJobIDCollection) {
+      assertJobFilesInDoneDir(jobHistoryDoneFileStatuses, jobID.toString());
+    }
+  }
+
+  /**
+   * Reads mapred.jobtracker.retirejob.check from the given configuration,
+   * falling back to DEFAULT_RETIRE_JOB_CHECK_MS, and fails the test if the
+   * value is not positive (a non-positive wait cannot let the job retire).
+   */
+  private static int getRetiredJobInterval(Configuration conf) {
+    int interval = conf.getInt("mapred.jobtracker.retirejob.check",
+        DEFAULT_RETIRE_JOB_CHECK_MS);
+    if (interval <= 0) {
+      Assert.fail("mapred.jobtracker.retirejob.check is " + interval);
+    }
+    return interval;
+  }
+
+  /**
+   * Returns mapred.job.tracker.history.completed.location from the job
+   * configuration, failing the test when it is not set.
+   */
+  private static String getJobHistoryDonePath(JobConf jconf) {
+    String donePath =
+        jconf.get("mapred.job.tracker.history.completed.location");
+    Assert.assertNotNull("mapred.job.tracker.history.completed.location " +
+        "is null", donePath);
+    LOG.info("jobHistoryDonePath location is :" + donePath);
+    return donePath;
+  }
+
+  /**
+   * Lists a directory on the cluster file system, returning an empty array
+   * instead of null when listStatus yields null (e.g. a missing path), so
+   * callers fail on an assertion rather than an NPE.
+   */
+  private static FileStatus[] listStatusOrEmpty(Path path) throws Exception {
+    FileStatus[] statuses = dfs.listStatus(path);
+    return statuses == null ? new FileStatus[0] : statuses;
+  }
+
+  /**
+   * Sends a FinishTaskControlAction for each given task to every
+   * TaskTracker client, so that tasks held by the control action can finish.
+   */
+  private void releaseTasks(TaskInfo[] taskInfos) throws Exception {
+    Collection<TTClient> tts = cluster.getTTClients();
+    for (TaskInfo taskInfoRemaining : taskInfos) {
+      FinishTaskControlAction action = new FinishTaskControlAction(
+          TaskID.downgrade(taskInfoRemaining.getTaskID()));
+      for (TTClient cli : tts) {
+        cli.getProxy().sendAction(action);
+      }
+    }
+  }
+
+  /**
+   * Polls the JobTracker until the job is RUNNING, has already completed,
+   * or getJobInfo returns null. Fails the test if none of these happens
+   * within MAX_POLLS_SINGLE_JOB polls.
+   *
+   * @param jInfo initial job info, checked before any polling; may be null
+   * @param jobID the job to wait for
+   * @param remoteJTClient proxy used to refresh the job info
+   */
+  private void waitTillRunState(JobInfo jInfo, JobID jobID,
+      JTProtocol remoteJTClient) throws Exception {
+    int polls = 0;
+    // A job that has already finished will never become RUNNING; stop
+    // waiting rather than time out (possible for short, uncontrolled jobs).
+    while (jInfo != null
+        && jInfo.getStatus().getRunState() != JobStatus.RUNNING
+        && !jInfo.getStatus().isJobComplete()) {
+      if (polls >= MAX_POLLS_SINGLE_JOB) {
+        Assert.fail("job has not reached running state for more than "
+            + (MAX_POLLS_SINGLE_JOB * POLL_INTERVAL_MS / 1000)
+            + " seconds. Failing at this point");
+      }
+      UtilsForTests.waitFor(POLL_INTERVAL_MS);
+      polls++;
+      jInfo = remoteJTClient.getJobInfo(jobID);
+    }
+  }
+
+  /**
+   * Polls the JobTracker until the job reports completion or getJobInfo
+   * returns null. Fails the test after maxPolls polls without either.
+   *
+   * @param jobID the job to wait for
+   * @param remoteJTClient proxy used to fetch the job info
+   * @param maxPolls maximum number of POLL_INTERVAL_MS waits
+   */
+  private void waitTillJobComplete(JobID jobID, JTProtocol remoteJTClient,
+      int maxPolls) throws Exception {
+    int polls = 0;
+    // Fetch once per check to avoid racing two separate RPCs.
+    JobInfo jInfo = remoteJTClient.getJobInfo(jobID);
+    while (jInfo != null && !jInfo.getStatus().isJobComplete()) {
+      if (polls >= maxPolls) {
+        Assert.fail("job has not reached completed state for more than "
+            + (maxPolls * POLL_INTERVAL_MS / 1000)
+            + " seconds. Failing at this point");
+      }
+      UtilsForTests.waitFor(POLL_INTERVAL_MS);
+      polls++;
+      jInfo = remoteJTClient.getJobInfo(jobID);
+    }
+  }
+
+  /**
+   * Asserts that the given done-directory listing contains both files (job
+   * history file and conf file) belonging to the given job.
+   */
+  private void assertJobFilesInDoneDir(FileStatus[] jobHistoryDoneFileStatuses,
+      String jobIDString) {
+    int fileCount =
+        checkJobHistoryFileInformation(jobHistoryDoneFileStatuses, jobIDString);
+    Assert.assertTrue("jobFileFound is false. Job History " +
+        "File is not found in the done directory", fileCount > 0);
+    // JUnit order is (message, expected, actual).
+    Assert.assertEquals("Both the job related files are not found",
+        EXPECTED_FILES_PER_JOB, fileCount);
+  }
+
+  /**
+   * Counts the files in the listing whose path contains the job ID,
+   * stopping once EXPECTED_FILES_PER_JOB matches have been found.
+   *
+   * @return the number of matching files, at most EXPECTED_FILES_PER_JOB
+   */
+  private int checkJobHistoryFileInformation(
+      FileStatus[] jobHistoryDoneFileStatuses, String jobIDString) {
+    int fileCount = 0;
+    for (FileStatus jobHistoryDoneFileStatus : jobHistoryDoneFileStatuses) {
+      String filePath = jobHistoryDoneFileStatus.getPath().toString();
+      LOG.info("jobHistoryDoneFileStatus path is :" + filePath);
+      LOG.info("jobIDString path is :" + jobIDString);
+      // Substring match: history file names embed the job ID among other
+      // fields, so a whole-string (regex) match would never succeed.
+      if (filePath.contains(jobIDString)) {
+        fileCount++;
+        if (fileCount == EXPECTED_FILES_PER_JOB) {
+          break;
+        }
+      }
+    }
+    return fileCount;
+  }
+}