You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2017/07/17 18:14:07 UTC
spark git commit: [SPARK-21321][SPARK CORE] Spark very verbose on
shutdown
Repository: spark
Updated Branches:
refs/heads/master 7047f49f4 -> 0e07a29cf
[SPARK-21321][SPARK CORE] Spark very verbose on shutdown
## What changes were proposed in this pull request?
The current code is very verbose on shutdown.
The changes I propose is to change the log level when the driver is shutting down and the RPC connections are closed (RpcEnvStoppedException).
## How was this patch tested?
Tested with word count(deploy-mode = cluster, master = yarn, num-executors = 4) with 300GB of data.
Author: John Lee <jl...@yahoo-inc.com>
Closes #18547 from yoonlee95/SPARK-21321.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e07a29c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e07a29c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e07a29c
Branch: refs/heads/master
Commit: 0e07a29cf4a5587f939585e6885ed0f7e68c31b5
Parents: 7047f49
Author: John Lee <jl...@yahoo-inc.com>
Authored: Mon Jul 17 13:13:35 2017 -0500
Committer: Tom Graves <tg...@yahoo-inc.com>
Committed: Mon Jul 17 13:13:35 2017 -0500
----------------------------------------------------------------------
.../scala/org/apache/spark/rpc/netty/Dispatcher.scala | 7 +++++--
.../main/scala/org/apache/spark/rpc/netty/Inbox.scala | 7 ++++++-
.../scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 7 +++++--
.../main/scala/org/apache/spark/rpc/netty/Outbox.scala | 2 +-
.../org/apache/spark/scheduler/LiveListenerBus.scala | 2 +-
.../spark/scheduler/cluster/YarnSchedulerBackend.scala | 11 +++++++++--
6 files changed, 27 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0e07a29c/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index a02cf30..e94babb 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -109,8 +109,11 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
val iter = endpoints.keySet().iterator()
while (iter.hasNext) {
val name = iter.next
- postMessage(name, message, (e) => logWarning(s"Message $message dropped. ${e.getMessage}"))
- }
+ postMessage(name, message, (e) => { e match {
+ case e: RpcEnvStoppedException => logDebug (s"Message $message dropped. ${e.getMessage}")
+ case e: Throwable => logWarning(s"Message $message dropped. ${e.getMessage}")
+ }}
+ )}
}
/** Posts a message sent by a remote endpoint. */
http://git-wip-us.apache.org/repos/asf/spark/blob/0e07a29c/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
index ae4a600..d32eba6 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
@@ -205,7 +205,12 @@ private[netty] class Inbox(
try action catch {
case NonFatal(e) =>
try endpoint.onError(e) catch {
- case NonFatal(ee) => logError(s"Ignoring error", ee)
+ case NonFatal(ee) =>
+ if (stopped) {
+ logDebug("Ignoring error", ee)
+ } else {
+ logError("Ignoring error", ee)
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e07a29c/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index b316e54..6489849 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -185,7 +185,7 @@ private[netty] class NettyRpcEnv(
try {
dispatcher.postOneWayMessage(message)
} catch {
- case e: RpcEnvStoppedException => logWarning(e.getMessage)
+ case e: RpcEnvStoppedException => logDebug(e.getMessage)
}
} else {
// Message to a remote RPC endpoint.
@@ -203,7 +203,10 @@ private[netty] class NettyRpcEnv(
def onFailure(e: Throwable): Unit = {
if (!promise.tryFailure(e)) {
- logWarning(s"Ignored failure: $e")
+ e match {
+ case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e")
+ case _ => logWarning(s"Ignored failure: $e")
+ }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e07a29c/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
index a7b7f58..b7e068a 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
@@ -45,7 +45,7 @@ private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends Outbo
override def onFailure(e: Throwable): Unit = {
e match {
- case e1: RpcEnvStoppedException => logWarning(e1.getMessage)
+ case e1: RpcEnvStoppedException => logDebug(e1.getMessage)
case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0e07a29c/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 0dd63d4..7d5e980 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -136,7 +136,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
def post(event: SparkListenerEvent): Unit = {
if (stopped.get) {
// Drop further events to make `listenerThread` exit ASAP
- logError(s"$name has already stopped! Dropping event $event")
+ logDebug(s"$name has already stopped! Dropping event $event")
return
}
metrics.numEventsPosted.inc()
http://git-wip-us.apache.org/repos/asf/spark/blob/0e07a29c/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index cbc6e60..8452f43 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -17,6 +17,8 @@
package org.apache.spark.scheduler.cluster
+import java.util.concurrent.atomic.{AtomicBoolean}
+
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
@@ -40,6 +42,8 @@ private[spark] abstract class YarnSchedulerBackend(
sc: SparkContext)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
+ private val stopped = new AtomicBoolean(false)
+
override val minRegisteredRatio =
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
0.8
@@ -93,6 +97,7 @@ private[spark] abstract class YarnSchedulerBackend(
requestTotalExecutors(0, 0, Map.empty)
super.stop()
} finally {
+ stopped.set(true)
services.stop()
}
}
@@ -206,8 +211,10 @@ private[spark] abstract class YarnSchedulerBackend(
*/
override def onDisconnected(rpcAddress: RpcAddress): Unit = {
addressToExecutorId.get(rpcAddress).foreach { executorId =>
- if (disableExecutor(executorId)) {
- yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+ if (!stopped.get) {
+ if (disableExecutor(executorId)) {
+ yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org