Posted to commits@beam.apache.org by tv...@apache.org on 2022/11/10 17:37:18 UTC

[beam] branch master updated: [BEAM-12792] Install pipeline dependencies to temporary venv (#16658)

This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 08b6a524fec [BEAM-12792] Install pipeline dependencies to temporary venv (#16658)
08b6a524fec is described below

commit 08b6a524fecc333a12ce42971733ef64ae7d02f1
Author: Janek Bevendorff <ja...@uni-weimar.de>
AuthorDate: Thu Nov 10 18:37:09 2022 +0100

    [BEAM-12792] Install pipeline dependencies to temporary venv (#16658)
---
 CHANGES.md                                         |   1 +
 .../apache_beam/runners/worker/worker_pool_main.py |  36 ++--
 sdks/python/container/Dockerfile                   |   2 +-
 sdks/python/container/boot.go                      | 230 +++++++++++++--------
 sdks/python/container/piputil.go                   |  34 +--
 5 files changed, 180 insertions(+), 123 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 824b7982c4d..8f6e1e08c07 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,6 +67,7 @@
   than requiring them to be passed separately via the `--extra_package` option
   (Python) ([#23684](https://github.com/apache/beam/pull/23684)).
 * Pipeline Resource Hints now supported via `--resource_hints` flag (Go) ([#23990](https://github.com/apache/beam/pull/23990)).
+* Make Python SDK containers reusable on portable runners by installing dependencies to temporary venvs ([BEAM-12792](https://issues.apache.org/jira/browse/BEAM-12792)).
 
 ## Breaking Changes
 
diff --git a/sdks/python/apache_beam/runners/worker/worker_pool_main.py b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
index eb5cdd93516..7e81b1fa6d7 100644
--- a/sdks/python/apache_beam/runners/worker/worker_pool_main.py
+++ b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
@@ -51,6 +51,26 @@ from apache_beam.utils import thread_pool_executor
 _LOGGER = logging.getLogger(__name__)
 
 
+def kill_process_gracefully(proc, timeout=10):
+  """
+  Kill a worker process gracefully by sending a SIGTERM and waiting for
+  it to finish. A SIGKILL will be sent if the process has not finished
+  after ``timeout`` seconds.
+  """
+  def _kill():
+    proc.terminate()
+    try:
+      proc.wait(timeout=timeout)
+    except subprocess.TimeoutExpired:
+      _LOGGER.warning('Worker process did not respond, killing it.')
+      proc.kill()
+      proc.wait()  # Avoid zombies
+
+  kill_thread = threading.Thread(target=_kill)
+  kill_thread.start()
+  kill_thread.join()
+
+
 class BeamFnExternalWorkerPoolServicer(
     beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
 
@@ -95,7 +115,7 @@ class BeamFnExternalWorkerPoolServicer(
     # Register to kill the subprocesses on exit.
     def kill_worker_processes():
       for worker_process in worker_pool._worker_processes.values():
-        worker_process.kill()
+        kill_process_gracefully(worker_process)
 
     atexit.register(kill_worker_processes)
 
@@ -172,19 +192,9 @@ class BeamFnExternalWorkerPoolServicer(
     worker_process = self._worker_processes.pop(
         stop_worker_request.worker_id, None)
     if worker_process:
-
-      def kill_worker_process():
-        try:
-          worker_process.kill()
-        except OSError:
-          # ignore already terminated process
-          return
-
       _LOGGER.info("Stopping worker %s" % stop_worker_request.worker_id)
-      # communicate is necessary to avoid zombie process
-      # time box communicate (it has no timeout parameter in Py2)
-      threading.Timer(1, kill_worker_process).start()
-      worker_process.communicate()
+      kill_process_gracefully(worker_process)
+
     return beam_fn_api_pb2.StopWorkerResponse()
 
 
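For context, the new kill_process_gracefully helper escalates from SIGTERM to
SIGKILL once the timeout elapses. A minimal usage sketch (hypothetical
standalone snippet, not part of this commit):

    import subprocess

    from apache_beam.runners.worker.worker_pool_main import (
        kill_process_gracefully)

    proc = subprocess.Popen(['sleep', '60'])
    # Sends SIGTERM, waits up to 10 seconds, then falls back to SIGKILL
    # and reaps the process so no zombie is left behind.
    kill_process_gracefully(proc, timeout=10)
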
diff --git a/sdks/python/container/Dockerfile b/sdks/python/container/Dockerfile
index a301db74ee0..fe5238d22ce 100644
--- a/sdks/python/container/Dockerfile
+++ b/sdks/python/container/Dockerfile
@@ -67,7 +67,7 @@ RUN ccache --set-config=sloppiness=file_macro && ccache --set-config=hash_dir=fa
 
 ####
 # Install Apache Beam SDK. Use --no-deps and pip check to verify that all
-# necessary dependencies are specified in base_image_requiremetns.txt.
+# necessary dependencies are specified in base_image_requirements.txt.
 ####
 COPY target/apache-beam.tar.gz /opt/apache/beam/tars/
 RUN pip install --no-deps -v /opt/apache/beam/tars/apache-beam.tar.gz[gcp]
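
The comment fixed above describes the base-image pattern: install the SDK with
--no-deps, then let pip check verify that base_image_requirements.txt really
pins every needed dependency. Roughly, driven from Python (hypothetical paths,
a sketch rather than the actual build step):

    import subprocess

    # Install without resolving dependencies; everything the SDK needs must
    # already be pinned in base_image_requirements.txt.
    subprocess.check_call([
        'python', '-m', 'pip', 'install', '--no-deps',
        '/opt/apache/beam/tars/apache-beam.tar.gz[gcp]'])
    # Fails if any installed package has missing or conflicting requirements.
    subprocess.check_call(['python', '-m', 'pip', 'check'])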
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 8e4cf772c0e..646109127f1 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -26,9 +26,11 @@ import (
 	"log"
 	"os"
 	"os/exec"
+	"os/signal"
 	"path/filepath"
 	"regexp"
 	"strings"
+	"syscall"
 	"sync"
 	"time"
 
@@ -39,7 +41,6 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
 	"github.com/golang/protobuf/jsonpb"
 	"github.com/golang/protobuf/proto"
-	"github.com/nightlyone/lockfile"
 )
 
 var (
@@ -76,10 +77,8 @@ func main() {
 	flag.Parse()
 
 	if *setupOnly {
-		if err := processArtifactsInSetupOnlyMode(); err != nil {
-			log.Fatalf("Setup unsuccessful with error: %v", err)
-		}
-		return
+		processArtifactsInSetupOnlyMode()
+		os.Exit(0)
 	}
 
 	if *workerPool == true {
@@ -92,16 +91,26 @@ func main() {
 			"--container_executable=/opt/apache/beam/boot",
 		}
 		log.Printf("Starting worker pool %v: python %v", workerPoolId, strings.Join(args, " "))
-		log.Fatalf("Python SDK worker pool exited: %v", execx.Execute("python", args...))
+		if err := execx.Execute("python", args...); err != nil {
+			log.Fatalf("Python SDK worker pool exited with error: %v", err)
+		}
+		log.Print("Python SDK worker pool exited.")
+		os.Exit(0)
 	}
 
 	if *id == "" {
-		log.Fatal("No id provided.")
+		log.Fatalf("No id provided.")
 	}
 	if *provisionEndpoint == "" {
-		log.Fatal("No provision endpoint provided.")
+		log.Fatalf("No provision endpoint provided.")
 	}
 
+	if err := launchSDKProcess(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func launchSDKProcess() error {
 	ctx := grpcx.WriteWorkerID(context.Background(), *id)
 
 	info, err := provision.Info(ctx, *provisionEndpoint)
@@ -122,13 +131,13 @@ func main() {
 	}
 
 	if *loggingEndpoint == "" {
-		log.Fatal("No logging endpoint provided.")
+		log.Fatalf("No logging endpoint provided.")
 	}
 	if *artifactEndpoint == "" {
-		log.Fatal("No artifact endpoint provided.")
+		log.Fatalf("No artifact endpoint provided.")
 	}
 	if *controlEndpoint == "" {
-		log.Fatal("No control endpoint provided.")
+		log.Fatalf("No control endpoint provided.")
 	}
 
 	log.Printf("Initializing python harness: %v", strings.Join(os.Args, " "))
@@ -142,41 +151,44 @@ func main() {
 
 	// (2) Retrieve and install the staged packages.
 	//
-	// Guard from concurrent artifact retrieval and installation,
-	// when called by child processes in a worker pool.
+	// No log.Fatalf() from here on, otherwise deferred cleanups will not be called!
 
-	materializeArtifactsFunc := func() {
-		dir := filepath.Join(*semiPersistDir, "staged")
+	// Trap signals, so we can clean up properly.
+	signalChannel := make(chan os.Signal, 1)
+	signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
 
-		files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
-		if err != nil {
-			log.Fatalf("Failed to retrieve staged files: %v", err)
-		}
+	venvDir, err := setupVenv("/opt/apache/beam-venv", *id)
+	if err != nil {
+		return fmt.Errorf("Failed to initialize Python venv.")
+	}
+	cleanupFunc := func() {
+		os.RemoveAll(venvDir)
+		log.Printf("Cleaned up temporary venv for worker %v.", *id)
+	}
+	defer cleanupFunc()
 
-		// TODO(herohde): the packages to install should be specified explicitly. It
-		// would also be possible to install the SDK in the Dockerfile.
-		fileNames := make([]string, len(files))
-		requirementsFiles := []string{requirementsFile}
-		for i, v := range files {
-			name, _ := artifact.MustExtractFilePayload(v)
-			log.Printf("Found artifact: %s", name)
-			fileNames[i] = name
-
-			if v.RoleUrn == artifact.URNPipRequirementsFile {
-				requirementsFiles = append(requirementsFiles, name)
-			}
-		}
+	dir := filepath.Join(*semiPersistDir, "staged")
+	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
+	if err != nil {
+		return fmt.Errorf("Failed to retrieve staged files: %v", err)
+	}
 
-		if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); setupErr != nil {
-			log.Fatalf("Failed to install required packages: %v", setupErr)
+	// TODO(herohde): the packages to install should be specified explicitly. It
+	// would also be possible to install the SDK in the Dockerfile.
+	fileNames := make([]string, len(files))
+	requirementsFiles := []string{requirementsFile}
+	for i, v := range files {
+		name, _ := artifact.MustExtractFilePayload(v)
+		log.Printf("Found artifact: %s", name)
+		fileNames[i] = name
+
+		if v.RoleUrn == artifact.URNPipRequirementsFile {
+			requirementsFiles = append(requirementsFiles, name)
 		}
 	}
 
-	workerPoolId := os.Getenv(workerPoolIdEnv)
-	if workerPoolId != "" {
-		multiProcessExactlyOnce(materializeArtifactsFunc, "beam.install.complete."+workerPoolId)
-	} else {
-		materializeArtifactsFunc()
+	if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); setupErr != nil {
+		return fmt.Errorf("Failed to install required packages: %v", setupErr)
 	}
 
 	// (3) Invoke python
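
The hunk above is the core of the change: each worker gets a throwaway venv
layered over the system site-packages, removed again by the deferred cleanup.
The equivalent steps, sketched in Python (hypothetical paths and worker id,
not part of this commit):

    import os
    import shutil
    import subprocess

    venv_dir = '/opt/apache/beam-venv/beam-venv-worker-1'
    # --system-site-packages keeps the preinstalled SDK visible, so only
    # pipeline-specific dependencies land in the per-worker venv.
    subprocess.check_call(
        ['python', '-m', 'venv', '--system-site-packages', venv_dir])
    os.environ['VIRTUAL_ENV'] = venv_dir
    os.environ['PATH'] = os.path.join(venv_dir, 'bin') + ':' + os.environ['PATH']
    try:
        pass  # ... run the worker; pip now installs into venv_dir ...
    finally:
        shutil.rmtree(venv_dir)  # mirrors the deferred cleanupFunc
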
@@ -200,24 +212,112 @@ func main() {
 		}
 	}
 
+	workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
+
+	// Keep track of child PIDs for clean shutdown without zombies
+	childPids := struct {
+		v []int
+		canceled bool
+		mu sync.Mutex
+	} {v: make([]int, 0, len(workerIds))}
+
+	// Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies
+	go func() {
+		log.Printf("Received signal: %v", <-signalChannel)
+		childPids.mu.Lock()
+		childPids.canceled = true
+		for _, pid := range childPids.v {
+			go func(pid int) {
+				// This goroutine will be canceled if the main process exits before the 5 seconds
+				// have elapsed, i.e., as soon as all subprocesses have returned from Wait().
+				time.Sleep(5 * time.Second)
+				if err := syscall.Kill(-pid, syscall.SIGKILL); err == nil {
+					log.Printf("Worker process %v did not respond, killed it.", pid)
+				}
+			}(pid)
+			syscall.Kill(-pid, syscall.SIGTERM)
+		}
+		childPids.mu.Unlock()
+	}()
+
 	args := []string{
 		"-m",
 		sdkHarnessEntrypoint,
 	}
 
-	workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
 	var wg sync.WaitGroup
 	wg.Add(len(workerIds))
 	for _, workerId := range workerIds {
 		go func(workerId string) {
-			log.Printf("Executing: python %v", strings.Join(args, " "))
-			log.Fatalf("Python exited: %v", execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "python", args...))
+			defer wg.Done()
+
+			childPids.mu.Lock()
+			if childPids.canceled {
+				childPids.mu.Unlock()
+				return
+			}
+			log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
+			cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, "python", args...)
+			childPids.v = append(childPids.v, cmd.Process.Pid)
+			childPids.mu.Unlock()
+
+			if err := cmd.Wait(); err != nil {
+				log.Printf("Python (worker %v) exited: %v", workerId, err)
+			} else {
+				log.Printf("Python (worker %v) exited.", workerId)
+			}
 		}(workerId)
 	}
 	wg.Wait()
+	return nil
+}
+
+// StartCommandEnv starts a command in a new process group with the given
+// arguments and additional environment variables. It attaches stdio to the
+// child process and returns the process handle.
+func StartCommandEnv(env map[string]string, prog string, args ...string) *exec.Cmd {
+	cmd := exec.Command(prog, args...)
+	cmd.Stdin = os.Stdin
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	if env != nil {
+		cmd.Env = os.Environ()
+		for k, v := range env {
+			cmd.Env = append(cmd.Env, k+"="+v)
+		}
+	}
+
+	// Create process group so we can clean up the whole subtree later without creating zombies
+	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
+	cmd.Start()
+	return cmd
 }
 
-// setup wheel specs according to installed python version
+// setupVenv initializes a local Python venv and sets the corresponding env variables
+func setupVenv(baseDir, workerId string) (string, error) {
+	log.Printf("Initializing temporary Python venv ...")
+
+	dir := filepath.Join(baseDir, "beam-venv-worker-" + workerId)
+	if _, err := os.Stat(dir); !os.IsNotExist(err) {
+		// Probably leftovers from a previous run
+		log.Printf("Cleaning up previous venv ...")
+		if err := os.RemoveAll(dir); err != nil {
+			return "", err
+		}
+	}
+	if err := os.MkdirAll(dir, 0750); err != nil {
+		return "", fmt.Errorf("Failed to create Python venv directory: %s", err)
+	}
+	if err := execx.Execute("python", "-m", "venv", "--system-site-packages", dir); err != nil {
+		return "", fmt.Errorf("Python venv initialization failed: %s", err)
+	}
+
+	os.Setenv("VIRTUAL_ENV", dir)
+	os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":"))
+	return dir, nil
+}
+
+// setupAcceptableWheelSpecs sets up wheel specs according to the installed Python version
 func setupAcceptableWheelSpecs() error {
 	cmd := exec.Command("python", "-V")
 	stdoutStderr, err := cmd.CombinedOutput()
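
StartCommandEnv above starts each worker in its own process group (Setpgid),
which is what lets the signal handler address a whole subtree at once via
syscall.Kill with a negative PID. The same idiom in Python (hypothetical
worker command, not part of this commit):

    import os
    import signal
    import subprocess

    # start_new_session=True gives the child a fresh session and process
    # group, analogous to Setpgid in the Go code above.
    proc = subprocess.Popen(['python', '-m', 'some_worker'],
                            start_new_session=True)
    # os.killpg signals the whole group, the equivalent of Go's
    # syscall.Kill(-pid, syscall.SIGTERM).
    os.killpg(proc.pid, signal.SIGTERM)
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        os.killpg(proc.pid, signal.SIGKILL)  # escalate, like the 5 s timer
        proc.wait()  # reap to avoid zombies
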
@@ -282,53 +382,12 @@ func joinPaths(dir string, paths ...string) []string {
 	return ret
 }
 
-// Call the given function exactly once across multiple worker processes.
-// The need for multiple processes is specific to the Python SDK due to the GIL.
-// Should another SDK require it, this could be separated out as shared utility.
-func multiProcessExactlyOnce(actionFunc func(), completeFileName string) {
-	installCompleteFile := filepath.Join(os.TempDir(), completeFileName)
-
-	// skip if install already complete, no need to lock
-	_, err := os.Stat(installCompleteFile)
-	if err == nil {
-		return
-	}
-
-	lock, err := lockfile.New(filepath.Join(os.TempDir(), completeFileName+".lck"))
-	if err != nil {
-		log.Fatalf("Cannot init artifact retrieval lock: %v", err)
-	}
-
-	for err = lock.TryLock(); err != nil; err = lock.TryLock() {
-		if _, ok := err.(lockfile.TemporaryError); ok {
-			time.Sleep(5 * time.Second)
-			log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
-		} else {
-			log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
-		}
-	}
-	defer lock.Unlock()
-
-	// skip if install already complete
-	_, err = os.Stat(installCompleteFile)
-	if err == nil {
-		return
-	}
-
-	// do the real work
-	actionFunc()
-
-	// mark install complete
-	os.OpenFile(installCompleteFile, os.O_RDONLY|os.O_CREATE, 0666)
-
-}
-
 // processArtifactsInSetupOnlyMode installs the dependencies found in artifacts
 // when flag --setup_only and --artifacts exist. The setup mode will only
 // process the provided artifacts and skip the actual worker program start up.
 // The mode is useful for building new images with dependencies pre-installed so
 // that the installation can be skipped at the pipeline runtime.
-func processArtifactsInSetupOnlyMode() error {
+func processArtifactsInSetupOnlyMode() {
 	if *artifacts == "" {
 		log.Fatal("No --artifacts provided along with --setup_only flag.")
 	}
@@ -362,5 +421,4 @@ func processArtifactsInSetupOnlyMode() error {
 	if setupErr := installSetupPackages(files, workDir, []string{requirementsFile}); setupErr != nil {
 		log.Fatalf("Failed to install required packages: %v", setupErr)
 	}
-	return nil
 }
diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go
index 60cf33549cf..a931f45a672 100644
--- a/sdks/python/container/piputil.go
+++ b/sdks/python/container/piputil.go
@@ -29,18 +29,6 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
 )
 
-var (
-	pip = pipLocation()
-)
-
-func pipLocation() string {
-	// Users can set 'pip' environment variable to use a custom pip path.
-	if v, ok := os.LookupEnv("pip"); ok {
-		return v
-	}
-	return "pip"
-}
-
 // pipInstallRequirements installs the given requirement, if present.
 func pipInstallRequirements(files []string, dir, name string) error {
 	for _, file := range files {
@@ -49,15 +37,15 @@ func pipInstallRequirements(files []string, dir, name string) error {
 			// as possible PyPI downloads. In the first round the --find-links
 			// option will make sure that only things staged in the worker will be
 			// used without following their dependencies.
-			args := []string{"install", "-r", filepath.Join(dir, name), "--disable-pip-version-check", "--no-index", "--no-deps", "--find-links", dir}
-			if err := execx.Execute(pip, args...); err != nil {
-		        fmt.Println("Some packages could not be installed solely from the requirements cache. Installing packages from PyPI.")
+			args := []string{"-m", "pip", "install", "-r", filepath.Join(dir, name), "--disable-pip-version-check", "--no-index", "--no-deps", "--find-links", dir}
+			if err := execx.Execute("python", args...); err != nil {
+				fmt.Println("Some packages could not be installed solely from the requirements cache. Installing packages from PyPI.")
 			}
 			// The second install round opens up the search for packages on PyPI and
 			// also installs dependencies. The key is that if all the packages have
 			// been installed in the first round then this command will be a no-op.
-			args = []string{"install", "-r", filepath.Join(dir, name), "--disable-pip-version-check", "--find-links", dir}
-			return execx.Execute(pip, args...)
+			args = []string{"-m", "pip", "install", "-r", filepath.Join(dir, name), "--disable-pip-version-check", "--find-links", dir}
+			return execx.Execute("python", args...)
 		}
 	}
 	return nil
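
The two install rounds above first try to satisfy everything offline from the
staged artifacts, then re-run with PyPI enabled; if round one already
installed every pin, round two is a no-op. Roughly, in Python (hypothetical
staging directory and requirements file name):

    import subprocess

    staged = '/opt/apache/beam/staged'
    base = ['python', '-m', 'pip', 'install', '-r',
            staged + '/requirements.txt', '--disable-pip-version-check']
    # Round 1: offline; only staged artifacts, no dependency resolution.
    if subprocess.call(base + ['--no-index', '--no-deps',
                               '--find-links', staged]) != 0:
        print('Falling back to PyPI for packages missing from the cache.')
    # Round 2: PyPI allowed, dependencies resolved.
    subprocess.check_call(base + ['--find-links', staged])
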
@@ -88,19 +76,19 @@ func pipInstallPackage(files []string, dir, name string, force, optional bool, e
 				// installed version will match the package specified, the package itself
 				// will not be reinstalled, but its dependencies will now be resolved and
 				// installed if necessary.  This achieves our goal outlined above.
-				args := []string{"install", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps",
+				args := []string{"-m", "pip", "install", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps",
 					filepath.Join(dir, packageSpec)}
-				err := execx.Execute(pip, args...)
+				err := execx.Execute("python", args...)
 				if err != nil {
 					return err
 				}
-				args = []string{"install", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
-				return execx.Execute(pip, args...)
+				args = []string{"-m", "pip", "install", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
+				return execx.Execute("python", args...)
 			}
 
 			// Case when we do not perform a forced reinstall.
-			args := []string{"install", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
-			return execx.Execute(pip, args...)
+			args := []string{"-m", "pip", "install", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
+			return execx.Execute("python", args...)
 		}
 	}
 	if optional {