You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2020/01/16 21:38:07 UTC

[spark] branch master updated: [SPARK-29950][K8S] Blacklist deleted executors in K8S with dynamic allocation

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dca8380  [SPARK-29950][K8S] Blacklist deleted executors in K8S with dynamic allocation
dca8380 is described below

commit dca838058ffd0e2c01591fd9ab0f192de446d606
Author: Marcelo Vanzin <va...@cloudera.com>
AuthorDate: Thu Jan 16 13:37:11 2020 -0800

    [SPARK-29950][K8S] Blacklist deleted executors in K8S with dynamic allocation
    
    The issue here is that when Spark is downscaling the application and deletes
    a few pod requests that aren't needed anymore, it may actually race with the
    K8S scheduler, who may be bringing up those executors. So they may have enough
    time to connect back to the driver, register, to just be deleted soon after.
    This wastes resources and causes misleading entries in the driver log.
    
    The change (ab)uses the blacklisting mechanism to consider the deleted excess
    pods as blacklisted, so that if they try to connect back, the driver will deny
    it.
    
    It also changes the executor registration slightly, since even with the above
    change there were misleading logs. That was because the executor registration
    message was an RPC that always succeeded (bar network issues), so the executor
    would always try to send an unregistration message to the driver, which would
    then log several messages about not knowing anything about the executor. The
    change makes the registration RPC succeed or fail directly, instead of using
    the separate failure message that would lead to this issue.
    
    Note the last change required some changes in a standalone test suite related
    to dynamic allocation, since it relied on the driver not throwing exceptions
    when a duplicate executor registration happened.
    
    Tested with existing unit tests, and with live cluster with dyn alloc on.
    
    Closes #26586 from vanzin/SPARK-29950.
    
    Authored-by: Marcelo Vanzin <va...@cloudera.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../executor/CoarseGrainedExecutorBackend.scala    | 14 +++--
 .../cluster/CoarseGrainedClusterMessage.scala      |  7 ---
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 19 +++++--
 .../deploy/StandaloneDynamicAllocationSuite.scala  | 65 ++++++++++++++--------
 .../CoarseGrainedSchedulerBackendSuite.scala       |  1 +
 .../cluster/k8s/ExecutorPodsAllocator.scala        | 18 ++++++
 .../k8s/KubernetesClusterSchedulerBackend.scala    |  4 ++
 .../DeterministicExecutorPodsSnapshotsStore.scala  |  9 +++
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 11 ++++
 9 files changed, 105 insertions(+), 43 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index b1837c9..1fe901a 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -54,6 +54,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     resourcesFileOpt: Option[String])
   extends IsolatedRpcEndpoint with ExecutorBackend with Logging {
 
+  import CoarseGrainedExecutorBackend._
+
   private implicit val formats = DefaultFormats
 
   private[this] val stopping = new AtomicBoolean(false)
@@ -80,9 +82,8 @@ private[spark] class CoarseGrainedExecutorBackend(
       ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
         extractAttributes, resources))
     }(ThreadUtils.sameThread).onComplete {
-      // This is a very fast action so we can use "ThreadUtils.sameThread"
-      case Success(msg) =>
-        // Always receive `true`. Just ignore it
+      case Success(_) =>
+        self.send(RegisteredExecutor)
       case Failure(e) =>
         exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
     }(ThreadUtils.sameThread)
@@ -133,9 +134,6 @@ private[spark] class CoarseGrainedExecutorBackend(
           exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
       }
 
-    case RegisterExecutorFailed(message) =>
-      exitExecutor(1, "Slave registration failed: " + message)
-
     case LaunchTask(data) =>
       if (executor == null) {
         exitExecutor(1, "Received LaunchTask command but executor was null")
@@ -226,6 +224,10 @@ private[spark] class CoarseGrainedExecutorBackend(
 
 private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
+  // Message used internally to start the executor when the driver successfully accepted the
+  // registration request.
+  case object RegisteredExecutor
+
   case class Arguments(
       driverUrl: String,
       executorId: String,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 9ce2368..57317e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -48,13 +48,6 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillExecutorsOnHost(host: String)
     extends CoarseGrainedClusterMessage
 
-  sealed trait RegisterExecutorResponse
-
-  case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse
-
-  case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
-    with RegisterExecutorResponse
-
   case class UpdateDelegationTokens(tokens: Array[Byte])
     extends CoarseGrainedClusterMessage
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7c7d8c2..031b9af 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -207,15 +207,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
           attributes, resources) =>
         if (executorDataMap.contains(executorId)) {
-          executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
-          context.reply(true)
-        } else if (scheduler.nodeBlacklist.contains(hostname)) {
+          context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
+        } else if (scheduler.nodeBlacklist.contains(hostname) ||
+            isBlacklisted(executorId, hostname)) {
           // If the cluster manager gives us an executor on a blacklisted node (because it
           // already started allocating those resources before we informed it of our blacklist,
           // or if it ignored our blacklist), then we reject that executor immediately.
           logInfo(s"Rejecting $executorId as it has been blacklisted.")
-          executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
-          context.reply(true)
+          context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))
         } else {
           // If the executor's rpc env is not listening for incoming connections, `hostPort`
           // will be null, and the client connection should be used to contact the executor.
@@ -250,7 +249,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
               logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
             }
           }
-          executorRef.send(RegisteredExecutor)
           // Note: some tests expect the reply to come after we put the executor in the map
           context.reply(true)
           listenerBus.post(
@@ -776,6 +774,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def currentDelegationTokens: Array[Byte] = delegationTokens.get()
 
+  /**
+   * Checks whether the executor is blacklisted. This is called when the executor tries to
+   * register with the scheduler, and will deny registration if this method returns true.
+   *
+   * This is in addition to the blacklist kept by the task scheduler, so custom implementations
+   * don't need to check there.
+   */
+  protected def isBlacklisted(executorId: String, hostname: String): Boolean = false
+
   // SPARK-27112: We need to ensure that there is ordering of lock acquisition
   // between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix
   // the deadlock issue exposed in SPARK-27112
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index dd790b8..e316da7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
-import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
@@ -34,7 +34,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster._
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchedExecutor, RegisterExecutor, RegisterExecutorFailed}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchedExecutor, RegisterExecutor}
 
 /**
  * End-to-end tests for dynamic allocation in standalone mode.
@@ -482,12 +482,16 @@ class StandaloneDynamicAllocationSuite
       assert(apps.head.getExecutorLimit === Int.MaxValue)
     }
     val beforeList = getApplications().head.executors.keys.toSet
-    assert(killExecutorsOnHost(sc, "localhost").equals(true))
-
     syncExecutors(sc)
-    val afterList = getApplications().head.executors.keys.toSet
+
+    sc.schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        b.killExecutorsOnHost("localhost")
+      case _ => fail("expected coarse grained scheduler")
+    }
 
     eventually(timeout(10.seconds), interval(100.millis)) {
+      val afterList = getApplications().head.executors.keys.toSet
       assert(beforeList.intersect(afterList).size == 0)
     }
   }
@@ -514,10 +518,11 @@ class StandaloneDynamicAllocationSuite
       val scheduler = new CoarseGrainedSchedulerBackend(taskScheduler, rpcEnv)
       try {
         scheduler.start()
-        scheduler.driverEndpoint.ask[Boolean](message)
-        eventually(timeout(10.seconds), interval(100.millis)) {
-          verify(endpointRef).send(RegisterExecutorFailed(any()))
+        val e = intercept[SparkException] {
+          scheduler.driverEndpoint.askSync[Boolean](message)
         }
+        assert(e.getCause().isInstanceOf[IllegalStateException])
+        assert(scheduler.getExecutorIds().isEmpty)
       } finally {
         scheduler.stop()
       }
@@ -536,6 +541,11 @@ class StandaloneDynamicAllocationSuite
       .setMaster(masterRpcEnv.address.toSparkURL)
       .setAppName("test")
       .set(config.EXECUTOR_MEMORY.key, "256m")
+      // Because we're faking executor launches in the Worker, set the config so that the driver
+      // will not timeout anything related to executors.
+      .set(config.Network.NETWORK_TIMEOUT.key, "2h")
+      .set(config.EXECUTOR_HEARTBEAT_INTERVAL.key, "1h")
+      .set(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key, "1h")
   }
 
   /** Make a master to which our application will send executor requests. */
@@ -549,8 +559,7 @@ class StandaloneDynamicAllocationSuite
   private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = {
     (0 until numWorkers).map { i =>
       val rpcEnv = workerRpcEnvs(i)
-      val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address),
-        Worker.ENDPOINT_NAME, null, conf, securityManager)
+      val worker = new TestWorker(rpcEnv, cores, memory)
       rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
       worker
     }
@@ -588,16 +597,6 @@ class StandaloneDynamicAllocationSuite
     }
   }
 
-  /** Kill the executors on a given host. */
-  private def killExecutorsOnHost(sc: SparkContext, host: String): Boolean = {
-    syncExecutors(sc)
-    sc.schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend =>
-        b.killExecutorsOnHost(host)
-      case _ => fail("expected coarse grained scheduler")
-    }
-  }
-
   /**
    * Return a list of executor IDs belonging to this application.
    *
@@ -620,9 +619,8 @@ class StandaloneDynamicAllocationSuite
    * we submit a request to kill them. This must be called before each kill request.
    */
   private def syncExecutors(sc: SparkContext): Unit = {
-    val driverExecutors = sc.env.blockManager.master.getStorageStatus
-      .map(_.blockManagerId.executorId)
-      .filter { _ != SparkContext.DRIVER_IDENTIFIER}
+    val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
+    val driverExecutors = backend.getExecutorIds()
     val masterExecutors = getExecutorIds(sc)
     val missingExecutors = masterExecutors.toSet.diff(driverExecutors.toSet).toSeq.sorted
     missingExecutors.foreach { id =>
@@ -632,10 +630,29 @@ class StandaloneDynamicAllocationSuite
       when(endpointRef.address).thenReturn(mockAddress)
       val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty,
         Map.empty)
-      val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
       backend.driverEndpoint.askSync[Boolean](message)
       backend.driverEndpoint.send(LaunchedExecutor(id))
     }
   }
 
+  /**
+   * Worker implementation that does not actually launch any executors, but reports them as
+   * running so the Master keeps track of them. This requires that `syncExecutors` be used
+   * to make sure the Master instance and the SparkContext under test agree about what
+   * executors are running.
+   */
+  private class TestWorker(rpcEnv: RpcEnv, cores: Int, memory: Int)
+    extends Worker(
+      rpcEnv, 0, cores, memory, Array(masterRpcEnv.address), Worker.ENDPOINT_NAME,
+      null, conf, securityManager) {
+
+    override def receive: PartialFunction[Any, Unit] = testReceive.orElse(super.receive)
+
+    private def testReceive: PartialFunction[Any, Unit] = synchronized {
+      case LaunchExecutor(_, appId, execId, _, _, _, _) =>
+        self.send(ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None))
+    }
+
+  }
+
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index f916f63..29160a3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -189,6 +189,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     val conf = new SparkConf()
       .set(EXECUTOR_CORES, 1)
       .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
+      .set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor registrations
       .setMaster(
       "coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
       .setAppName("test")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 2201bf9..b394f35 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -72,6 +72,11 @@ private[spark] class ExecutorPodsAllocator(
 
   private var lastSnapshot = ExecutorPodsSnapshot(Nil)
 
+  // Executors that have been deleted by this allocator but not yet detected as deleted in
+  // a snapshot from the API server. This is used to deny registration from these executors
+  // if they happen to come up before the deletion takes effect.
+  @volatile private var deletedExecutorIds = Set.empty[Long]
+
   def start(applicationId: String): Unit = {
     snapshotsStore.addSubscriber(podAllocationDelay) {
       onNewSnapshots(applicationId, _)
@@ -85,6 +90,8 @@ private[spark] class ExecutorPodsAllocator(
     }
   }
 
+  def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)
+
   private def onNewSnapshots(
       applicationId: String,
       snapshots: Seq[ExecutorPodsSnapshot]): Unit = synchronized {
@@ -141,10 +148,17 @@ private[spark] class ExecutorPodsAllocator(
       }
       .map { case (id, _) => id }
 
+    // Make a local, non-volatile copy of the reference since it's used multiple times. This
+    // is the only method that modifies the list, so this is safe.
+    var _deletedExecutorIds = deletedExecutorIds
+
     if (snapshots.nonEmpty) {
       logDebug(s"Pod allocation status: $currentRunningCount running, " +
         s"${currentPendingExecutors.size} pending, " +
         s"${newlyCreatedExecutors.size} unacknowledged.")
+
+      val existingExecs = lastSnapshot.executorPods.keySet
+      _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains)
     }
 
     val currentTotalExpectedExecutors = totalExpectedExecutors.get
@@ -169,6 +183,8 @@ private[spark] class ExecutorPodsAllocator(
 
       if (toDelete.nonEmpty) {
         logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
+        _deletedExecutorIds = _deletedExecutorIds ++ toDelete
+
         Utils.tryLogNonFatalError {
           kubernetesClient
             .pods()
@@ -209,6 +225,8 @@ private[spark] class ExecutorPodsAllocator(
       }
     }
 
+    deletedExecutorIds = _deletedExecutorIds
+
     // Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this
     // update method when not needed.
     hasPendingPods.set(knownPendingCount + newlyCreatedExecutors.size > 0)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index e221a92..105841a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -181,6 +181,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
     Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint))
   }
 
+  override protected def isBlacklisted(executorId: String, hostname: String): Boolean = {
+    podAllocator.isDeleted(executorId)
+  }
+
   private class KubernetesDriverEndpoint extends DriverEndpoint {
 
     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
index 1b6dfe5..9ac7e02 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
@@ -48,4 +48,13 @@ class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore
     currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
     snapshotsBuffer += currentSnapshot
   }
+
+  def removeDeletedExecutors(): Unit = {
+    val nonDeleted = currentSnapshot.executorPods.filter {
+      case (_, PodDeleted(_)) => false
+      case _ => true
+    }
+    currentSnapshot = ExecutorPodsSnapshot(nonDeleted)
+    snapshotsBuffer += currentSnapshot
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 4475d5d..a0abded 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -189,6 +189,17 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     verify(podOperations, times(4)).create(any())
     verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4")
     verify(podOperations).delete()
+    assert(podsAllocatorUnderTest.isDeleted("3"))
+    assert(podsAllocatorUnderTest.isDeleted("4"))
+
+    // Update the snapshot to not contain the deleted executors, make sure the
+    // allocator cleans up internal state.
+    snapshotsStore.updatePod(deletedExecutor(3))
+    snapshotsStore.updatePod(deletedExecutor(4))
+    snapshotsStore.removeDeletedExecutors()
+    snapshotsStore.notifySubscribers()
+    assert(!podsAllocatorUnderTest.isDeleted("3"))
+    assert(!podsAllocatorUnderTest.isDeleted("4"))
   }
 
   private def executorPodAnswer(): Answer[SparkPod] =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org