You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lr...@apache.org on 2016/01/11 22:02:10 UTC
[28/50] [abbrv] incubator-toree git commit: Register contexts so that
they can be closed when sockets close
Register contexts so that they can be closed when sockets close
Fixes #194
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/1398a638
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/1398a638
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/1398a638
Branch: refs/heads/master
Commit: 1398a638199cff1fd40b168e4fce11de3e942464
Parents: ecfa5dd
Author: Jakob Odersky <jo...@gmail.com>
Authored: Tue Dec 1 14:56:12 2015 -0800
Committer: Jakob Odersky <jo...@gmail.com>
Committed: Tue Dec 1 14:56:12 2015 -0800
----------------------------------------------------------------------
.../ibm/spark/communication/SocketManager.scala | 40 +++++++++++++-------
.../socket/ZeroMQSocketRunnable.scala | 10 +++--
2 files changed, 33 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/1398a638/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala b/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala
index 3973c8d..994360f 100644
--- a/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala
+++ b/communication/src/main/scala/com/ibm/spark/communication/SocketManager.scala
@@ -38,6 +38,19 @@ class SocketManager {
new ConcurrentHashMap[SocketLike, ZMQ.Context]().asScala
/**
+ * Provides and registers a new ZMQ context, used for creating a new socket.
+ * @param mkSocket a function that creates a socket using a given context
+ * @return the new socket
+ * @see newZmqContext
+ */
+ private def withNewContext[A <: SocketLike](mkSocket: ZMQ.Context => A): A = {
+ val ctx = newZmqContext()
+ val socket = mkSocket(ctx)
+ socketToContextMap.put(socket, ctx)
+ socket
+ }
+
+ /**
* Closes the socket provided and also closes the context if no more sockets
* are using the context.
*
@@ -45,6 +58,7 @@ class SocketManager {
*/
def closeSocket(socket: SocketLike) = {
socket.close()
+
socketToContextMap.remove(socket).foreach(context => {
if (!socketToContextMap.values.exists(_ == context)) context.close()
})
@@ -61,9 +75,9 @@ class SocketManager {
def newReqSocket(
address: String,
inboundMessageCallback: (Seq[String]) => Unit
- ): SocketLike = {
- new JeroMQSocket(new ReqSocketRunnable(
- newZmqContext(),
+ ): SocketLike = withNewContext{ ctx =>
+ new JeroMQSocket(new ReqSocketRunnable(
+ ctx,
Some(inboundMessageCallback),
Connect(address),
Linger(0)
@@ -81,9 +95,9 @@ class SocketManager {
def newRepSocket(
address: String,
inboundMessageCallback: (Seq[String]) => Unit
- ): SocketLike = {
+ ): SocketLike = withNewContext{ ctx =>
new JeroMQSocket(new ZeroMQSocketRunnable(
- newZmqContext(),
+ ctx,
RepSocket,
Some(inboundMessageCallback),
Bind(address),
@@ -100,9 +114,9 @@ class SocketManager {
*/
def newPubSocket(
address: String
- ): SocketLike = {
+ ): SocketLike = withNewContext{ ctx =>
new JeroMQSocket(new PubSocketRunnable(
- newZmqContext(),
+ ctx,
Bind(address),
Linger(0)
))
@@ -119,9 +133,9 @@ class SocketManager {
def newSubSocket(
address: String,
inboundMessageCallback: (Seq[String]) => Unit
- ): SocketLike = {
+ ): SocketLike = withNewContext { ctx =>
new JeroMQSocket(new ZeroMQSocketRunnable(
- newZmqContext(),
+ ctx,
SubSocket,
Some(inboundMessageCallback),
Connect(address),
@@ -141,9 +155,9 @@ class SocketManager {
def newRouterSocket(
address: String,
inboundMessageCallback: (Seq[String]) => Unit
- ): SocketLike = {
+ ): SocketLike = withNewContext { ctx =>
new JeroMQSocket(new ZeroMQSocketRunnable(
- newZmqContext(),
+ ctx,
RouterSocket,
Some(inboundMessageCallback),
Bind(address),
@@ -163,9 +177,9 @@ class SocketManager {
address: String,
inboundMessageCallback: (Seq[String]) => Unit,
identity: String = UUID.randomUUID().toString
- ): SocketLike = {
+ ): SocketLike = withNewContext{ ctx =>
new JeroMQSocket(new ZeroMQSocketRunnable(
- newZmqContext(),
+ ctx,
DealerSocket,
Some(inboundMessageCallback),
Connect(address),
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/1398a638/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala b/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala
index 0a3c900..6fee716 100644
--- a/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala
+++ b/communication/src/main/scala/com/ibm/spark/communication/socket/ZeroMQSocketRunnable.scala
@@ -152,11 +152,13 @@ class ZeroMQSocketRunnable(
Thread.sleep(1)
}
} catch {
- case throwable: Throwable =>
- logger.error("Unexpected exception in 0mq socket runnable!", throwable)
+ case ex: Exception =>
+ logger.error("Unexpected exception in 0mq socket runnable!", ex)
} finally {
- Try(socket.close()).failed.foreach {
- case throwable: Throwable =>
+ try{
+ socket.close()
+ } catch {
+ case ex: Exception =>
logger.error("Failed to close socket!", _: Throwable)
}
}