You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/04/21 20:51:06 UTC
spark git commit: [SPARK-14699][CORE] Stop endpoints before closing
the connections and don't stop client in Outbox
Repository: spark
Updated Branches:
refs/heads/master 3a21e8d5e -> e4904d870
[SPARK-14699][CORE] Stop endpoints before closing the connections and don't stop client in Outbox
## What changes were proposed in this pull request?
In general, `onDisconnected` is for dealing with unexpected network disconnections. When RpcEnv.shutdown is called, the disconnections are expected so RpcEnv should not fire these events.
This PR moves `dispatcher.stop()` above closing the connections so that when stopping RpcEnv, the endpoints won't receive `onDisconnected` events.
In addition, Outbox should not close the client since it will be reused by others. This PR fixes it as well.
## How was this patch tested?
test("SPARK-14699: RpcEnv.shutdown should not fire onDisconnected events")
Author: Shixiong Zhu <sh...@databricks.com>
Closes #12481 from zsxwing/SPARK-14699.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4904d87
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4904d87
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4904d87
Branch: refs/heads/master
Commit: e4904d870a0e705a3a7d370320e6f8a5f23d5944
Parents: 3a21e8d
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Thu Apr 21 11:51:04 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Thu Apr 21 11:51:04 2016 -0700
----------------------------------------------------------------------
.../apache/spark/rpc/netty/NettyRpcEnv.scala | 6 ++---
.../org/apache/spark/rpc/netty/Outbox.scala | 5 +---
.../org/apache/spark/rpc/RpcEnvSuite.scala | 28 +++++++++++++++++++-
3 files changed, 31 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e4904d87/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 7f2192e..7d7b4c8 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -287,15 +287,15 @@ private[netty] class NettyRpcEnv(
if (timeoutScheduler != null) {
timeoutScheduler.shutdownNow()
}
+ if (dispatcher != null) {
+ dispatcher.stop()
+ }
if (server != null) {
server.close()
}
if (clientFactory != null) {
clientFactory.close()
}
- if (dispatcher != null) {
- dispatcher.stop()
- }
if (clientConnectionExecutor != null) {
clientConnectionExecutor.shutdownNow()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e4904d87/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
index 56499c6..6c090ad 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
@@ -241,10 +241,7 @@ private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {
}
private def closeClient(): Unit = synchronized {
- // Not sure if `client.close` is idempotent. Just for safety.
- if (client != null) {
- client.close()
- }
+ // Just set client to null. Don't close it in order to reuse the connection.
client = null
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e4904d87/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 73803ec..505cd47 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -29,7 +29,8 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import com.google.common.io.Files
-import org.mockito.Mockito.{mock, when}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{mock, never, verify, when}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually._
@@ -844,6 +845,31 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
}
+ test("SPARK-14699: RpcEnv.shutdown should not fire onDisconnected events") {
+ env.setupEndpoint("SPARK-14699", new RpcEndpoint {
+ override val rpcEnv: RpcEnv = env
+
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+ case m => context.reply(m)
+ }
+ })
+
+ val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0)
+ val endpoint = mock(classOf[RpcEndpoint])
+ anotherEnv.setupEndpoint("SPARK-14699", endpoint)
+
+ val ref = anotherEnv.setupEndpointRef(env.address, "SPARK-14699")
+ // Make sure the connect is set up
+ assert(ref.askWithRetry[String]("hello") === "hello")
+ anotherEnv.shutdown()
+ anotherEnv.awaitTermination()
+
+ env.stop(ref)
+
+ verify(endpoint).onStop()
+ verify(endpoint, never()).onDisconnected(any())
+ verify(endpoint, never()).onNetworkError(any(), any())
+ }
}
class UnserializableClass
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org