You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by fe...@apache.org on 2017/06/29 03:06:55 UTC

spark git commit: Revert "[SPARK-21094][R] Terminate R's worker processes in the parent of R's daemon to prevent a leak"

Repository: spark
Updated Branches:
  refs/heads/master db44f5f3e -> fc92d25f2


Revert "[SPARK-21094][R] Terminate R's worker processes in the parent of R's daemon to prevent a leak"

This reverts commit 6b3d02285ee0debc73cbcab01b10398a498fbeb8.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc92d25f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc92d25f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc92d25f

Branch: refs/heads/master
Commit: fc92d25f2a27e81ef2d5031dcf856af1cc1d8c31
Parents: db44f5f
Author: Felix Cheung <fe...@apache.org>
Authored: Wed Jun 28 20:06:29 2017 -0700
Committer: Felix Cheung <fe...@apache.org>
Committed: Wed Jun 28 20:06:29 2017 -0700

----------------------------------------------------------------------
 R/pkg/inst/worker/daemon.R | 59 +++--------------------------------------
 1 file changed, 4 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fc92d25f/R/pkg/inst/worker/daemon.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/worker/daemon.R b/R/pkg/inst/worker/daemon.R
index 6e385b2..3a318b7 100644
--- a/R/pkg/inst/worker/daemon.R
+++ b/R/pkg/inst/worker/daemon.R
@@ -30,55 +30,8 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
-# Waits indefinitely for a socket connecion by default.
-selectTimeout <- NULL
-
-# Exit code that children send to the parent to indicate they exited.
-exitCode <- 1
-
 while (TRUE) {
-  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
-
-  # Note that the children should be terminated in the parent. If each child terminates
-  # itself, it appears that the resource is not released properly, that causes an unexpected
-  # termination of this daemon due to, for example, running out of file descriptors
-  # (see SPARK-21093). Therefore, the current implementation tries to retrieve children
-  # that are exited (but not terminated) and then sends a kill signal to terminate them properly
-  # in the parent.
-  #
-  # There are two paths that it attempts to send a signal to terminate the children in the parent.
-  #
-  #   1. Every second if any socket connection is not available and if there are child workers
-  #     running.
-  #   2. Right after a socket connection is available.
-  #
-  # In other words, the parent attempts to send the signal to the children every second if
-  # any worker is running or right before launching other worker children from the following
-  # new socket connection.
-
-  # Only the process IDs of children sent data to the parent are returned below. The children
-  # send a custom exit code to the parent after being exited and the parent tries
-  # to terminate them only if they sent the exit code.
-  children <- parallel:::selectChildren(timeout = 0)
-
-  if (is.integer(children)) {
-    lapply(children, function(child) {
-      # This data should be raw bytes if any data was sent from this child.
-      # Otherwise, this returns the PID.
-      data <- parallel:::readChild(child)
-      if (is.raw(data)) {
-        # This checks if the data from this child is the exit code that indicates an exited child.
-        if (unserialize(data) == exitCode) {
-          # If so, we terminate this child.
-          tools::pskill(child, tools::SIGUSR1)
-        }
-      }
-    })
-  } else if (is.null(children)) {
-    # If it is NULL, there are no children. Waits indefinitely for a socket connecion.
-    selectTimeout <- NULL
-  }
-
+  ready <- socketSelect(list(inputCon))
   if (ready) {
     port <- SparkR:::readInt(inputCon)
     # There is a small chance that it could be interrupted by signal, retry one time
@@ -91,16 +44,12 @@ while (TRUE) {
     }
     p <- parallel:::mcfork()
     if (inherits(p, "masterProcess")) {
-      # Reach here because this is a child process.
       close(inputCon)
       Sys.setenv(SPARKR_WORKER_PORT = port)
       try(source(script))
-      # Note that this mcexit does not fully terminate this child. So, this writes back
-      # a custom exit code so that the parent can read and terminate this child.
-      parallel:::mcexit(0L, send = exitCode)
-    } else {
-      # Forking succeeded and we need to check if they finished their jobs every second.
-      selectTimeout <- 1
+      # Set SIGUSR1 so that child can exit
+      tools::pskill(Sys.getpid(), tools::SIGUSR1)
+      parallel:::mcexit(0L)
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org