Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:23:56 UTC

svn commit: r1077524 - /hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestLostTaskTracker.java

Author: omalley
Date: Fri Mar  4 04:23:56 2011
New Revision: 1077524

URL: http://svn.apache.org/viewvc?rev=1077524&view=rev
Log:
commit e6b333b6972d4fb5d8c799d41e34878afac06c49
Author: Vinay Kumar Thota <vi...@yahoo-inc.com>
Date:   Fri Jul 2 11:24:34 2010 +0000

    MAPREDUCE:1794 from https://issues.apache.org/jira/secure/attachment/12448288/1794-ydist-security.patch

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestLostTaskTracker.java

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestLostTaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestLostTaskTracker.java?rev=1077524&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestLostTaskTracker.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestLostTaskTracker.java Fri Mar  4 04:23:56 2011
@@ -0,0 +1,257 @@
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.JTClient;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Hashtable;
+
+public class TestLostTaskTracker {
+  private static final Log LOG = LogFactory
+      .getLog(TestLostTaskTracker.class);
+  private static MRCluster cluster;
+  private static Configuration conf = new Configuration();
+  private static Path inputDir = new Path("input");
+  private static Path outputDir = new Path("output");
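+  // Configuration file used when restarting the cluster with the
+  // lowered expiry interval.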
+  private static String confFile = "mapred-site.xml";
+  private JTProtocol wovenClient = null;
+  private JobID jID = null;
+  private JobInfo jInfo = null;
+  private JTClient jtClient = null;
+
+  @BeforeClass
+  public static void before() throws Exception {
+    String [] expExcludeList = {"java.net.ConnectException",
+        "java.io.IOException"};
+    cluster = MRCluster.createCluster(conf);
+    cluster.setExcludeExpList(expExcludeList);
+    cluster.setUp();
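+    // Lower the TaskTracker expiry interval to 30 seconds so that a
+    // stopped tracker is declared lost quickly during these tests.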
+    Hashtable<String,Long> prop = new Hashtable<String,Long>();
+    prop.put("mapred.tasktracker.expiry.interval",30000L);
+    cluster.restartClusterWithNewConfig(prop, confFile);
+    UtilsForTests.waitFor(1000);
+    conf = cluster.getJTClient().getProxy().getDaemonConf();
+    createInput(inputDir, conf);
+  }
+
+  @AfterClass
+  public static void after() throws Exception {
+    cleanup(inputDir, conf);
+    cleanup(outputDir, conf);
+    cluster.tearDown();
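+    // Restart so the cluster comes back with its original configuration.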
+    cluster.restart();
+  }
+
+  /**
+   * Verify that the job succeeds when the lost task tracker comes back
+   * alive before the expiry interval elapses.
+   * @throws Exception if an error occurs.
+   */
+  @Test
+  public void testJobStatusOfLostTaskTracker1() throws Exception {
+    String testName = "LTT1";
+    setupJobAndRun();
+    JobStatus jStatus = verifyLostTaskTrackerJobStatus(testName);
+    Assert.assertEquals("Job has not succeeded.",
+        JobStatus.SUCCEEDED, jStatus.getRunState());
+  }
+  
+  /**
+   * Verify that the job still succeeds when the task tracker running a
+   * task is lost past the expiry interval on all four attempts of the
+   * task; attempts killed because of a lost tracker do not count
+   * against the task's attempt limit, so the task is rescheduled.
+   * @throws Exception if an error occurs.
+   */
+  @Test
+  public void testJobStatusOfLostTaskTracker2() throws Exception {
+    String testName = "LTT2";
+    setupJobAndRun();
+    JobStatus jStatus = verifyLostTaskTrackerJobStatus(testName);
+    Assert.assertEquals("Job has not been failed...", 
+            JobStatus.SUCCEEDED, jStatus.getRunState());
+  }
+
+  private void setupJobAndRun() throws IOException { 
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
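+    // Configure a SleepJob with 3 maps and 1 reduce, each sleeping for
+    // about 60 seconds, so tasks stay alive across tracker restarts.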
+    conf = job.setupJobConf(3, 1, 60000, 100, 60000, 100);
+    JobConf jobConf = new JobConf(conf);
+    cleanup(outputDir, conf);
+    jtClient = cluster.getJTClient();
+    JobClient client = jtClient.getClient();
+    wovenClient = cluster.getJTClient().getProxy();
+    RunningJob runJob = client.submitJob(jobConf);
+    jID = runJob.getID();
+    jInfo = wovenClient.getJobInfo(jID);
+    Assert.assertNotNull("Job information is null",jInfo);
+    Assert.assertTrue("Job has not been started for 1 min.", 
+        jtClient.isJobStarted(jID));
+    JobStatus jobStatus = jInfo.getStatus();
+    // Wait until the job has completed at least 40% of its maps.
+    while (jobStatus.mapProgress() < 0.4f) {
+      UtilsForTests.waitFor(100);
+      jobStatus = wovenClient.getJobInfo(jID).getStatus();
+    }
+  }
+  
+  private JobStatus verifyLostTaskTrackerJobStatus(String testName) 
+      throws IOException{
+    TaskInfo taskInfo = null;
+    TaskInfo[] taskInfos = wovenClient.getTaskInfo(jID);
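+    // Pick the first real map or reduce task, skipping setup and
+    // cleanup tasks.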
+    for (TaskInfo taskinfo : taskInfos) {
+      if (!taskinfo.isSetupOrCleanup()) {
+        taskInfo = taskinfo;
+        break;
+      }
+    }
+    Assert.assertTrue("Task has not been started for 1 min.",
+            jtClient.isTaskStarted(taskInfo));
+    tID = TaskID.downgrade(taskInfo.getTaskID());
+    TTClient ttClient = getTTClientIns(taskInfo);
+    int counter = 0;
+    while (counter < 30) {
+      if (ttClient != null) {
+        break;
+      } else {
+        taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
+        ttClient = getTTClientIns(taskInfo);
+      }
+      counter++;
+    }
+    Assert.assertNotNull("TaskTracker has not been found", ttClient);
+    if (testName.equals("LTT1")) {
+      ttClient.kill();
+      waitForTTStop(ttClient);
+      UtilsForTests.waitFor(20000);
+      ttClient.start();
+      waitForTTStart(ttClient);
+    } else {
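+      // Keep the tracker down for about 40 seconds, longer than the 30
+      // second expiry interval, so the JobTracker declares it lost and
+      // kills the running attempt; repeat for all four task attempts.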
+      int index = 0;
+      while (index++ < 4) {
+        ttClient.kill();
+        waitForTTStop(ttClient);
+        UtilsForTests.waitFor(40000);
+        ttClient.start();
+        waitForTTStart(ttClient);
+        taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
+        ttClient = getTTClientIns(taskInfo);
+        counter = 0;
+        while (counter < 30) {
+          if (ttClient != null) {
+            break;
+          } else {
+            taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
+            ttClient = getTTClientIns(taskInfo);
+          }
+          counter++;
+        }
+        Assert.assertNotNull("TaskTracker has not been found", ttClient);
+        LOG.info("Task killed attempts: " + taskInfo.numKilledAttempts());
+      }
+      Assert.assertEquals("Number of killed task attempts does not match.",
+          4, taskInfo.numKilledAttempts());
+    }
+    LOG.info("Waiting till the job is completed...");
+    while (!jInfo.getStatus().isJobComplete()) {
+      UtilsForTests.waitFor(1000);
+      jInfo = wovenClient.getJobInfo(jID);
+    }
+    return jInfo.getStatus();
+  }
+
+  private TTClient getTTClientIns(TaskInfo taskInfo) throws IOException {
+    String[] taskTrackers = taskInfo.getTaskTrackers();
+    int counter = 0;
+    TTClient ttClient = null;
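+    // The tracker assignment may not be reported immediately; poll for
+    // up to about six seconds.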
+    while (counter < 60) {
+      if (taskTrackers.length != 0) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+      taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());
+      taskTrackers = taskInfo.getTaskTrackers();
+      counter++;
+    }
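+    // Tracker names look like tracker_<host>:<address>; extract the host.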
+    if (taskTrackers.length != 0) {
+      String hostName = taskTrackers[0].split("_")[1];
+      hostName = hostName.split(":")[0];
+      ttClient = cluster.getTTClient(hostName);
+    }
+    return ttClient;
+  }
+
+  private void waitForTTStart(TTClient ttClient) throws IOException {
+    LOG.debug("Waiting for TaskTracker " + ttClient.getHostName()
+        + " to come up.");
+    while (true) {
+      try {
+        ttClient.ping();
+        LOG.info("TaskTracker " + ttClient.getHostName()
+            + " is up and responding to pings.");
+        break;
+      } catch (Exception exp) {
+        LOG.debug("TaskTracker " + ttClient.getHostName()
+            + " is still coming up.");
+        UtilsForTests.waitFor(10000);
+      }
+    }
+  }
+  
+  private void waitForTTStop(TTClient ttClient) throws IOException {
+    LOG.info("Waiting for TaskTracker " + ttClient.getHostName()
+        + " to stop...");
+    while (true) {
+      try {
+        ttClient.ping();
+        LOG.debug("TaskTracker " + ttClient.getHostName()
+            + " is still responding; waiting for it to stop.");
+        UtilsForTests.waitFor(10000);
+      } catch (Exception exp) {
+        LOG.info("TaskTracker " + ttClient.getHostName() + " has stopped.");
+        break;
+      }
+    }
+  }
+  
+  private static void cleanup(Path dir, Configuration conf) throws 
+      IOException {
+    FileSystem fs = dir.getFileSystem(conf);
+    fs.delete(dir, true);
+  }
+
+  private static void createInput(Path inDir, Configuration conf) throws 
+      IOException {
+    String input = "Hadoop is framework for data intensive distributed " 
+        + "applications.\nHadoop enables applications to" 
+        + " work with thousands of nodes.";
+    FileSystem fs = inDir.getFileSystem(conf);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Failed to create the input directory:" 
+          + inDir.toString());
+    }
+    fs.setPermission(inDir, new FsPermission(FsAction.ALL, 
+        FsAction.ALL, FsAction.ALL));
+    DataOutputStream file = fs.create(new Path(inDir, "data.txt"));
+    file.writeBytes(input);
+    file.close();
+  }
+}