You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/10/14 01:32:30 UTC

git commit: YARN-2566. DefaultContainerExecutor should pick a working directory randomly. (Zhihai Xu via kasha)

Repository: hadoop
Updated Branches:
  refs/heads/trunk da709a2ea -> cc93e7e68


YARN-2566. DefaultContainerExecutor should pick a working directory randomly. (Zhihai Xu via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cc93e7e6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cc93e7e6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cc93e7e6

Branch: refs/heads/trunk
Commit: cc93e7e683fa74eb1a7aa2b357a36667bd21086a
Parents: da709a2
Author: Karthik Kambatla <ka...@apache.org>
Authored: Mon Oct 13 16:32:01 2014 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Mon Oct 13 16:32:01 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/FileContext.java  |   2 +-
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |   4 +-
 .../src/main/resources/yarn-default.xml         |   2 +-
 .../nodemanager/DefaultContainerExecutor.java   |  59 ++++++++-
 .../TestDefaultContainerExecutor.java           | 119 +++++++++++++++++--
 6 files changed, 173 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc93e7e6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
index d878d17..2323650 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
@@ -161,7 +161,7 @@ import org.apache.hadoop.util.ShutdownHookManager;
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */
-public final class FileContext {
+public class FileContext {
   
   public static final Log LOG = LogFactory.getLog(FileContext.class);
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc93e7e6/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e74553c..5cadc49 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -643,6 +643,9 @@ Release 2.6.0 - UNRELEASED
     queue to which the apps were submitted is changed across RM restart.
     (Craig Welch & Chang Li via jianhe)
 
+    YARN-2566. DefaultContainerExecutor should pick a working directory randomly. 
+    (Zhihai Xu via kasha)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc93e7e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 450fd5b..1e7fce6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -840,10 +840,10 @@ public class YarnConfiguration extends Configuration {
   public static final String NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE =
       NM_DISK_HEALTH_CHECK_PREFIX + "max-disk-utilization-per-disk-percentage";
   /**
-   * By default, 100% of the disk can be used before it is marked as offline.
+   * By default, 90% of the disk can be used before it is marked as offline.
    */
   public static final float DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE =
-      100.0F;
+      90.0F;
 
   /**
    * The minimum space that must be available on a local dir for it to be used.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc93e7e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 0c07337..bba4263 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -948,7 +948,7 @@
     for full disk. This applies to yarn-nodemanager.local-dirs and 
     yarn.nodemanager.log-dirs.</description>
     <name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
-    <value>100.0</value>
+    <value>90.0</value>
   </property>
 
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc93e7e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
index 23d2e72..5d1e3df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -103,8 +104,8 @@ public class DefaultContainerExecutor extends ContainerExecutor {
     createAppDirs(localDirs, user, appId);
     createAppLogDirs(appId, logDirs, user);
 
-    // TODO: Why pick first app dir. The same in LCE why not random?
-    Path appStorageDir = getFirstApplicationDir(localDirs, user, appId);
+    // randomly choose the local directory
+    Path appStorageDir = getWorkingDir(localDirs, user, appId);
 
     String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
     Path tokenDst = new Path(appStorageDir, tokenFn);
@@ -466,6 +467,10 @@ public class DefaultContainerExecutor extends ContainerExecutor {
     return getApplicationDir(new Path(localDirs.get(0)), user, appId);
   }
 
+  private long getDiskFreeSpace(Path base) throws IOException {
+    return lfs.getFsStatus(base).getRemaining(); // remaining space per lfs
+  }
+
   private Path getApplicationDir(Path base, String user, String appId) {
     return new Path(getAppcacheDir(base, user), appId);
   }
@@ -484,6 +489,56 @@ public class DefaultContainerExecutor extends ContainerExecutor {
         ContainerLocalizer.FILECACHE);
   }
 
+  /**
+   * Randomly picks the application directory to work in; the chance of
+   * picking a local dir is proportional to its free space. A dir whose
+   * free space cannot be read is treated as having none.
+   *
+   * @param localDirs candidate local directories
+   * @param user user the application directory belongs to
+   * @param appId application whose directory is resolved under each dir
+   * @return the application directory under the chosen local dir
+   * @throws IOException if no local dir reports any free space
+   */
+  private Path getWorkingDir(List<String> localDirs, String user,
+      String appId) throws IOException {
+    long totalAvailable = 0L;
+    long[] availableOnDisk = new long[localDirs.size()];
+    int i = 0;
+    // first pass: record the free space of each directory and the total
+    for (String localDir : localDirs) {
+      Path curBase = getApplicationDir(new Path(localDir),
+          user, appId);
+      long space = 0L;
+      try {
+        space = getDiskFreeSpace(curBase);
+      } catch (IOException e) {
+        LOG.warn("Unable to get Free Space for " + curBase.toString(), e);
+      }
+      availableOnDisk[i++] = space;
+      totalAvailable += space;
+    }
+
+    if (totalAvailable <= 0L) {
+      throw new IOException("Not able to find a working directory for "
+          + user);
+    }
+
+    // Uniform position in [0, totalAvailable). Math.abs(nextLong()) is
+    // negative for Long.MIN_VALUE, so clear the sign bit instead.
+    Random r = new Random();
+    long randomPosition = (r.nextLong() & Long.MAX_VALUE) % totalAvailable;
+    int dir = 0;
+    // Walk the cumulative ranges: dir k owns availableOnDisk[k] positions.
+    // ">=" keeps the ranges half-open, so a position on a boundary moves to
+    // the next dir and zero-space dirs are never picked; because
+    // randomPosition < totalAvailable the loop stops within bounds.
+    while (randomPosition >= availableOnDisk[dir]) {
+      randomPosition -= availableOnDisk[dir++];
+    }
+    return getApplicationDir(new Path(localDirs.get(dir)), user, appId);
+  }
+
   protected void createDir(Path dirPath, FsPermission perms,
       boolean createParent, String user) throws IOException {
     lfs.mkdir(dirPath, perms, createParent);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc93e7e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java
index f6f0e9f..7db74f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.junit.Assert.assertTrue;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
@@ -41,14 +42,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
@@ -57,20 +50,30 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.Options.CreateOpts;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream;
 
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.After;
-import org.junit.Assert;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -296,6 +299,102 @@ public class TestDefaultContainerExecutor {
     }
   }
 
+  @Test(timeout = 30000)
+  public void testStartLocalizer() // token copy must avoid the full dir
+      throws IOException, InterruptedException {
+    InetSocketAddress localizationServerAddress;
+    final Path firstDir = new Path(BASE_TMP_PATH, "localDir1");
+    List<String> localDirs = new ArrayList<String>();
+    final Path secondDir = new Path(BASE_TMP_PATH, "localDir2");
+    List<String> logDirs = new ArrayList<String>();
+    final Path logDir = new Path(BASE_TMP_PATH, "logDir");
+    final Path tokenDir = new Path(BASE_TMP_PATH, "tokenDir");
+    FsPermission perms = new FsPermission((short)0770);
+
+    Configuration conf = new Configuration();
+    localizationServerAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
+        YarnConfiguration.NM_LOCALIZER_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
+
+    final FileContext mockLfs = spy(FileContext.getLocalFSFileContext(conf));
+    final FileContext.Util mockUtil = spy(mockLfs.util());
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocationOnMock)
+          throws Throwable {
+        return mockUtil;
+      }
+    }).when(mockLfs).util(); // hand out the spied Util so copy is stubbed
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocationOnMock)
+          throws Throwable {
+        Path dest = (Path) invocationOnMock.getArguments()[1];
+        if (dest.toString().contains(firstDir.toString())) {
+          // fail any token copy into the first local dir, simulating
+          // a drive with no space left
+          throw new IOException("No space on this drive " +
+              dest.toString());
+        } else {
+          // otherwise write an empty Credentials token file at dest
+          DataOutputStream tokenOut = null;
+          try {
+            Credentials credentials = new Credentials();
+            tokenOut = mockLfs.create(dest,
+                EnumSet.of(CREATE, OVERWRITE));
+            credentials.writeTokenStorageToStream(tokenOut);
+          } finally {
+            if (tokenOut != null) {
+              tokenOut.close();
+            }
+          }
+        }
+        return null;
+      }
+    }).when(mockUtil).copy(any(Path.class), any(Path.class));
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocationOnMock)
+          throws Throwable {
+        Path p = (Path) invocationOnMock.getArguments()[0];
+        // paths under the first local dir report less free space than
+        // every other path (presumably the last FsStatus arg - confirm)
+        if (p.toString().contains(firstDir.toString())) {
+          return new FsStatus(2000, 2000, 0);
+        } else {
+          return new FsStatus(1000, 0, 1000);
+        }
+      }
+    }).when(mockLfs).getFsStatus(any(Path.class));
+
+    DefaultContainerExecutor mockExec = spy(new DefaultContainerExecutor(
+        mockLfs));
+    mockExec.setConf(conf);
+    localDirs.add(mockLfs.makeQualified(firstDir).toString());
+    localDirs.add(mockLfs.makeQualified(secondDir).toString());
+    logDirs.add(mockLfs.makeQualified(logDir).toString());
+    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS,
+        localDirs.toArray(new String[localDirs.size()]));
+    conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.toString());
+    mockLfs.mkdir(tokenDir, perms, true);
+    Path nmPrivateCTokensPath = new Path(tokenDir, "test.tokens");
+    String appSubmitter = "nobody";
+    String appId = "APP_ID";
+    String locId = "LOC_ID";
+    try {
+      mockExec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
+          appSubmitter, appId, locId, localDirs, logDirs);
+    } catch (IOException e) {
+      // an IOException here is reported as a failed token copy
+      Assert.fail("StartLocalizer failed to copy token file " + e);
+    } finally { // always remove the dirs and tmp files used by this test
+      mockExec.deleteAsUser(appSubmitter, firstDir);
+      mockExec.deleteAsUser(appSubmitter, secondDir);
+      mockExec.deleteAsUser(appSubmitter, logDir);
+      deleteTmpFiles();
+    }
+  }
 //  @Test
 //  public void testInit() throws IOException, InterruptedException {
 //    Configuration conf = new Configuration();