Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/08/02 12:27:52 UTC

[GitHub] [spark] Ngone51 commented on a change in pull request #25047: [WIP][SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone

Ngone51 commented on a change in pull request #25047: [WIP][SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone
URL: https://github.com/apache/spark/pull/25047#discussion_r310108076
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
 ##########
 @@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.{File, RandomAccessFile}
+import java.nio.channels.{FileLock, OverlappingFileLockException}
+import java.nio.file.Files
+
+import scala.collection.mutable
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import org.json4s.{DefaultFormats, Extraction}
+import org.json4s.jackson.JsonMethods.{compact, parse, render}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SPARK_RESOURCES_DIR
+import org.apache.spark.resource.{ResourceAllocation, ResourceID, ResourceInformation, ResourceRequirement}
+import org.apache.spark.resource.ResourceUtils.withResourcesJson
+import org.apache.spark.util.Utils
+
+private[spark] object StandaloneResourceUtils extends Logging {
+  // These directory/file names are used to coordinate resources between
+  // the drivers/workers on the same host in Spark Standalone.
+  val SPARK_RESOURCES_COORDINATE_DIR = "spark-resources"
+  val ALLOCATED_RESOURCES_FILE = "__allocated_resources__.json"
+  val RESOURCES_LOCK_FILE = "__allocated_resources__.lock"
+
+  /**
+   * Resource allocation used in Standalone only, which tracks assignments by the
+   * worker/driver (client deploy mode only) pid.
+   */
+  case class StandaloneResourceAllocation(pid: Int, allocations: Seq[ResourceAllocation]) {
+    // convert allocations to a resource information map
+    def toResourceInformationMap: Map[String, ResourceInformation] = {
+      allocations.map { allocation =>
+        allocation.id.resourceName -> allocation.toResourceInformation
+      }.toMap
+    }
+  }
+
+  /**
+   * Assign resources to workers/drivers on the same host to avoid address conflicts.
+   *
+   * This function works in three steps. First, it acquires the lock on RESOURCES_LOCK_FILE
+   * to synchronize with other workers and drivers on the host. Second, it reads all allocated
+   * resources from ALLOCATED_RESOURCES_FILE and assigns isolated resources to the worker or
+   * driver by subtracting the already-allocated resources from the discovered resources. If the
+   * available resources don't meet the worker's or driver's requirement, it updates the
+   * allocated resources by excluding allocations whose related processes have already
+   * terminated, and does the assignment again. If the requirement still can't be met, an
+   * exception is thrown. Third, it updates ALLOCATED_RESOURCES_FILE with the newly allocated
+   * resources along with the pid of the worker or driver. Finally, it returns the allocated
+   * resource information after releasing the lock.
+   *
+   * @param conf SparkConf
+   * @param componentName spark.driver / spark.worker
+   * @param resources the resources found by worker/driver on the host
+   * @param resourceRequirements the resource requirements asked by the worker/driver
+   * @param pid the process id of the worker/driver acquiring the resources.
+   * @return allocated resources for the worker/driver, or throws an exception if the
+   *         worker/driver's requirement can't be met
+   */
+  def acquireResources(
+      conf: SparkConf,
+      componentName: String,
+      resources: Map[String, ResourceInformation],
+      resourceRequirements: Seq[ResourceRequirement],
+      pid: Int)
+    : Map[String, ResourceInformation] = {
+    if (resourceRequirements.isEmpty) {
+      return Map.empty
+    }
+    val lock = acquireLock(conf)
+    try {
+      val resourcesFile = new File(getOrCreateResourcesDir(conf), ALLOCATED_RESOURCES_FILE)
+      // all allocated resources in ALLOCATED_RESOURCES_FILE; may be updated if any allocation's
+      // related process is detected to have terminated while checking pids below.
+      var origAllocation = Seq.empty[StandaloneResourceAllocation]
+      // Map[pid -> Map[resourceName -> Addresses[]]]
+      var allocated = {
+        if (resourcesFile.exists()) {
+          origAllocation = allocatedStandaloneResources(resourcesFile.getPath)
+          val allocations = origAllocation.map { resource =>
+            val resourceMap = {
+              resource.allocations.map { allocation =>
+                allocation.id.resourceName -> allocation.addresses.toArray
+              }.toMap
+            }
+            resource.pid -> resourceMap
+          }.toMap
+          allocations
+        } else {
+          Map.empty[Int, Map[String, Array[String]]]
+        }
+      }
+
+      // newly allocated resources for the worker or driver,
+      // a map from resource name to its allocated addresses.
+      var newAssignments: Map[String, Array[String]] = null
+      // Whether we've checked process status; we'll do the check at most once, and only
+      // if the available resources can't meet the requirements on the first pass.
+      var checked = false
+      // Whether we need to keep allocating for the worker/driver; we'll go through
+      // the loop at most twice.
+      var keepAllocating = true
+      while (keepAllocating) {
+        keepAllocating = false
+        // store the pids whose allocated resources conflict with
+        // the discovered resources passed in.
+        val pidsToCheck = mutable.Set[Int]()
+        newAssignments = resourceRequirements.map { req =>
+          val rName = req.resourceName
+          val amount = req.amount
+          // initially, we must have available.length >= amount as we've done a pre-check earlier
+          var available = resources(rName).addresses
+          // get the available resource addresses by excluding all already-allocated
+          // addresses from the discovered resources
+          allocated.foreach { a =>
+            val thePid = a._1
+            val resourceMap = a._2
+            val assigned = resourceMap.getOrElse(rName, Array.empty)
+            val retained = available.diff(assigned)
+            // if retained.length < available.length after diffing against assigned, then there
+            // must be some conflicting resource addresses between available and assigned. So, we
+            // store the pid here to check whether its process is still alive, in case we don't
+            // find enough resources after traversing all allocated resources.
+            if (retained.length < available.length && !checked) {
+              pidsToCheck += thePid
+            }
+            if (retained.length >= amount) {
+              available = retained
+            } else if (checked) {
+              keepAllocating = false
+              throw new SparkException("No more resources available since they've already" +
+                " been assigned to other workers/drivers.")
+            } else {
+              keepAllocating = true
+            }
+          }
+          val assigned = {
+            if (keepAllocating) { // can't meet the requirement
+              // exclude the allocations whose related processes have already terminated.
+              val (invalid, valid) = allocated.partition { a =>
+                pidsToCheck(a._1) && !(Utils.isTesting || Utils.isProcessRunning(a._1))}
+              allocated = valid
+              origAllocation = origAllocation.filter(
+                allocation => !invalid.contains(allocation.pid))
+              checked = true
+              // note this is a meaningless return value, just to avoid creating any new object
+              available
+            } else {
+              available.take(amount)
+            }
+          }
+          rName -> assigned
+        }.toMap
+      }
+      val newAllocation = {
+        val allocations = newAssignments.map { case (rName, addresses) =>
+          ResourceAllocation(ResourceID(componentName, rName), addresses)
+        }.toSeq
+        StandaloneResourceAllocation(pid, allocations)
+      }
+      writeResourceAllocationJson(
+        componentName, origAllocation ++ Seq(newAllocation), resourcesFile)
+      newAllocation.toResourceInformationMap
+    } finally {
+      releaseLock(lock)
+    }
+  }
+
+  /**
+   * Free the indicated resources so that they become available to other
+   * workers/drivers on the same host.
+   * @param conf SparkConf
+   * @param componentName spark.driver / spark.worker
+   * @param toRelease the resources expected to be released
+   * @param pid the process id of the worker/driver releasing the resources.
+   */
+  def releaseResources(
+      conf: SparkConf,
+      componentName: String,
+      toRelease: Map[String, ResourceInformation],
+      pid: Int)
+    : Unit = {
+    if (toRelease != null && toRelease.nonEmpty) {
+      val lock = acquireLock(conf)
+      try {
+        val resourcesFile = new File(getOrCreateResourcesDir(conf), ALLOCATED_RESOURCES_FILE)
+        if (resourcesFile.exists()) {
+          val (target, others) =
+            allocatedStandaloneResources(resourcesFile.getPath).partition(_.pid == pid)
+          if (target.nonEmpty) {
+            val rNameToAddresses = {
+              target.head.allocations.map { allocation =>
+                allocation.id.resourceName -> allocation.addresses
+              }.toMap
+            }
+            val allocations = {
+              rNameToAddresses.map { case (rName, addresses) =>
+                val retained = addresses.diff(Option(toRelease.getOrElse(rName, null))
+                  .map(_.addresses).getOrElse(Array.empty))
+                rName -> retained
+              }
+                .filter(_._2.nonEmpty)
+                .map { case (rName, addresses) =>
+                  ResourceAllocation(ResourceID(componentName, rName), addresses)
+                }.toSeq
+            }
+            if (allocations.nonEmpty) {
+              val newAllocation = StandaloneResourceAllocation(pid, allocations)
+              writeResourceAllocationJson(
+                componentName, others ++ Seq(newAllocation), resourcesFile)
+            } else {
+              if (others.isEmpty) {
+                if (!resourcesFile.delete()) {
+                  logWarning(s"Failed to delete $ALLOCATED_RESOURCES_FILE.")
 
 Review comment:
   We don't delete the resourcesFile while it still has allocations, so a failure to delete here is more likely an IO error. And it would be OK even if we failed to delete the file, because we now use the pid to detect invalid allocations.
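
The "use pid to detect invalid allocations" point above maps to the Utils.isProcessRunning call in the hunk, whose implementation isn't quoted here. As a rough sketch under the assumption of a Java 9+ runtime (the actual helper may use a different mechanism, e.g. shelling out to ps), such a liveness check could look like:

    // Hypothetical stand-in for Utils.isProcessRunning; illustration only.
    // Returns true if a process with the given pid is currently alive (Java 9+).
    def isProcessRunning(pid: Int): Boolean = {
      val handle = ProcessHandle.of(pid.toLong)
      handle.isPresent && handle.get.isAlive
    }

With such a check, the partition over `allocated` in acquireResources can drop allocations whose owning worker/driver died without cleaning up, which is why a leftover ALLOCATED_RESOURCES_FILE is harmless.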
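
For readers following the three-step flow described in the acquireResources scaladoc, the hunk leans on helpers such as acquireLock, releaseLock, getOrCreateResourcesDir, allocatedStandaloneResources and writeResourceAllocationJson that are defined elsewhere in this file and not quoted in this hunk. A minimal sketch of the locking half, using java.nio.channels.FileLock, is below; the object name, the 100 ms polling interval and the retry on OverlappingFileLockException are illustrative assumptions, not necessarily what the PR implements:

    import java.io.{File, RandomAccessFile}
    import java.nio.channels.{FileLock, OverlappingFileLockException}

    object ResourceLockSketch {
      // Hypothetical lock file name, mirroring RESOURCES_LOCK_FILE above.
      val RESOURCES_LOCK_FILE = "__allocated_resources__.lock"

      // Block until an exclusive lock on the shared lock file is obtained.
      def acquireLock(coordinateDir: File): FileLock = {
        // "rw" creates the lock file if it doesn't exist yet.
        val channel = new RandomAccessFile(
          new File(coordinateDir, RESOURCES_LOCK_FILE), "rw").getChannel
        var lock: FileLock = null
        while (lock == null) {
          try {
            // lock() blocks until other processes release the lock, but it throws
            // OverlappingFileLockException if another thread in this JVM already holds it.
            lock = channel.lock()
          } catch {
            case _: OverlappingFileLockException => Thread.sleep(100)
          }
        }
        lock
      }

      // Release the lock and close the underlying channel.
      def releaseLock(lock: FileLock): Unit = {
        val channel = lock.channel()
        lock.release()
        channel.close()
      }
    }

Holding the lock across the whole read-modify-write of ALLOCATED_RESOURCES_FILE is what keeps concurrent workers and drivers on one host from handing out the same GPU addresses twice.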
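
The allocations file itself is plain JSON handled with json4s (hence the Extraction and JsonMethods imports at the top of the file). A rough sketch of that round trip, using a hypothetical AllocationRecord payload rather than the PR's StandaloneResourceAllocation/ResourceAllocation classes, could be:

    import java.io.File
    import java.nio.file.Files

    import org.json4s._
    import org.json4s.jackson.JsonMethods.{compact, parse, render}

    // Hypothetical payload standing in for StandaloneResourceAllocation; illustration only.
    case class AllocationRecord(pid: Int, resources: Map[String, Seq[String]])

    object AllocationsJsonSketch {
      implicit val formats: DefaultFormats.type = DefaultFormats

      // Serialize the current allocations and overwrite the shared JSON file.
      def write(allocations: Seq[AllocationRecord], file: File): Unit = {
        val json = compact(render(Extraction.decompose(allocations)))
        Files.write(file.toPath, json.getBytes("UTF-8"))
      }

      // Parse the shared JSON file back into allocation records.
      def read(file: File): Seq[AllocationRecord] =
        parse(new String(Files.readAllBytes(file.toPath), "UTF-8"))
          .extract[Seq[AllocationRecord]]
    }

In the hunk above, the corresponding reads and writes happen only while the coordination lock is held, since several workers and drivers may contend for the same file.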
