You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lr...@apache.org on 2016/01/11 22:02:10 UTC

[28/50] [abbrv] incubator-toree git commit: Register contexts so that they can be closed when sockets close

Register contexts so that they can be closed when sockets close

Fixes #194


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/1398a638
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/1398a638
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/1398a638

Branch: refs/heads/master
Commit: 1398a638199cff1fd40b168e4fce11de3e942464
Parents: ecfa5dd
Author: Jakob Odersky <jo...@gmail.com>
Authored: Tue Dec 1 14:56:12 2015 -0800
Committer: Jakob Odersky <jo...@gmail.com>
Committed: Tue Dec 1 14:56:12 2015 -0800

----------------------------------------------------------------------
 .../ibm/spark/communication/SocketManager.scala | 40 +++++++++++++-------
 .../socket/ZeroMQSocketRunnable.scala           | 10 +++--
 2 files changed, 33 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/1398a638/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala b/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala
index 3973c8d..994360f 100644
--- a/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala
+++ b/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala
@@ -38,6 +38,19 @@ class SocketManager {
     new ConcurrentHashMap[SocketLike, ZMQ.Context]().asScala
 
   /**
+   * Provides and registers a new ZMQ context, used for creating a new socket.
+   * @param mkSocket a function that creates a socket using a given context
+   * @return the new socket
+   * @see newZmqContext
+   */
+  private def withNewContext[A <: SocketLike](mkSocket: ZMQ.Context => A): A = {
+    val ctx = newZmqContext()
+    val socket = mkSocket(ctx)
+    socketToContextMap.put(socket, ctx)
+    socket
+  }
+
+  /**
    * Closes the socket provided and also closes the context if no more sockets
    * are using the context.
    *
@@ -45,6 +58,7 @@ class SocketManager {
    */
   def closeSocket(socket: SocketLike) = {
     socket.close()
+
     socketToContextMap.remove(socket).foreach(context => {
       if (!socketToContextMap.values.exists(_ == context)) context.close()
     })
@@ -61,9 +75,9 @@ class SocketManager {
   def newReqSocket(
     address: String,
     inboundMessageCallback: (Seq[String]) => Unit
-  ): SocketLike = {
-    new JeroMQSocket(new ReqSocketRunnable(
-      newZmqContext(),
+  ): SocketLike = withNewContext{ ctx =>
+     new JeroMQSocket(new ReqSocketRunnable(
+      ctx,
       Some(inboundMessageCallback),
       Connect(address),
       Linger(0)
@@ -81,9 +95,9 @@ class SocketManager {
   def newRepSocket(
     address: String,
     inboundMessageCallback: (Seq[String]) => Unit
-  ): SocketLike = {
+  ): SocketLike = withNewContext{ ctx =>
     new JeroMQSocket(new ZeroMQSocketRunnable(
-      newZmqContext(),
+      ctx,
       RepSocket,
       Some(inboundMessageCallback),
       Bind(address),
@@ -100,9 +114,9 @@ class SocketManager {
    */
   def newPubSocket(
     address: String
-  ): SocketLike = {
+  ): SocketLike = withNewContext{ ctx =>
     new JeroMQSocket(new PubSocketRunnable(
-      newZmqContext(),
+      ctx,
       Bind(address),
       Linger(0)
     ))
@@ -119,9 +133,9 @@ class SocketManager {
   def newSubSocket(
     address: String,
     inboundMessageCallback: (Seq[String]) => Unit
-  ): SocketLike = {
+  ): SocketLike = withNewContext { ctx =>
     new JeroMQSocket(new ZeroMQSocketRunnable(
-      newZmqContext(),
+      ctx,
       SubSocket,
       Some(inboundMessageCallback),
       Connect(address),
@@ -141,9 +155,9 @@ class SocketManager {
   def newRouterSocket(
     address: String,
     inboundMessageCallback: (Seq[String]) => Unit
-  ): SocketLike = {
+  ): SocketLike = withNewContext { ctx =>
     new JeroMQSocket(new ZeroMQSocketRunnable(
-      newZmqContext(),
+      ctx,
       RouterSocket,
       Some(inboundMessageCallback),
       Bind(address),
@@ -163,9 +177,9 @@ class SocketManager {
     address: String,
     inboundMessageCallback: (Seq[String]) => Unit,
     identity: String = UUID.randomUUID().toString
-  ): SocketLike = {
+  ): SocketLike = withNewContext{ ctx =>
     new JeroMQSocket(new ZeroMQSocketRunnable(
-      newZmqContext(),
+      ctx,
       DealerSocket,
       Some(inboundMessageCallback),
       Connect(address),

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/1398a638/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala
index 0a3c900..6fee716 100644
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala
+++ b/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala
@@ -152,11 +152,13 @@ class ZeroMQSocketRunnable(
         Thread.sleep(1)
       }
     } catch {
-      case throwable: Throwable =>
-        logger.error("Unexpected exception in 0mq socket runnable!", throwable)
+      case ex: Exception =>
+        logger.error("Unexpected exception in 0mq socket runnable!", ex)
     } finally {
-      Try(socket.close()).failed.foreach {
-        case throwable: Throwable =>
+      try{
+        socket.close()
+      } catch {
+        case ex: Exception =>
           logger.error("Failed to close socket!", _: Throwable)
       }
     }