You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2015/10/14 19:25:17 UTC

spark git commit: [SPARK-11040] [NETWORK] Make sure SASL handler delegates all events.

Repository: spark
Updated Branches:
  refs/heads/master 135a2ce5b -> 31f315981


[SPARK-11040] [NETWORK] Make sure SASL handler delegates all events.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #9053 from vanzin/SPARK-11040.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31f31598
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31f31598
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31f31598

Branch: refs/heads/master
Commit: 31f315981709251d5d26c508a3dc62cf0e6f87e1
Parents: 135a2ce
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Oct 14 10:25:09 2015 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Oct 14 10:25:09 2015 -0700

----------------------------------------------------------------------
 .../spark/network/sasl/SaslRpcHandler.java       | 13 +++++++++++--
 .../network/server/TransportRequestHandler.java  |  8 +++++++-
 .../spark/network/sasl/SparkSaslSuite.java       | 19 +++++++++++++++++++
 3 files changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/31f31598/network/common/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java b/network/common/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java
index 3f2ebe3..7033adb 100644
--- a/network/common/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java
@@ -115,9 +115,18 @@ class SaslRpcHandler extends RpcHandler {
 
   @Override
   public void connectionTerminated(TransportClient client) {
-    if (saslServer != null) {
-      saslServer.dispose();
+    try {
+      delegate.connectionTerminated(client);
+    } finally {
+      if (saslServer != null) {
+        saslServer.dispose();
+      }
     }
   }
 
+  @Override
+  public void exceptionCaught(Throwable cause, TransportClient client) {
+    delegate.exceptionCaught(cause, client);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/31f31598/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 96941d2..9b8b047 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -76,7 +76,13 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
 
   @Override
   public void channelUnregistered() {
-    streamManager.connectionTerminated(channel);
+    if (streamManager != null) {
+      try {
+        streamManager.connectionTerminated(channel);
+      } catch (RuntimeException e) {
+        logger.error("StreamManager connectionTerminated() callback failed.", e);
+      }
+    }
     rpcHandler.connectionTerminated(reverseClient);
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/31f31598/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
index 8104004..3469e84 100644
--- a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
@@ -153,6 +153,8 @@ public class SparkSaslSuite {
       assertEquals("Pong", new String(response, StandardCharsets.UTF_8));
     } finally {
       ctx.close();
+      // There should be 2 terminated events; one for the client, one for the server.
+      verify(rpcHandler, times(2)).connectionTerminated(any(TransportClient.class));
     }
   }
 
@@ -334,6 +336,23 @@ public class SparkSaslSuite {
     }
   }
 
+  @Test
+  public void testRpcHandlerDelegate() throws Exception {
+    // Tests all delegates except for receive(), which is more complicated and already handled
+    // by all other tests.
+    RpcHandler handler = mock(RpcHandler.class);
+    RpcHandler saslHandler = new SaslRpcHandler(null, null, handler, null);
+
+    saslHandler.getStreamManager();
+    verify(handler).getStreamManager();
+
+    saslHandler.connectionTerminated(null);
+    verify(handler).connectionTerminated(any(TransportClient.class));
+
+    saslHandler.exceptionCaught(null, null);
+    verify(handler).exceptionCaught(any(Throwable.class), any(TransportClient.class));
+  }
+
   private static class SaslTestCtx {
 
     final TransportClient client;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org