You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tv...@apache.org on 2022/11/10 17:37:18 UTC
[beam] branch master updated: [BEAM-12792] Install pipeline dependencies to temporary venv (#16658)
This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 08b6a524fec [BEAM-12792] Install pipeline dependencies to temporary venv (#16658)
08b6a524fec is described below
commit 08b6a524fecc333a12ce42971733ef64ae7d02f1
Author: Janek Bevendorff <ja...@uni-weimar.de>
AuthorDate: Thu Nov 10 18:37:09 2022 +0100
[BEAM-12792] Install pipeline dependencies to temporary venv (#16658)
---
CHANGES.md | 1 +
.../apache_beam/runners/worker/worker_pool_main.py | 36 ++--
sdks/python/container/Dockerfile | 2 +-
sdks/python/container/boot.go | 230 +++++++++++++--------
sdks/python/container/piputil.go | 34 +--
5 files changed, 180 insertions(+), 123 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 824b7982c4d..8f6e1e08c07 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,6 +67,7 @@
than requiring them to be passed separately via the `--extra_package` option
(Python) ([#23684](https://github.com/apache/beam/pull/23684)).
* Pipeline Resource Hints now supported via `--resource_hints` flag (Go) ([#23990](https://github.com/apache/beam/pull/23990)).
+* Make Python SDK containers reusable on portable runners by installing dependencies to temporary venvs ([BEAM-12792](https://issues.apache.org/jira/browse/BEAM-12792)).
## Breaking Changes
diff --git a/sdks/python/apache_beam/runners/worker/worker_pool_main.py b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
index eb5cdd93516..7e81b1fa6d7 100644
--- a/sdks/python/apache_beam/runners/worker/worker_pool_main.py
+++ b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
@@ -51,6 +51,26 @@ from apache_beam.utils import thread_pool_executor
_LOGGER = logging.getLogger(__name__)
+def kill_process_gracefully(proc, timeout=10):
+ """
+ Kill a worker process gracefully by sending a SIGTERM and waiting for
+ it to finish. A SIGKILL will be sent if the process has not finished
+ after ``timeout`` seconds.
+ """
+ def _kill():
+ proc.terminate()
+ try:
+ proc.wait(timeout=timeout)
+ except subprocess.TimeoutExpired:
+ _LOGGER.warning('Worker process did not respond, killing it.')
+ proc.kill()
+ proc.wait() # Avoid zombies
+
+ kill_thread = threading.Thread(target=_kill)
+ kill_thread.start()
+ kill_thread.join()
+
+
class BeamFnExternalWorkerPoolServicer(
beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
@@ -95,7 +115,7 @@ class BeamFnExternalWorkerPoolServicer(
# Register to kill the subprocesses on exit.
def kill_worker_processes():
for worker_process in worker_pool._worker_processes.values():
- worker_process.kill()
+ kill_process_gracefully(worker_process)
atexit.register(kill_worker_processes)
@@ -172,19 +192,9 @@ class BeamFnExternalWorkerPoolServicer(
worker_process = self._worker_processes.pop(
stop_worker_request.worker_id, None)
if worker_process:
-
- def kill_worker_process():
- try:
- worker_process.kill()
- except OSError:
- # ignore already terminated process
- return
-
_LOGGER.info("Stopping worker %s" % stop_worker_request.worker_id)
- # communicate is necessary to avoid zombie process
- # time box communicate (it has no timeout parameter in Py2)
- threading.Timer(1, kill_worker_process).start()
- worker_process.communicate()
+ kill_process_gracefully(worker_process)
+
return beam_fn_api_pb2.StopWorkerResponse()
diff --git a/sdks/python/container/Dockerfile b/sdks/python/container/Dockerfile
index a301db74ee0..fe5238d22ce 100644
--- a/sdks/python/container/Dockerfile
+++ b/sdks/python/container/Dockerfile
@@ -67,7 +67,7 @@ RUN ccache --set-config=sloppiness=file_macro && ccache --set-config=hash_dir=fa
####
# Install Apache Beam SDK. Use --no-deps and pip check to verify that all
-# necessary dependencies are specified in base_image_requiremetns.txt.
+# necessary dependencies are specified in base_image_requirements.txt.
####
COPY target/apache-beam.tar.gz /opt/apache/beam/tars/
RUN pip install --no-deps -v /opt/apache/beam/tars/apache-beam.tar.gz[gcp]
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 8e4cf772c0e..646109127f1 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -26,9 +26,11 @@ import (
"log"
"os"
"os/exec"
+ "os/signal"
"path/filepath"
"regexp"
"strings"
+ "syscall"
"sync"
"time"
@@ -39,7 +41,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
- "github.com/nightlyone/lockfile"
)
var (
@@ -76,10 +77,8 @@ func main() {
flag.Parse()
if *setupOnly {
- if err := processArtifactsInSetupOnlyMode(); err != nil {
- log.Fatalf("Setup unsuccessful with error: %v", err)
- }
- return
+ processArtifactsInSetupOnlyMode()
+ os.Exit(0)
}
if *workerPool == true {
@@ -92,16 +91,26 @@ func main() {
"--container_executable=/opt/apache/beam/boot",
}
log.Printf("Starting worker pool %v: python %v", workerPoolId, strings.Join(args, " "))
- log.Fatalf("Python SDK worker pool exited: %v", execx.Execute("python", args...))
+ if err := execx.Execute("python", args...); err != nil {
+ log.Fatalf("Python SDK worker pool exited with error: %v", err)
+ }
+ log.Print("Python SDK worker pool exited.")
+ os.Exit(0)
}
if *id == "" {
- log.Fatal("No id provided.")
+ log.Fatalf("No id provided.")
}
if *provisionEndpoint == "" {
- log.Fatal("No provision endpoint provided.")
+ log.Fatalf("No provision endpoint provided.")
}
+ if err := launchSDKProcess(); err != nil {
+ log.Fatal(err)
+ }
+}
+
+func launchSDKProcess() error {
ctx := grpcx.WriteWorkerID(context.Background(), *id)
info, err := provision.Info(ctx, *provisionEndpoint)
@@ -122,13 +131,13 @@ func main() {
}
if *loggingEndpoint == "" {
- log.Fatal("No logging endpoint provided.")
+ log.Fatalf("No logging endpoint provided.")
}
if *artifactEndpoint == "" {
- log.Fatal("No artifact endpoint provided.")
+ log.Fatalf("No artifact endpoint provided.")
}
if *controlEndpoint == "" {
- log.Fatal("No control endpoint provided.")
+ log.Fatalf("No control endpoint provided.")
}
log.Printf("Initializing python harness: %v", strings.Join(os.Args, " "))
@@ -142,41 +151,44 @@ func main() {
// (2) Retrieve and install the staged packages.
//
- // Guard from concurrent artifact retrieval and installation,
- // when called by child processes in a worker pool.
+ // No log.Fatalf() from here on, otherwise deferred cleanups will not be called!
- materializeArtifactsFunc := func() {
- dir := filepath.Join(*semiPersistDir, "staged")
+ // Trap signals, so we can clean up properly.
+ signalChannel := make(chan os.Signal, 1)
+ signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
- files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
- if err != nil {
- log.Fatalf("Failed to retrieve staged files: %v", err)
- }
+ venvDir, err := setupVenv("/opt/apache/beam-venv", *id)
+ if err != nil {
+ return fmt.Errorf("Failed to initialize Python venv.")
+ }
+ cleanupFunc := func() {
+ os.RemoveAll(venvDir)
+ log.Printf("Cleaned up temporary venv for worker %v.", *id)
+ }
+ defer cleanupFunc()
- // TODO(herohde): the packages to install should be specified explicitly. It
- // would also be possible to install the SDK in the Dockerfile.
- fileNames := make([]string, len(files))
- requirementsFiles := []string{requirementsFile}
- for i, v := range files {
- name, _ := artifact.MustExtractFilePayload(v)
- log.Printf("Found artifact: %s", name)
- fileNames[i] = name
-
- if v.RoleUrn == artifact.URNPipRequirementsFile {
- requirementsFiles = append(requirementsFiles, name)
- }
- }
+ dir := filepath.Join(*semiPersistDir, "staged")
+ files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
+ if err != nil {
+ return fmt.Errorf("Failed to retrieve staged files: %v", err)
+ }
- if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); setupErr != nil {
- log.Fatalf("Failed to install required packages: %v", setupErr)
+ // TODO(herohde): the packages to install should be specified explicitly. It
+ // would also be possible to install the SDK in the Dockerfile.
+ fileNames := make([]string, len(files))
+ requirementsFiles := []string{requirementsFile}
+ for i, v := range files {
+ name, _ := artifact.MustExtractFilePayload(v)
+ log.Printf("Found artifact: %s", name)
+ fileNames[i] = name
+
+ if v.RoleUrn == artifact.URNPipRequirementsFile {
+ requirementsFiles = append(requirementsFiles, name)
}
}
- workerPoolId := os.Getenv(workerPoolIdEnv)
- if workerPoolId != "" {
- multiProcessExactlyOnce(materializeArtifactsFunc, "beam.install.complete."+workerPoolId)
- } else {
- materializeArtifactsFunc()
+ if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); setupErr != nil {
+ return fmt.Errorf("Failed to install required packages: %v", setupErr)
}
// (3) Invoke python
@@ -200,24 +212,112 @@ func main() {
}
}
+ workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
+
+ // Keep track of child PIDs for clean shutdown without zombies
+ childPids := struct {
+ v []int
+ canceled bool
+ mu sync.Mutex
+ } {v: make([]int, 0, len(workerIds))}
+
+ // Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies
+ go func() {
+ log.Printf("Received signal: %v", <-signalChannel)
+ childPids.mu.Lock()
+ childPids.canceled = true
+ for _, pid := range childPids.v {
+ go func(pid int) {
+ // This goroutine will be canceled if the main process exits before the 5 seconds
+ // have elapsed, i.e., as soon as all subprocesses have returned from Wait().
+ time.Sleep(5 * time.Second)
+ if err := syscall.Kill(-pid, syscall.SIGKILL); err == nil {
+ log.Printf("Worker process %v did not respond, killed it.", pid)
+ }
+ }(pid)
+ syscall.Kill(-pid, syscall.SIGTERM)
+ }
+ childPids.mu.Unlock()
+ }()
+
args := []string{
"-m",
sdkHarnessEntrypoint,
}
- workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
var wg sync.WaitGroup
wg.Add(len(workerIds))
for _, workerId := range workerIds {
go func(workerId string) {
- log.Printf("Executing: python %v", strings.Join(args, " "))
- log.Fatalf("Python exited: %v", execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "python", args...))
+ defer wg.Done()
+
+ childPids.mu.Lock()
+ if childPids.canceled {
+ childPids.mu.Unlock()
+ return
+ }
+ log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
+ cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, "python", args...)
+ childPids.v = append(childPids.v, cmd.Process.Pid)
+ childPids.mu.Unlock()
+
+ if err := cmd.Wait(); err != nil {
+ log.Printf("Python (worker %v) exited: %v", workerId, err)
+ } else {
+ log.Printf("Python (worker %v) exited.", workerId)
+ }
}(workerId)
}
wg.Wait()
+ return nil
+}
+
+// StartCommandEnv starts a command in a new process group with the given
+// arguments and additional environment variables. It attaches stdio to the
+// child process. Returns the command object (the error from Start is not checked).
+func StartCommandEnv(env map[string]string, prog string, args ...string) *exec.Cmd {
+ cmd := exec.Command(prog, args...)
+ cmd.Stdin = os.Stdin
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ if env != nil {
+ cmd.Env = os.Environ()
+ for k, v := range env {
+ cmd.Env = append(cmd.Env, k+"="+v)
+ }
+ }
+
+ // Create process group so we can clean up the whole subtree later without creating zombies
+ cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
+ cmd.Start()
+ return cmd
}
-// setup wheel specs according to installed python version
+// setupVenv initializes a local Python venv and sets the corresponding env variables
+func setupVenv(baseDir, workerId string) (string, error) {
+ log.Printf("Initializing temporary Python venv ...")
+
+ dir := filepath.Join(baseDir, "beam-venv-worker-" + workerId)
+ if _, err := os.Stat(dir); !os.IsNotExist(err) {
+ // Probably leftovers from a previous run
+ log.Printf("Cleaning up previous venv ...")
+ if err := os.RemoveAll(dir); err != nil {
+ return "", err
+ }
+ }
+ if err := os.MkdirAll(dir, 0750); err != nil {
+ return "", fmt.Errorf("Failed to create Python venv directory: %s", err)
+ }
+ if err := execx.Execute("python", "-m", "venv", "--system-site-packages", dir); err != nil {
+ return "", fmt.Errorf("Python venv initialization failed: %s", err)
+ }
+
+ os.Setenv("VIRTUAL_ENV", dir)
+ os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":"))
+ return dir, nil
+}
+
+// setupAcceptableWheelSpecs sets up wheel specs according to the installed Python version.
func setupAcceptableWheelSpecs() error {
cmd := exec.Command("python", "-V")
stdoutStderr, err := cmd.CombinedOutput()
@@ -282,53 +382,12 @@ func joinPaths(dir string, paths ...string) []string {
return ret
}
-// Call the given function exactly once across multiple worker processes.
-// The need for multiple processes is specific to the Python SDK due to the GIL.
-// Should another SDK require it, this could be separated out as shared utility.
-func multiProcessExactlyOnce(actionFunc func(), completeFileName string) {
- installCompleteFile := filepath.Join(os.TempDir(), completeFileName)
-
- // skip if install already complete, no need to lock
- _, err := os.Stat(installCompleteFile)
- if err == nil {
- return
- }
-
- lock, err := lockfile.New(filepath.Join(os.TempDir(), completeFileName+".lck"))
- if err != nil {
- log.Fatalf("Cannot init artifact retrieval lock: %v", err)
- }
-
- for err = lock.TryLock(); err != nil; err = lock.TryLock() {
- if _, ok := err.(lockfile.TemporaryError); ok {
- time.Sleep(5 * time.Second)
- log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
- } else {
- log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
- }
- }
- defer lock.Unlock()
-
- // skip if install already complete
- _, err = os.Stat(installCompleteFile)
- if err == nil {
- return
- }
-
- // do the real work
- actionFunc()
-
- // mark install complete
- os.OpenFile(installCompleteFile, os.O_RDONLY|os.O_CREATE, 0666)
-
-}
-
// processArtifactsInSetupOnlyMode installs the dependencies found in artifacts
// when flag --setup_only and --artifacts exist. The setup mode will only
// process the provided artifacts and skip the actual worker program start up.
// The mode is useful for building new images with dependencies pre-installed so
// that the installation can be skipped at the pipeline runtime.
-func processArtifactsInSetupOnlyMode() error {
+func processArtifactsInSetupOnlyMode() {
if *artifacts == "" {
log.Fatal("No --artifacts provided along with --setup_only flag.")
}
@@ -362,5 +421,4 @@ func processArtifactsInSetupOnlyMode() error {
if setupErr := installSetupPackages(files, workDir, []string{requirementsFile}); setupErr != nil {
log.Fatalf("Failed to install required packages: %v", setupErr)
}
- return nil
}
diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go
index 60cf33549cf..a931f45a672 100644
--- a/sdks/python/container/piputil.go
+++ b/sdks/python/container/piputil.go
@@ -29,18 +29,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
)
-var (
- pip = pipLocation()
-)
-
-func pipLocation() string {
- // Users can set 'pip' environment variable to use a custom pip path.
- if v, ok := os.LookupEnv("pip"); ok {
- return v
- }
- return "pip"
-}
-
// pipInstallRequirements installs the given requirement, if present.
func pipInstallRequirements(files []string, dir, name string) error {
for _, file := range files {
@@ -49,15 +37,15 @@ func pipInstallRequirements(files []string, dir, name string) error {
// as possible PyPI downloads. In the first round the --find-links
// option will make sure that only things staged in the worker will be
// used without following their dependencies.
- args := []string{"install", "-r", filepath.Join(dir, name), "--disable-pip-version-check", "--no-index", "--no-deps", "--find-links", dir}
- if err := execx.Execute(pip, args...); err != nil {
- fmt.Println("Some packages could not be installed solely from the requirements cache. Installing packages from PyPI.")
+ args := []string{"-m", "pip", "install", "-r", filepath.Join(dir, name), "--disable-pip-version-check", "--no-index", "--no-deps", "--find-links", dir}
+ if err := execx.Execute("python", args...); err != nil {
+ fmt.Println("Some packages could not be installed solely from the requirements cache. Installing packages from PyPI.")
}
// The second install round opens up the search for packages on PyPI and
// also installs dependencies. The key is that if all the packages have
// been installed in the first round then this command will be a no-op.
- args = []string{"install", "-r", filepath.Join(dir, name), "--disable-pip-version-check", "--find-links", dir}
- return execx.Execute(pip, args...)
+ args = []string{"-m", "pip", "install", "-r", filepath.Join(dir, name), "--disable-pip-version-check", "--find-links", dir}
+ return execx.Execute("python", args...)
}
}
return nil
@@ -88,19 +76,19 @@ func pipInstallPackage(files []string, dir, name string, force, optional bool, e
// installed version will match the package specified, the package itself
// will not be reinstalled, but its dependencies will now be resolved and
// installed if necessary. This achieves our goal outlined above.
- args := []string{"install", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps",
+ args := []string{"-m", "pip", "install", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps",
filepath.Join(dir, packageSpec)}
- err := execx.Execute(pip, args...)
+ err := execx.Execute("python", args...)
if err != nil {
return err
}
- args = []string{"install", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
- return execx.Execute(pip, args...)
+ args = []string{"-m", "pip", "install", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
+ return execx.Execute("python", args...)
}
// Case when we do not perform a forced reinstall.
- args := []string{"install", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
- return execx.Execute(pip, args...)
+ args := []string{"-m", "pip", "install", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
+ return execx.Execute("python", args...)
}
}
if optional {