You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/09 19:55:29 UTC

[11/50] [abbrv] hadoop git commit: YARN-7622. Allow fair-scheduler configuration on HDFS (gphillips via rkanter)

YARN-7622. Allow fair-scheduler configuration on HDFS (gphillips via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7a550448
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7a550448
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7a550448

Branch: refs/heads/YARN-6592
Commit: 7a550448036c9d140d2c35c684cc8023ceb8880e
Parents: 3ba9859
Author: Robert Kanter <rk...@apache.org>
Authored: Wed Jan 3 15:31:50 2018 -0800
Committer: Robert Kanter <rk...@apache.org>
Committed: Wed Jan 3 15:31:50 2018 -0800

----------------------------------------------------------------------
 .../fair/AllocationFileLoaderService.java       | 104 +++++++++++--------
 .../fair/TestAllocationFileLoaderService.java   |  92 ++++++++++++----
 2 files changed, 133 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a550448/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
index 597af94..f73e05f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -17,25 +17,15 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -45,8 +35,8 @@ import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.security.Permission;
 import org.apache.hadoop.yarn.security.PrivilegedEntity;
 import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -57,7 +47,17 @@ import org.w3c.dom.NodeList;
 import org.w3c.dom.Text;
 import org.xml.sax.SAXException;
 
-import com.google.common.annotations.VisibleForTesting;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 @Public
 @Unstable
@@ -77,6 +77,9 @@ public class AllocationFileLoaderService extends AbstractService {
 
   public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
 
+  //Permitted allocation file filesystems (case insensitive)
+  private static final String SUPPORTED_FS_REGEX =
+      "(?i)(hdfs)|(file)|(s3a)|(viewfs)";
   private static final String ROOT = "root";
   private static final AccessControlList EVERYBODY_ACL =
       new AccessControlList("*");
@@ -85,12 +88,14 @@ public class AllocationFileLoaderService extends AbstractService {
 
   private final Clock clock;
 
-  private long lastSuccessfulReload; // Last time we successfully reloaded queues
-  private boolean lastReloadAttemptFailed = false;
-  
-  // Path to XML file containing allocations. 
-  private File allocFile;
-  
+  // Last time we successfully reloaded queues
+  private volatile long lastSuccessfulReload;
+  private volatile boolean lastReloadAttemptFailed = false;
+
+  // Path to XML file containing allocations.
+  private Path allocFile;
+  private FileSystem fs;
+
   private Listener reloadListener;
   
   @VisibleForTesting
@@ -108,19 +113,19 @@ public class AllocationFileLoaderService extends AbstractService {
   public AllocationFileLoaderService(Clock clock) {
     super(AllocationFileLoaderService.class.getName());
     this.clock = clock;
-    
   }
   
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     this.allocFile = getAllocationFile(conf);
-    if (allocFile != null) {
-      reloadThread = new Thread() {
-        @Override
-        public void run() {
-          while (running) {
+    if(this.allocFile != null) {
+      this.fs = allocFile.getFileSystem(conf);
+      reloadThread = new Thread(() -> {
+        while (running) {
+          try {
             long time = clock.getTime();
-            long lastModified = allocFile.lastModified();
+            long lastModified =
+                fs.getFileStatus(allocFile).getModificationTime();
             if (lastModified > lastSuccessfulReload &&
                 time > lastModified + ALLOC_RELOAD_WAIT_MS) {
               try {
@@ -136,19 +141,21 @@ public class AllocationFileLoaderService extends AbstractService {
               if (!lastReloadAttemptFailed) {
                 LOG.warn("Failed to reload fair scheduler config file because" +
                     " last modified returned 0. File exists: "
-                    + allocFile.exists());
+                    + fs.exists(allocFile));
               }
               lastReloadAttemptFailed = true;
             }
-            try {
-              Thread.sleep(reloadIntervalMs);
-            } catch (InterruptedException ex) {
-              LOG.info(
-                  "Interrupted while waiting to reload alloc configuration");
-            }
+          } catch (IOException e) {
+            LOG.info("Exception while loading allocation file: " + e);
+          }
+          try {
+            Thread.sleep(reloadIntervalMs);
+          } catch (InterruptedException ex) {
+            LOG.info(
+                "Interrupted while waiting to reload alloc configuration");
           }
         }
-      };
+      });
       reloadThread.setName("AllocationFileReloader");
       reloadThread.setDaemon(true);
     }
@@ -182,24 +189,31 @@ public class AllocationFileLoaderService extends AbstractService {
    * path is relative, it is searched for in the
    * classpath, but loaded like a regular File.
    */
-  public File getAllocationFile(Configuration conf) {
+  public Path getAllocationFile(Configuration conf)
+      throws UnsupportedFileSystemException {
     String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE,
         FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE);
-    File allocFile = new File(allocFilePath);
-    if (!allocFile.isAbsolute()) {
+    Path allocPath = new Path(allocFilePath);
+    String allocPathScheme = allocPath.toUri().getScheme();
+    if(allocPathScheme != null && !allocPathScheme.matches(SUPPORTED_FS_REGEX)){
+      throw new UnsupportedFileSystemException("Allocation file "
+          + allocFilePath + " uses an unsupported filesystem");
+    } else if (!allocPath.isAbsolute()) {
       URL url = Thread.currentThread().getContextClassLoader()
           .getResource(allocFilePath);
       if (url == null) {
         LOG.warn(allocFilePath + " not found on the classpath.");
-        allocFile = null;
+        allocPath = null;
       } else if (!url.getProtocol().equalsIgnoreCase("file")) {
         throw new RuntimeException("Allocation file " + url
             + " found on the classpath is not on the local filesystem.");
       } else {
-        allocFile = new File(url.getPath());
+        allocPath = new Path(url.getProtocol(), null, url.getPath());
       }
+    } else if (allocPath.isAbsoluteAndSchemeAuthorityNull()){
+      allocPath = new Path("file", null, allocFilePath);
     }
-    return allocFile;
+    return allocPath;
   }
   
   public synchronized void setReloadListener(Listener reloadListener) {
@@ -274,7 +288,7 @@ public class AllocationFileLoaderService extends AbstractService {
       DocumentBuilderFactory.newInstance();
     docBuilderFactory.setIgnoringComments(true);
     DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
-    Document doc = builder.parse(allocFile);
+    Document doc = builder.parse(fs.open(allocFile));
     Element root = doc.getDocumentElement();
     if (!"allocations".equals(root.getTagName()))
       throw new AllocationConfigurationException("Bad fair scheduler config " +
@@ -437,7 +451,7 @@ public class AllocationFileLoaderService extends AbstractService {
           fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
           reservationAcls, newPlacementPolicy, configuredQueues,
           globalReservationQueueConfig, reservableQueues, nonPreemptableQueues);
-    
+
     lastSuccessfulReload = clock.getTime();
     lastReloadAttemptFailed = false;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a550448/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
index 67b46f9..c46ecd9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
@@ -17,17 +17,12 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
@@ -38,6 +33,23 @@ import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class TestAllocationFileLoaderService {
   
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -45,16 +57,60 @@ public class TestAllocationFileLoaderService {
 
   final static String ALLOC_FILE = new File(TEST_DIR,
       "test-queues").getAbsolutePath();
-  
+  private static final String TEST_FAIRSCHED_XML = "test-fair-scheduler.xml";
+
   @Test
-  public void testGetAllocationFileFromClasspath() {
-    Configuration conf = new Configuration();
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
-        "test-fair-scheduler.xml");
+  public void testGetAllocationFileFromFileSystem()
+      throws IOException, URISyntaxException {
+    Configuration conf = new YarnConfiguration();
+    File baseDir =
+        new File(TEST_DIR + Path.SEPARATOR + "getAllocHDFS").getAbsoluteFile();
+    FileUtil.fullyDelete(baseDir);
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+    MiniDFSCluster hdfsCluster = builder.build();
+    String fsAllocPath = "hdfs://localhost:" + hdfsCluster.getNameNodePort()
+        + Path.SEPARATOR + TEST_FAIRSCHED_XML;
+
+    URL fschedURL = Thread.currentThread().getContextClassLoader()
+        .getResource(TEST_FAIRSCHED_XML);
+    FileSystem fs = FileSystem.get(conf);
+    fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath));
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath);
+
     AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
-    File allocationFile = allocLoader.getAllocationFile(conf);
-    assertEquals("test-fair-scheduler.xml", allocationFile.getName());
-    assertTrue(allocationFile.exists());
+    Path allocationFile = allocLoader.getAllocationFile(conf);
+    assertEquals(fsAllocPath, allocationFile.toString());
+    assertTrue(fs.exists(allocationFile));
+
+    hdfsCluster.shutdown(true);
+  }
+
+  @Test (expected = UnsupportedFileSystemException.class)
+  public void testDenyGetAllocationFileFromUnsupportedFileSystem()
+      throws UnsupportedFileSystemException {
+    Configuration conf = new YarnConfiguration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile");
+    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
+
+    allocLoader.getAllocationFile(conf);
+  }
+
+  @Test
+  public void testGetAllocationFileFromClasspath() {
+    try {
+      Configuration conf = new Configuration();
+      FileSystem fs = FileSystem.get(conf);
+      conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+          TEST_FAIRSCHED_XML);
+      AllocationFileLoaderService allocLoader =
+          new AllocationFileLoaderService();
+      Path allocationFile = allocLoader.getAllocationFile(conf);
+      assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName());
+      assertTrue(fs.exists(allocationFile));
+    } catch (IOException e) {
+      fail("Unable to access allocation file from classpath: " + e);
+    }
   }
   
   @Test (timeout = 10000)


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org