You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2015/10/14 19:25:17 UTC
spark git commit: [SPARK-11040] [NETWORK] Make sure SASL handler
delegates all events.
Repository: spark
Updated Branches:
refs/heads/master 135a2ce5b -> 31f315981
[SPARK-11040] [NETWORK] Make sure SASL handler delegates all events.
Author: Marcelo Vanzin <va...@cloudera.com>
Closes #9053 from vanzin/SPARK-11040.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31f31598
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31f31598
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31f31598
Branch: refs/heads/master
Commit: 31f315981709251d5d26c508a3dc62cf0e6f87e1
Parents: 135a2ce
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Oct 14 10:25:09 2015 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Oct 14 10:25:09 2015 -0700
----------------------------------------------------------------------
.../spark/network/sasl/SaslRpcHandler.java | 13 +++++++++++--
.../network/server/TransportRequestHandler.java | 8 +++++++-
.../spark/network/sasl/SparkSaslSuite.java | 19 +++++++++++++++++++
3 files changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/31f31598/network/common/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java b/network/common/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java
index 3f2ebe3..7033adb 100644
--- a/network/common/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java
@@ -115,9 +115,18 @@ class SaslRpcHandler extends RpcHandler {
@Override
public void connectionTerminated(TransportClient client) {
- if (saslServer != null) {
- saslServer.dispose();
+ try {
+ delegate.connectionTerminated(client);
+ } finally {
+ if (saslServer != null) {
+ saslServer.dispose();
+ }
}
}
+ @Override
+ public void exceptionCaught(Throwable cause, TransportClient client) {
+ delegate.exceptionCaught(cause, client);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/spark/blob/31f31598/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 96941d2..9b8b047 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -76,7 +76,13 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
@Override
public void channelUnregistered() {
- streamManager.connectionTerminated(channel);
+ if (streamManager != null) {
+ try {
+ streamManager.connectionTerminated(channel);
+ } catch (RuntimeException e) {
+ logger.error("StreamManager connectionTerminated() callback failed.", e);
+ }
+ }
rpcHandler.connectionTerminated(reverseClient);
}
http://git-wip-us.apache.org/repos/asf/spark/blob/31f31598/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
index 8104004..3469e84 100644
--- a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
@@ -153,6 +153,8 @@ public class SparkSaslSuite {
assertEquals("Pong", new String(response, StandardCharsets.UTF_8));
} finally {
ctx.close();
+ // There should be 2 terminated events; one for the client, one for the server.
+ verify(rpcHandler, times(2)).connectionTerminated(any(TransportClient.class));
}
}
@@ -334,6 +336,23 @@ public class SparkSaslSuite {
}
}
+ @Test
+ public void testRpcHandlerDelegate() throws Exception {
+ // Tests all delegates exception for receive(), which is more complicated and already handled
+ // by all other tests.
+ RpcHandler handler = mock(RpcHandler.class);
+ RpcHandler saslHandler = new SaslRpcHandler(null, null, handler, null);
+
+ saslHandler.getStreamManager();
+ verify(handler).getStreamManager();
+
+ saslHandler.connectionTerminated(null);
+ verify(handler).connectionTerminated(any(TransportClient.class));
+
+ saslHandler.exceptionCaught(null, null);
+ verify(handler).exceptionCaught(any(Throwable.class), any(TransportClient.class));
+ }
+
private static class SaslTestCtx {
final TransportClient client;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org