You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/05 02:15:54 UTC

[GitHub] [beam] tvalentyn commented on a change in pull request #16658: [BEAM-12792] Install pipeline dependencies to temporary venv

tvalentyn commented on a change in pull request #16658:
URL: https://github.com/apache/beam/pull/16658#discussion_r820015549



##########
File path: sdks/python/container/boot.go
##########
@@ -210,15 +224,37 @@ func main() {
 	wg.Add(len(workerIds))
 	for _, workerId := range workerIds {
 		go func(workerId string) {
-			log.Printf("Executing: python %v", strings.Join(args, " "))
-			log.Fatalf("Python exited: %v", execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "python", args...))
-			wg.Done()
+			defer wg.Done()
+			log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
+			log.Printf("Python (worker %v) exited with code: %v", workerId, execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "python", args...))
 		}(workerId)
 	}
 	wg.Wait()
+
+	return nil
+}
+
+// setupVenv initialize a local Python venv and set the corresponding env variables
+func setupVenv(baseDir, workerId string) (string, error) {
+	log.Printf("Initializing temporary Python venv ...")
+
+	if err := os.MkdirAll(baseDir, 0750); err != nil {
+	    return "", fmt.Errorf("Failed to create venv base directory: %s", err)
+	}
+	dir, err := ioutil.TempDir(baseDir, fmt.Sprintf("beam-venv-%s-", workerId))
+	if err != nil {
+	    return "", fmt.Errorf("Failed Python venv directory: %s", err)
+	}
+	args := []string{"-m", "venv", "--system-site-packages", dir}
+	if err := execx.Execute("python", args...); err != nil {
+	    return "", err
+	}
+	os.Setenv("VIRTUAL_ENV", dir)

Review comment:
       Why are we setting these environment vars manually as opposed to sourcing the activation script?

##########
File path: CHANGES.md
##########
@@ -130,6 +130,7 @@
 * Added support for cloudpickle as a pickling library for Python SDK ([BEAM-8123](https://issues.apache.org/jira/browse/BEAM-8123)). To use cloudpickle, set pipeline option: --pickler_lib=cloudpickle
 * Added option to specify triggering frequency when streaming to BigQuery (Python) ([BEAM-12865](https://issues.apache.org/jira/browse/BEAM-12865)).
 * Added option to enable caching uploaded artifacts across job runs for Python Dataflow jobs ([BEAM-13459](https://issues.apache.org/jira/browse/BEAM-13459)).  To enable, set pipeline option: --enable_artifact_caching, this will be enabled by default in a future release.
+* Make Python SDK containers reusable by installing dependencies to temporary venvs ([BEAM-12792](https://issues.apache.org/jira/browse/BEAM-12792)).

Review comment:
       ...reusable on Portable runners...

##########
File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
##########
@@ -94,8 +114,9 @@ def start(
 
     # Register to kill the subprocesses on exit.
     def kill_worker_processes():
+      _LOGGER.error("CLEANING UP WORKERS!!!!")

Review comment:
       is this a leftover?

##########
File path: sdks/python/container/boot.go
##########
@@ -214,41 +211,105 @@ func mainError() error {
 		}
 	}
 
+	workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
+
+	// Keep track of child PIDs for clean shutdown without zombies
+	childPids := struct {
+		v []int
+		canceled bool
+		mu sync.Mutex
+	} {v: make([]int, 0, len(workerIds))}
+
+	// Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies
+	go func() {
+		log.Printf("Received signal: %v", <-signalChannel)
+		childPids.mu.Lock()
+		childPids.canceled = true
+		for _, pid := range childPids.v {
+			syscall.Kill(-pid, syscall.SIGTERM)
+			go func() {
+				// This goroutine will be canceled if the main process exits before the 5 seconds
+				// have elapsed, i.e., as soon as all subprocesses have returned from Wait().
+				time.Sleep(5 * time.Second)
+				log.Printf("Worker process did not respond, killing it.")
+				syscall.Kill(-pid, syscall.SIGKILL)
+			}()
+		}
+		childPids.mu.Unlock()
+	}()
+
 	args := []string{
 		"-m",
 		sdkHarnessEntrypoint,
 	}
 
-	workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
 	var wg sync.WaitGroup
 	wg.Add(len(workerIds))
 	for _, workerId := range workerIds {
 		go func(workerId string) {
 			defer wg.Done()
+
+			childPids.mu.Lock()
+			if childPids.canceled {
+				childPids.mu.Unlock()
+				return
+			}
 			log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
-			log.Printf("Python (worker %v) exited with code: %v", workerId, execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "python", args...))
+			cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, "python", args...)
+			childPids.v = append(childPids.v, cmd.Process.Pid)
+			childPids.mu.Unlock()
+
+			if err := cmd.Wait(); err != nil {
+				log.Printf("Python (worker %v) exited: %v", workerId, err)

Review comment:
       Given that this is an error branch, should this say `Python worker failed` or `Python worker exited with error: ...`?

##########
File path: sdks/python/container/boot.go
##########
@@ -210,15 +224,37 @@ func main() {
 	wg.Add(len(workerIds))
 	for _, workerId := range workerIds {
 		go func(workerId string) {
-			log.Printf("Executing: python %v", strings.Join(args, " "))
-			log.Fatalf("Python exited: %v", execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "python", args...))
-			wg.Done()
+			defer wg.Done()
+			log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
+			log.Printf("Python (worker %v) exited with code: %v", workerId, execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "python", args...))
 		}(workerId)
 	}
 	wg.Wait()
+
+	return nil
+}
+
+// setupVenv initialize a local Python venv and set the corresponding env variables
+func setupVenv(baseDir, workerId string) (string, error) {
+	log.Printf("Initializing temporary Python venv ...")
+
+	if err := os.MkdirAll(baseDir, 0750); err != nil {
+	    return "", fmt.Errorf("Failed to create venv base directory: %s", err)
+	}
+	dir, err := ioutil.TempDir(baseDir, fmt.Sprintf("beam-venv-%s-", workerId))
+	if err != nil {
+	    return "", fmt.Errorf("Failed Python venv directory: %s", err)

Review comment:
       nit: Failed to create ... ?

##########
File path: sdks/python/container/boot.go
##########
@@ -200,25 +211,111 @@ func main() {
 		}
 	}
 
+	workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)

Review comment:
       @robertwb @chamikaramj do we have any tests that exercise the SiblingWorker processes functionality? IIRC, this is new. @phoerious - is it correct that your current tests of Flink didn't test this?

##########
File path: sdks/python/container/boot.go
##########
@@ -210,15 +224,37 @@ func main() {
 	wg.Add(len(workerIds))
 	for _, workerId := range workerIds {
 		go func(workerId string) {
-			log.Printf("Executing: python %v", strings.Join(args, " "))
-			log.Fatalf("Python exited: %v", execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "python", args...))
-			wg.Done()
+			defer wg.Done()
+			log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
+			log.Printf("Python (worker %v) exited with code: %v", workerId, execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "python", args...))
 		}(workerId)
 	}
 	wg.Wait()
+
+	return nil
+}
+
+// setupVenv initialize a local Python venv and set the corresponding env variables

Review comment:
       nit: initializes and sets




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org