You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ri...@apache.org on 2023/08/23 17:19:57 UTC

[beam] branch master updated: [Python] Get available python version and use it for Python SDK harness boot entry point (#28046)

This is an automated email from the ASF dual-hosted git repository.

riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 395c4d15bb7 [Python] Get available python version and use it for Python SDK harness boot entry point  (#28046)
395c4d15bb7 is described below

commit 395c4d15bb74351b0aa020dc7463de8d85766e07
Author: Ritesh Ghorse <ri...@gmail.com>
AuthorDate: Wed Aug 23 13:19:49 2023 -0400

    [Python] Get available python version and use it for Python SDK harness boot entry point  (#28046)
    
    * add env variable config, use GetPythonVersion
    
    * fix error message
    
    * add spaces
---
 .../core/runtime/xlangx/expansionx/download.go     | 15 ++++++++--
 .../runtime/xlangx/expansionx/download_test.go     | 32 ++++++++++++++++++++++
 sdks/python/container/boot.go                      | 13 +++++++--
 sdks/python/container/piputil.go                   | 19 +++++++++----
 sdks/python/expansion-service-container/boot.go    |  9 ++++--
 5 files changed, 76 insertions(+), 12 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
index af3b495720b..e5fff103967 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
@@ -382,20 +382,29 @@ func jarExists(jarPath string) bool {
 	return err == nil
 }
 
-func getPythonVersion() (string, error) {
+// GetPythonVersion returns the Python executable to invoke. If the
+// PYTHON_PATH environment variable is set, its value is returned as is.
+// Otherwise it returns the first of "python" or "python3" that runs
+// successfully with --version, or an error if neither does.
+func GetPythonVersion() (string, error) {
+	if pythonPath := os.Getenv("PYTHON_PATH"); pythonPath != "" {
+		return pythonPath, nil
+	}
 	for _, v := range []string{"python", "python3"} {
 		cmd := exec.Command(v, "--version")
 		if err := cmd.Run(); err == nil {
 			return v, nil
 		}
 	}
-	return "", fmt.Errorf("no python installation found")
+	return "", errors.New("no python installation found. If you use a " +
+		"custom container image, please check if python/python3 is available or specify the " +
+		"full path to the python interpreter in PYTHON_PATH environment variable")
 }
 
 // SetUpPythonEnvironment sets up the virtual ennvironment required for the
 // Apache Beam Python SDK to run an expansion service module.
 func SetUpPythonEnvironment(extraPackage string) (string, error) {
-	py, err := getPythonVersion()
+	py, err := GetPythonVersion()
 	if err != nil {
 		return "", fmt.Errorf("no python installation found: %v", err)
 	}
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go
index 9779c236189..65e72342a9b 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go
@@ -217,3 +217,35 @@ func TestGetJar_dev(t *testing.T) {
 		t.Errorf("error message does not contain gradle command %v for user, got message: %v", gradleTarget, err)
 	}
 }
+
+func TestGetPythonVersion(t *testing.T) {
+	tests := []struct {
+		name       string
+		pythonPath string
+	}{
+		{
+			name:       "PYTHON_PATH set",
+			pythonPath: "/bin/python",
+		},
+		{
+			name:       "PYTHON_PATH not set",
+			pythonPath: "",
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			// t.Setenv restores the previous value when the subtest ends, so
+			// PYTHON_PATH cannot leak into later cases; an empty value is
+			// treated as unset by GetPythonVersion.
+			t.Setenv("PYTHON_PATH", test.pythonPath)
+			pythonVersion, err := GetPythonVersion()
+			if err != nil {
+				t.Fatalf("python installation not found: %v, when PYTHON_PATH=%v", err, test.pythonPath)
+			}
+			if test.pythonPath != "" && pythonVersion != test.pythonPath {
+				t.Errorf("incorrect PYTHON_PATH, want: %v, got: %v", test.pythonPath, pythonVersion)
+			}
+		})
+	}
+}
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index c67239f0564..e7b11daa397 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -36,6 +36,7 @@ import (
 
 	"github.com/apache/beam/sdks/v2/go/container/tools"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
 	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
@@ -91,7 +92,11 @@ func main() {
 			"--container_executable=/opt/apache/beam/boot",
 		}
 		log.Printf("Starting worker pool %v: python %v", workerPoolId, strings.Join(args, " "))
-		if err := execx.Execute("python", args...); err != nil {
+		pythonVersion, err := expansionx.GetPythonVersion()
+		if err != nil {
+			log.Fatalf("Python SDK worker pool could not locate a Python interpreter: %v", err)
+		}
+		if err := execx.Execute(pythonVersion, args...); err != nil {
 			log.Fatalf("Python SDK worker pool exited with error: %v", err)
 		}
 		log.Print("Python SDK worker pool exited.")
@@ -336,7 +341,11 @@ func setupVenv(ctx context.Context, logger *tools.Logger, baseDir, workerId stri
 	if err := os.MkdirAll(dir, 0750); err != nil {
 		return "", fmt.Errorf("failed to create Python venv directory: %s", err)
 	}
-	if err := execx.Execute("python", "-m", "venv", "--system-site-packages", dir); err != nil {
+	pythonVersion, err := expansionx.GetPythonVersion()
+	if err != nil {
+		return "", fmt.Errorf("failed to locate a Python interpreter for the venv: %w", err)
+	}
+	if err := execx.Execute(pythonVersion, "-m", "venv", "--system-site-packages", dir); err != nil {
 		return "", fmt.Errorf("python venv initialization failed: %s", err)
 	}
 
diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go
index 720bf372c53..350bda049d9 100644
--- a/sdks/python/container/piputil.go
+++ b/sdks/python/container/piputil.go
@@ -26,11 +26,16 @@ import (
 	"path/filepath"
 	"strings"
 
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
 )
 
 // pipInstallRequirements installs the given requirement, if present.
 func pipInstallRequirements(files []string, dir, name string) error {
 	for _, file := range files {
 		if file == name {
+			pythonVersion, err := expansionx.GetPythonVersion()
+			if err != nil {
+				return err
+			}
 			// We run the install process in two rounds in order to avoid as much
@@ -38,14 +43,14 @@ func pipInstallRequirements(files []string, dir, name string) error {
 			// option will make sure that only things staged in the worker will be
 			// used without following their dependencies.
 			args := []string{"-m", "pip", "install", "-q", "-r", filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", "--no-index", "--no-deps", "--find-links", dir}
-			if err := execx.Execute("python", args...); err != nil {
+			if err := execx.Execute(pythonVersion, args...); err != nil {
 				fmt.Println("Some packages could not be installed solely from the requirements cache. Installing packages from PyPI.")
 			}
 			// The second install round opens up the search for packages on PyPI and
 			// also installs dependencies. The key is that if all the packages have
 			// been installed in the first round then this command will be a no-op.
 			args = []string{"-m", "pip", "install", "-q", "-r", filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", "--find-links", dir}
-			return execx.Execute("python", args...)
+			return execx.Execute(pythonVersion, args...)
 		}
 	}
 	return nil
@@ -65,6 +70,10 @@ func isPackageInstalled(pkgName string) bool {
 
 // pipInstallPackage installs the given package, if present.
 func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error {
 	for _, file := range files {
 		if file == name {
+			pythonVersion, err := expansionx.GetPythonVersion()
+			if err != nil {
+				return err
+			}
 			var packageSpec = name
@@ -90,17 +99,17 @@ func pipInstallPackage(files []string, dir, name string, force, optional bool, e
 				// installed if necessary.  This achieves our goal outlined above.
 				args := []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps",
 					filepath.Join(dir, packageSpec)}
-				err := execx.Execute("python", args...)
+				err := execx.Execute(pythonVersion, args...)
 				if err != nil {
 					return err
 				}
 				args = []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
-				return execx.Execute("python", args...)
+				return execx.Execute(pythonVersion, args...)
 			}
 
 			// Case when we do not perform a forced reinstall.
 			args := []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
-			return execx.Execute("python", args...)
+			return execx.Execute(pythonVersion, args...)
 		}
 	}
 	if optional {
diff --git a/sdks/python/expansion-service-container/boot.go b/sdks/python/expansion-service-container/boot.go
index 171e8ef62a3..90a97c35425 100644
--- a/sdks/python/expansion-service-container/boot.go
+++ b/sdks/python/expansion-service-container/boot.go
@@ -26,6 +26,7 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
 )
 
@@ -65,8 +66,12 @@ func launchExpansionServiceProcess() error {
 	os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":"))
 
+	pythonVersion, err := expansionx.GetPythonVersion()
+	if err != nil {
+		return fmt.Errorf("could not find a Python interpreter for the expansion service: %w", err)
+	}
 	args := []string{"-m", expansionServiceEntrypoint, "-p", strconv.Itoa(*port), "--fully_qualified_name_glob", "*"}
-	if err := execx.Execute("python", args...); err != nil {
-		return fmt.Errorf("Could not start the expansion service: %s", err)
+	if err := execx.Execute(pythonVersion, args...); err != nil {
+		return fmt.Errorf("could not start the expansion service: %w", err)
 	}
 
 	return nil