Posted to commits@spark.apache.org by fe...@apache.org on 2017/07/08 21:24:41 UTC

spark git commit: [SPARK-21093][R] Terminate R's worker processes in the parent of R's daemon to prevent a leak

Repository: spark
Updated Branches:
  refs/heads/master c3712b77a -> 08e0d033b


[SPARK-21093][R] Terminate R's worker processes in the parent of R's daemon to prevent a leak

## What changes were proposed in this pull request?

This is a retry for #18320. This PR was reverted due to unexpected test failures with a -10 error code.

I was unable to reproduce this on macOS, CentOS, or Ubuntu; it only occurred on Jenkins. So, the tests proceeded to verify this, and the past try was reverted here - https://github.com/apache/spark/pull/18456

This new approach was tested in https://github.com/apache/spark/pull/18463.

**Test results**:

- With the part of suspicious change in the past try (https://github.com/apache/spark/pull/18463/commits/466325d3fd353668583f3bde38ae490d9db0b189)

  Tests ran 4 times; 2 passed and 2 failed.

- Without the part of suspicious change in the past try (https://github.com/apache/spark/pull/18463/commits/466325d3fd353668583f3bde38ae490d9db0b189)

  Tests ran 5 times and they all passed.

- With this new approach (https://github.com/apache/spark/pull/18463/commits/0a7589c09f53dfc2094497d8d3e59d6407569417)

  Tests ran 5 times and they all passed.

It looks like the cause is as below (see https://github.com/apache/spark/pull/18463/commits/466325d3fd353668583f3bde38ae490d9db0b189):

```diff
+ exitCode <- 1
...
+   data <- parallel:::readChild(child)
+   if (is.raw(data)) {
+     if (unserialize(data) == exitCode) {
      ...
+     }
+   }

...

- parallel:::mcexit(0L)
+ parallel:::mcexit(0L, send = exitCode)
```

I think there are two possibilities:

 - `parallel:::mcexit(.. , send = exitCode)`

   https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/mcfork.html

   > It sends send to the master (unless NULL) and then shuts down the child process.

   However, it looks possible that the parent attempts to terminate the child right after getting our custom exit code. So, the child gets terminated between "send" and "shuts down", failing to exit properly (see the sketch after this list).

 - A bug between `parallel:::mcexit(..., send = ...)` and `parallel:::readChild`.
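
A minimal, hypothetical sketch of the reverted pattern (not the actual daemon code) may make the suspected race concrete: the child announces a custom exit code with `mcexit(send = ...)`, and the parent kills the child as soon as it reads that code back. Note that `parallel:::mcfork`, `parallel:::readChild`, and `parallel:::mcexit` are Unix-only internals, so the details may differ across R versions.

```r
# Hypothetical sketch of the reverted, suspected-racy pattern (Unix only).
exitCode <- 1L

child <- parallel:::mcfork()
if (inherits(child, "masterProcess")) {
  # Child process: send the custom exit code to the parent, then shut down.
  # The suspicion is that the parent may react to the sent value before the
  # shutdown inside mcexit() completes.
  parallel:::mcexit(0L, send = exitCode)
}

# Parent process: wait until the child sends data or terminates.
data <- parallel:::readChild(child)
if (is.raw(data) && unserialize(data) == exitCode) {
  # The child claims it is done, but it may still be between "send" and
  # "shuts down" here, so this kill can interrupt its normal exit.
  tools::pskill(child$pid, tools::SIGUSR1)
}
```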

**Proposal**:

To resolve this, I simply decided to avoid both possibilities with this new approach here (https://github.com/apache/spark/pull/18465/commits/9ff89a7859cb9f427fc774f33c3521c7d962b723). To support this idea, I quote the relevant documentation below:

https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/mcfork.html

> `readChild` and `readChildren` return a raw vector with a "pid" attribute if data were available, an integer vector of length one with the process ID if a child terminated or `NULL` if the child no longer exists (no children at all for `readChildren`).

`readChild` returns "an integer vector of length one with the process ID if a child terminated", so we can check whether the returned value is an `integer` and matches the selected child's process ID. I believe this makes sure that the children have exited.

In case children happen to send any data manually to the parent (which is why we introduced the suspicious part of the change (https://github.com/apache/spark/pull/18463/commits/466325d3fd353668583f3bde38ae490d9db0b189)), that data will be raw bytes and is simply discarded; the loop then reads the next message and checks whether it is an `integer`. A minimal sketch of this approach follows.
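
Here is a minimal, self-contained sketch of the new approach under the same assumptions (Unix only, `parallel` internals behaving as documented): the parent polls for children that have exited but are not yet terminated, discards any raw data they may have sent, and sends the kill signal itself once `readChild` reports the matching PID.

```r
# Hypothetical sketch of the new approach (Unix only; parallel::: internals).
child <- parallel:::mcfork()
if (inherits(child, "masterProcess")) {
  # Child process: do its work, then exit without sending anything special.
  parallel:::mcexit(0L)
}

repeat {
  # Poll (timeout = 0) for children that have data available or have exited.
  children <- parallel:::selectChildren(timeout = 0)
  if (is.null(children)) break  # no children exist any longer
  if (is.integer(children)) {
    for (pid in children) {
      res <- parallel:::readChild(pid)
      # An integer result is "the process ID if a child terminated"; raw
      # bytes are data the child sent manually, which we simply discard.
      if (is.integer(res) && res == pid) {
        # The child has exited; terminate it properly in the parent.
        tools::pskill(pid, tools::SIGUSR1)
      }
    }
  }
  Sys.sleep(1)  # mirror the daemon's one-second polling interval
}
```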

## How was this patch tested?

Manual tests and Jenkins tests.

Author: hyukjinkwon <gu...@gmail.com>

Closes #18465 from HyukjinKwon/SPARK-21093-retry-1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08e0d033
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08e0d033
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08e0d033

Branch: refs/heads/master
Commit: 08e0d033b40946b4ef5741a7aa1e7ba0bd48c6fb
Parents: c3712b7
Author: hyukjinkwon <gu...@gmail.com>
Authored: Sat Jul 8 14:24:37 2017 -0700
Committer: Felix Cheung <fe...@apache.org>
Committed: Sat Jul 8 14:24:37 2017 -0700

----------------------------------------------------------------------
 R/pkg/inst/worker/daemon.R | 51 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/08e0d033/R/pkg/inst/worker/daemon.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/worker/daemon.R b/R/pkg/inst/worker/daemon.R
index 3a318b7..2e31dc5 100644
--- a/R/pkg/inst/worker/daemon.R
+++ b/R/pkg/inst/worker/daemon.R
@@ -30,8 +30,50 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connection by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each child terminates
+  # itself, it appears that the resource is not released properly, which causes an unexpected
+  # termination of this daemon due to, for example, running out of file descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to retrieve children
+  # that have exited (but are not terminated) and then sends a kill signal to terminate them properly
+  # in the parent.
+  #
+  # There are two paths in which the parent attempts to send a signal to terminate the children.
+  #
+  #   1. Every second, if no socket connection is available and there are child workers
+  #     running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children every second if
+  # any worker is running, or right before launching more worker children for the
+  # next socket connection.
+
+  # The process IDs of exited children are returned below.
+  children <- parallel:::selectChildren(timeout = 0)
+
+  if (is.integer(children)) {
+    lapply(children, function(child) {
+      # This should be the PID of an exited child. Otherwise, this returns raw bytes if any
+      # data was sent from this child, in which case we discard it.
+      pid <- parallel:::readChild(child)
+      if (is.integer(pid)) {
+        # This checks if the data from this child is the same PID as that of the selected child.
+        if (child == pid) {
+          # If so, we terminate this child.
+          tools::pskill(child, tools::SIGUSR1)
+        }
+      }
+    })
+  } else if (is.null(children)) {
+    # If it is NULL, there are no children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
+  }
+
   if (ready) {
     port <- SparkR:::readInt(inputCon)
     # There is a small chance that it could be interrupted by signal, retry one time
@@ -44,12 +86,15 @@ while (TRUE) {
     }
     p <- parallel:::mcfork()
     if (inherits(p, "masterProcess")) {
+      # We reach here only in the child process.
       close(inputCon)
       Sys.setenv(SPARKR_WORKER_PORT = port)
       try(source(script))
-      # Set SIGUSR1 so that child can exit
-      tools::pskill(Sys.getpid(), tools::SIGUSR1)
+      # Note that this mcexit does not fully terminate this child.
       parallel:::mcexit(0L)
+    } else {
+      # Forking succeeded; check every second whether the children have finished their jobs.
+      selectTimeout <- 1
     }
   }
 }

