You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ri...@apache.org on 2023/08/23 17:19:57 UTC
[beam] branch master updated: [Python] Get available python version and use it for Python SDK harness boot entry point (#28046)
This is an automated email from the ASF dual-hosted git repository.
riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 395c4d15bb7 [Python] Get available python version and use it for Python SDK harness boot entry point (#28046)
395c4d15bb7 is described below
commit 395c4d15bb74351b0aa020dc7463de8d85766e07
Author: Ritesh Ghorse <ri...@gmail.com>
AuthorDate: Wed Aug 23 13:19:49 2023 -0400
[Python] Get available python version and use it for Python SDK harness boot entry point (#28046)
* add env variable config, use GetPythonVersion
* fix error message
* add spaces
---
.../core/runtime/xlangx/expansionx/download.go | 15 ++++++++--
.../runtime/xlangx/expansionx/download_test.go | 32 ++++++++++++++++++++++
sdks/python/container/boot.go | 13 +++++++--
sdks/python/container/piputil.go | 19 +++++++++----
sdks/python/expansion-service-container/boot.go | 9 ++++--
5 files changed, 76 insertions(+), 12 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
index af3b495720b..e5fff103967 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
@@ -382,20 +382,29 @@ func jarExists(jarPath string) bool {
return err == nil
}
-func getPythonVersion() (string, error) {
+// GetPythonVersion returns the Python interpreter command to use. It checks
+// the env variable PYTHON_PATH and returns its value if set.
+// If PYTHON_PATH is not defined, it tries python and then python3 and
+// returns the first one that runs. Otherwise it returns an error.
+func GetPythonVersion() (string, error) {
+ if pythonPath := os.Getenv("PYTHON_PATH"); pythonPath != "" {
+ return pythonPath, nil
+ }
for _, v := range []string{"python", "python3"} {
cmd := exec.Command(v, "--version")
if err := cmd.Run(); err == nil {
return v, nil
}
}
- return "", fmt.Errorf("no python installation found")
+ return "", errors.New("no python installation found. If you use a " +
+ "custom container image, please check if python/python3 is available or specify the " +
+ "full path to the python interpreter in PYTHON_PATH environment variable")
}
// SetUpPythonEnvironment sets up the virtual environment required for the
// Apache Beam Python SDK to run an expansion service module.
func SetUpPythonEnvironment(extraPackage string) (string, error) {
- py, err := getPythonVersion()
+ py, err := GetPythonVersion()
if err != nil {
return "", fmt.Errorf("no python installation found: %v", err)
}
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go
index 9779c236189..65e72342a9b 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go
@@ -217,3 +217,35 @@ func TestGetJar_dev(t *testing.T) {
t.Errorf("error message does not contain gradle command %v for user, got message: %v", gradleTarget, err)
}
}
+
+// TestGetPythonVersion verifies that GetPythonVersion returns the value of the
+// PYTHON_PATH environment variable when this test sets it, and that it returns
+// without error when this test leaves the variable alone.
+func TestGetPythonVersion(t *testing.T) {
+	tests := []struct {
+		name       string
+		pythonPath string
+	}{
+		{
+			name:       "PYTHON_PATH set",
+			pythonPath: "/bin/python",
+		},
+		{
+			name:       "PYTHON_PATH not set",
+			pythonPath: "",
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			if test.pythonPath != "" {
+				if err := os.Setenv("PYTHON_PATH", test.pythonPath); err != nil {
+					t.Fatalf("unable to set PYTHON_PATH=%v: %v", test.pythonPath, err)
+				}
+				// Unset by variable name, not by its value, so the override
+				// does not leak into later cases or other tests.
+				defer os.Unsetenv("PYTHON_PATH")
+			}
+			got, err := GetPythonVersion()
+			if err != nil {
+				t.Errorf("python installation not found: %v, when PYTHON_PATH=%v", err, test.pythonPath)
+			}
+			if test.pythonPath != "" && got != test.pythonPath {
+				t.Errorf("incorrect PYTHON_PATH, want: %v, got: %v", test.pythonPath, got)
+			}
+		})
+	}
+}
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index c67239f0564..e7b11daa397 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -36,6 +36,7 @@ import (
"github.com/apache/beam/sdks/v2/go/container/tools"
"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
@@ -91,7 +92,11 @@ func main() {
"--container_executable=/opt/apache/beam/boot",
}
log.Printf("Starting worker pool %v: python %v", workerPoolId, strings.Join(args, " "))
- if err := execx.Execute("python", args...); err != nil {
+ pythonVersion, err := expansionx.GetPythonVersion()
+ if err != nil {
+ log.Fatalf("Python SDK worker pool exited with error: %v", err)
+ }
+ if err := execx.Execute(pythonVersion, args...); err != nil {
log.Fatalf("Python SDK worker pool exited with error: %v", err)
}
log.Print("Python SDK worker pool exited.")
@@ -336,7 +341,11 @@ func setupVenv(ctx context.Context, logger *tools.Logger, baseDir, workerId stri
if err := os.MkdirAll(dir, 0750); err != nil {
return "", fmt.Errorf("failed to create Python venv directory: %s", err)
}
- if err := execx.Execute("python", "-m", "venv", "--system-site-packages", dir); err != nil {
+ pythonVersion, err := expansionx.GetPythonVersion()
+ if err != nil {
+ return "", err
+ }
+ if err := execx.Execute(pythonVersion, "-m", "venv", "--system-site-packages", dir); err != nil {
return "", fmt.Errorf("python venv initialization failed: %s", err)
}
diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go
index 720bf372c53..350bda049d9 100644
--- a/sdks/python/container/piputil.go
+++ b/sdks/python/container/piputil.go
@@ -26,11 +26,16 @@ import (
"path/filepath"
"strings"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
)
// pipInstallRequirements installs the given requirement, if present.
func pipInstallRequirements(files []string, dir, name string) error {
+ pythonVersion, err := expansionx.GetPythonVersion()
+ if err != nil {
+ return err
+ }
for _, file := range files {
if file == name {
// We run the install process in two rounds in order to avoid as much
@@ -38,14 +43,14 @@ func pipInstallRequirements(files []string, dir, name string) error {
// option will make sure that only things staged in the worker will be
// used without following their dependencies.
args := []string{"-m", "pip", "install", "-q", "-r", filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", "--no-index", "--no-deps", "--find-links", dir}
- if err := execx.Execute("python", args...); err != nil {
+ if err := execx.Execute(pythonVersion, args...); err != nil {
fmt.Println("Some packages could not be installed solely from the requirements cache. Installing packages from PyPI.")
}
// The second install round opens up the search for packages on PyPI and
// also installs dependencies. The key is that if all the packages have
// been installed in the first round then this command will be a no-op.
args = []string{"-m", "pip", "install", "-q", "-r", filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", "--find-links", dir}
- return execx.Execute("python", args...)
+ return execx.Execute(pythonVersion, args...)
}
}
return nil
@@ -65,6 +70,10 @@ func isPackageInstalled(pkgName string) bool {
// pipInstallPackage installs the given package, if present.
func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error {
+ pythonVersion, err := expansionx.GetPythonVersion()
+ if err != nil {
+ return err
+ }
for _, file := range files {
if file == name {
var packageSpec = name
@@ -90,17 +99,17 @@ func pipInstallPackage(files []string, dir, name string, force, optional bool, e
// installed if necessary. This achieves our goal outlined above.
args := []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps",
filepath.Join(dir, packageSpec)}
- err := execx.Execute("python", args...)
+ err := execx.Execute(pythonVersion, args...)
if err != nil {
return err
}
args = []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
- return execx.Execute("python", args...)
+ return execx.Execute(pythonVersion, args...)
}
// Case when we do not perform a forced reinstall.
args := []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
- return execx.Execute("python", args...)
+ return execx.Execute(pythonVersion, args...)
}
}
if optional {
diff --git a/sdks/python/expansion-service-container/boot.go b/sdks/python/expansion-service-container/boot.go
index 171e8ef62a3..90a97c35425 100644
--- a/sdks/python/expansion-service-container/boot.go
+++ b/sdks/python/expansion-service-container/boot.go
@@ -26,6 +26,7 @@ import (
"strconv"
"strings"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
)
@@ -58,6 +59,10 @@ func main() {
}
func launchExpansionServiceProcess() error {
+ pythonVersion, err := expansionx.GetPythonVersion()
+ if err != nil {
+ return err
+ }
log.Printf("Starting Python expansion service ...")
dir := filepath.Join("/opt/apache/beam", venvDirectory)
@@ -65,8 +70,8 @@ func launchExpansionServiceProcess() error {
os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":"))
args := []string{"-m", expansionServiceEntrypoint, "-p", strconv.Itoa(*port), "--fully_qualified_name_glob", "*"}
- if err := execx.Execute("python", args...); err != nil {
- return fmt.Errorf("Could not start the expansion service: %s", err)
+ if err := execx.Execute(pythonVersion, args...); err != nil {
+ return fmt.Errorf("could not start the expansion service: %s", err)
}
return nil