You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2011/10/25 06:16:01 UTC

svn commit: r1188510 - in /hadoop/common/branches/branch-0.20-security: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: ravigummadi
Date: Tue Oct 25 04:16:01 2011
New Revision: 1188510

URL: http://svn.apache.org/viewvc?rev=1188510&view=rev
Log:
MAPREDUCE-2850. Add test for MAPREDUCE-2413. Contributed by Ravi Gummadi

Added:
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestDiskFailures.java
Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1188510&r1=1188509&r2=1188510&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Tue Oct 25 04:16:01 2011
@@ -45,6 +45,8 @@ Release 0.20.206.0 - unreleased
 
     MAPREDUCE-2957. The TT should not re-init if it has no good local dirs. (eli)
 
+    MAPREDUCE-2850. Add test for MAPREDUCE-2413. (ravigummadi)
+
 Release 0.20.205.1 - unreleased
 
   IMPROVEMENTS

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1188510&r1=1188509&r2=1188510&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Oct 25 04:16:01 2011
@@ -36,6 +36,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -207,13 +208,15 @@ public class TaskTracker implements MRCo
      * @throws DiskErrorException if no directories are writable
      */
     synchronized void checkDirs() throws DiskErrorException {
-      for (String dir : localDirs) {
+      ListIterator<String> it = localDirs.listIterator();
+      while (it.hasNext()) {
+        final String dir = it.next();
         try {
           DiskChecker.checkDir(new File(dir));
         } catch (DiskErrorException de) {
           LOG.warn("TaskTracker local dir " + dir + " error " + 
               de.getMessage() + ", removing from local dirs");
-          localDirs.remove(dir);
+          it.remove();
           numFailures++;
         }
       }

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java?rev=1188510&r1=1188509&r2=1188510&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java Tue Oct 25 04:16:01 2011
@@ -57,7 +57,8 @@ public abstract class ClusterMapReduceTe
   }
 
   /**
-   * Starts the cluster within a testcase.
+   * Starts the cluster within a testcase with single mapred-local-dir per
+   * TaskTracker.
    * <p/>
    * Note that the cluster is already started when the testcase method
    * is invoked. This method is useful if as part of the testcase the
@@ -69,8 +70,28 @@ public abstract class ClusterMapReduceTe
    * @param props configuration properties to inject to the mini cluster
    * @throws Exception if the cluster could not be started
    */
-  protected synchronized void startCluster(boolean reformatDFS, Properties props)
-          throws Exception {
+  protected synchronized void startCluster(boolean reformatDFS,
+      Properties props) throws Exception {
+    startCluster(reformatDFS, props, 1);
+  }
+
+  /**
+   * Starts the cluster within a testcase with the given number of
+   * mapred-local-dirs per TaskTracker.
+   * <p/>
+   * Note that the cluster is already started when the testcase method
+   * is invoked. This method is useful if as part of the testcase the
+   * cluster has to be shutdown and restarted again.
+   * <p/>
+   * If the cluster is already running this method does nothing.
+   * @param reformatDFS indicates if DFS has to be reformatted
+   * @param props configuration properties to inject to the mini cluster
+   * @param numDir number of mapred-local-dirs per TaskTracker
+   * @throws Exception if the cluster could not be started
+   */
+  protected synchronized void startCluster(boolean reformatDFS,
+        Properties props, int numDir) throws Exception {
+      
     if (dfsCluster == null) {
       JobConf conf = new JobConf();
       if (props != null) {
@@ -83,7 +104,7 @@ public abstract class ClusterMapReduceTe
       ConfigurableMiniMRCluster.setConfiguration(props);
       //noinspection deprecation
       mrCluster = new ConfigurableMiniMRCluster(2, getFileSystem().getName(),
-                                                1, conf);
+                                                numDir, conf);
     }
   }
 

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1188510&r1=1188509&r2=1188510&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Oct 25 04:16:01 2011
@@ -216,12 +216,13 @@ public class MiniMRCluster {
           tt.run();
         }
       } catch (Throwable e) {
-        isDead = true;
         tt = null;
         LOG.error("task tracker " + trackerId + " crashed", e);
       }
+      // TaskTracker finished execution unexpectedly. So marking it as dead.
+      isDead = true;
     }
-        
+ 
     /**
      * Get the local dir for this TaskTracker.
      * This is there so that we do not break

Added: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestDiskFailures.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestDiskFailures.java?rev=1188510&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestDiskFailures.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestDiskFailures.java Tue Oct 25 04:16:01 2011
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Verify if TaskTracker's in-memory good mapred local dirs list gets updated
+ * properly when disks fail.
+ */
+public class TestDiskFailures extends ClusterMapReduceTestCase {
+
+  private static final Log LOG = LogFactory.getLog(TestDiskFailures.class);
+
+  private static String localPathRoot = System.getProperty(
+      "test.build.data", "/tmp").replace(' ', '+');
+  private String DISK_HEALTH_CHECK_INTERVAL = "1000";//1 sec
+
+  @Override
+  protected void setUp() throws Exception {
+    // Do not start cluster here
+  };
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    FileUtil.fullyDelete(new File(localPathRoot));
+  };
+
+  /**
+   * Make some of the mapred-local-dirs fail/inaccessible and verify if
+   * TaskTracker gets reinited properly.
+   * @throws Exception
+   */
+  public void testDiskFailures() throws Exception {
+
+    FileSystem fs = FileSystem.get(new Configuration());
+    Path dir = new Path(localPathRoot, "mapred_local_dirs_base");
+    FileSystem.mkdirs(fs, dir, new FsPermission((short)0777));
+
+    Properties props = new Properties();
+    props.setProperty(JobConf.MAPRED_LOCAL_DIR_PROPERTY, dir.toUri().getPath());
+    // Set disk health check interval to a small value (1 sec).
+    props.setProperty(TaskTracker.DISK_HEALTH_CHECK_INTERVAL_PROPERTY,
+        DISK_HEALTH_CHECK_INTERVAL);
+
+    // Let us have 4 mapred-local-dirs per tracker
+    final int numMapredLocalDirs = 4;
+    startCluster(true, props, numMapredLocalDirs);
+
+    MiniMRCluster cluster = getMRCluster();
+    String[] localDirs = cluster.getTaskTrackerLocalDirs(0);
+
+    // Make 1 disk fail and verify if TaskTracker gets re-inited or not and
+    // the good mapred local dirs list gets updated properly in TaskTracker.
+    prepareDirToFail(localDirs[2]);
+    String expectedMapredLocalDirs = localDirs[0] + "," + localDirs[1] + ","
+                                     + localDirs[3];
+    verifyReinitTaskTrackerAfterDiskFailure(expectedMapredLocalDirs, cluster);
+    
+    // Make 2 more disks fail and verify if TaskTracker gets re-inited or not
+    // and the good mapred local dirs list gets updated properly in TaskTracker.
+    prepareDirToFail(localDirs[0]);
+    prepareDirToFail(localDirs[3]);
+    expectedMapredLocalDirs = localDirs[1];
+    verifyReinitTaskTrackerAfterDiskFailure(expectedMapredLocalDirs, cluster);
+    
+    // Fail the remaining single disk(i.e. the remaining good mapred-local-dir).
+    prepareDirToFail(localDirs[1]);
+    waitForDiskHealthCheck();
+    assertTrue(
+        "Tasktracker is not dead even though all mapred local dirs became bad.",
+        cluster.getTaskTrackerRunner(0).isDead);
+  }
+
+  /**
+   * Wait for the TaskTracker to go for the disk-health-check and (possibly)
+   * reinit.
+   * DiskHealthCheckInterval is 1 sec. So this wait time should be greater than
+   * [1 sec + TT_reinit_execution_time]. Let us have this as 4sec.
+   */
+  private void waitForDiskHealthCheck() {
+    try {
+      Thread.sleep(4000);
+    } catch(InterruptedException e) {
+      LOG.error("Interrupted while waiting for TaskTracker reinit.");
+    }
+  }
+
+  /**
+   * Verify if TaskTracker gets reinited properly after disk failure.
+   * @param expectedMapredLocalDirs expected mapred local dirs
+   * @param cluster MiniMRCluster in which 1st TaskTracker is supposed to get
+   *                reinited because of disk failure
+   * @throws IOException
+   */
+  private void verifyReinitTaskTrackerAfterDiskFailure(
+      String expectedMapredLocalDirs, MiniMRCluster cluster)
+      throws IOException {
+    // Wait for the TaskTracker to get reinited. DiskHealthCheckInterval is
+    // 1 sec. So this wait time should be > [1 sec + TT_reinit_execution_time].
+    waitForDiskHealthCheck();
+    String[] updatedLocalDirs = cluster.getTaskTrackerRunner(0)
+        .getTaskTracker().getJobConf().getLocalDirs();
+    String seenMapredLocalDirs = StringUtils.arrayToString(updatedLocalDirs);
+    LOG.info("ExpectedMapredLocalDirs=" + expectedMapredLocalDirs);
+    assertTrue("TaskTracker could not reinit properly after disk failure.",
+        expectedMapredLocalDirs.equals(seenMapredLocalDirs));    
+  }
+
+  /**
+   * Prepare directory for a failure. Replace the given directory on the
+   * local FileSystem with a regular file with the same name.
+   * This would cause failure of creation of directory in DiskChecker.checkDir()
+   * with the same name.
+   * @throws IOException if the replacement regular file cannot be created
+   */
+  private void prepareDirToFail(String dir)
+      throws IOException {
+    File file = new File(dir);
+    FileUtil.fullyDelete(file);
+    file.createNewFile();
+  }
+}