Posted to commits@spark.apache.org by zs...@apache.org on 2016/01/27 06:14:41 UTC

spark git commit: [SPARK-12967][NETTY] Avoid NettyRpc error message during sparkContext shutdown

Repository: spark
Updated Branches:
  refs/heads/master 58f5d8c1d -> bae3c9a4e


[SPARK-12967][NETTY] Avoid NettyRpc error message during sparkContext shutdown

If an RPC error occurs while the SparkContext is still alive but already stopped (which happens only while SparkContext.stop is executing), log a concise warning instead of an error with a full stack trace. This is a common occurrence during shutdown.

vanzin

Author: Nishkam Ravi <ni...@gmail.com>
Author: nishkamravi2 <ni...@gmail.com>

Closes #10881 from nishkamravi2/master_netty.
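
For readers who want the shape of the change without reading the full diff: the commit adds a dedicated RpcEnvStoppedException and has the send paths catch it and log a short warning rather than an error with a stack trace. Below is a minimal, self-contained Scala sketch of that pattern; the logWarning, postOneWayMessage, and send definitions here are simplified stand-ins for illustration only, not the actual Spark APIs touched by the diff.

object RpcShutdownSketch {

  // Dedicated exception type for "the RpcEnv is already stopped", mirroring
  // the RpcEnvStoppedException added by this commit.
  class RpcEnvStoppedException()
    extends IllegalStateException("RpcEnv already stopped.")

  // Illustrative stand-in for Spark's Logging trait.
  private def logWarning(msg: String): Unit = println(s"WARN: $msg")

  // Illustrative stand-in for dispatcher.postOneWayMessage: once the env is
  // stopped, delivery fails with RpcEnvStoppedException.
  private var stopped = false
  private def postOneWayMessage(msg: String): Unit = {
    if (stopped) throw new RpcEnvStoppedException()
    println(s"delivered: $msg")
  }

  // Send path: during shutdown the exception is expected, so it is downgraded
  // to a short warning; any other throwable would still propagate as a failure.
  def send(msg: String): Unit = {
    try {
      postOneWayMessage(msg)
    } catch {
      case e: RpcEnvStoppedException => logWarning(e.getMessage)
    }
  }

  def main(args: Array[String]): Unit = {
    send("ping")     // prints "delivered: ping"
    stopped = true
    send("ping")     // prints "WARN: RpcEnv already stopped."
  }
}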


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bae3c9a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bae3c9a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bae3c9a4

Branch: refs/heads/master
Commit: bae3c9a4eb0c320999e5dbafd62692c12823e07d
Parents: 58f5d8c
Author: Nishkam Ravi <ni...@gmail.com>
Authored: Tue Jan 26 21:14:39 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Jan 26 21:14:39 2016 -0800

----------------------------------------------------------------------
 .../spark/rpc/RpcEnvStoppedException.scala      | 20 ++++++++++++++++++++
 .../org/apache/spark/rpc/netty/Dispatcher.scala |  4 ++--
 .../apache/spark/rpc/netty/NettyRpcEnv.scala    |  6 +++++-
 .../org/apache/spark/rpc/netty/Outbox.scala     |  7 +++++--
 4 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bae3c9a4/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala
new file mode 100644
index 0000000..c296cc2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnvStoppedException.scala
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.rpc
+
+private[rpc] class RpcEnvStoppedException()
+  extends IllegalStateException("RpcEnv already stopped.")

http://git-wip-us.apache.org/repos/asf/spark/blob/bae3c9a4/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index 19259e0..6ceff2c 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -106,7 +106,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
     val iter = endpoints.keySet().iterator()
     while (iter.hasNext) {
       val name = iter.next
-      postMessage(name, message, (e) => logWarning(s"Message $message dropped.", e))
+      postMessage(name, message, (e) => logWarning(s"Message $message dropped. ${e.getMessage}"))
     }
   }
 
@@ -156,7 +156,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
     if (shouldCallOnStop) {
       // We don't need to call `onStop` in the `synchronized` block
       val error = if (stopped) {
-          new IllegalStateException("RpcEnv already stopped.")
+          new RpcEnvStoppedException()
         } else {
           new SparkException(s"Could not find $endpointName or it has been stopped.")
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/bae3c9a4/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 9ae74d9..89eda85 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -182,7 +182,11 @@ private[netty] class NettyRpcEnv(
     val remoteAddr = message.receiver.address
     if (remoteAddr == address) {
       // Message to a local RPC endpoint.
-      dispatcher.postOneWayMessage(message)
+      try {
+        dispatcher.postOneWayMessage(message)
+      } catch {
+        case e: RpcEnvStoppedException => logWarning(e.getMessage)
+      }
     } else {
       // Message to a remote RPC endpoint.
       postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message)))

http://git-wip-us.apache.org/repos/asf/spark/blob/bae3c9a4/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
index 2316ebe..9fd64e8 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
@@ -25,7 +25,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
-import org.apache.spark.rpc.RpcAddress
+import org.apache.spark.rpc.{RpcAddress, RpcEnvStoppedException}
 
 private[netty] sealed trait OutboxMessage {
 
@@ -43,7 +43,10 @@ private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends Outbo
   }
 
   override def onFailure(e: Throwable): Unit = {
-    logWarning(s"Failed to send one-way RPC.", e)
+    e match {
+      case e1: RpcEnvStoppedException => logWarning(e1.getMessage)
+      case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1)
+    }
   }
 
 }

