You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/05/30 00:07:10 UTC
[beam] branch master updated: Support installing Beam SDK from a
wheel distribution in SDK containers. (#5446)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1abeee9 Support installing Beam SDK from a wheel distribution in SDK containers. (#5446)
1abeee9 is described below
commit 1abeee9ee0da16d3a9ab713d9b0c4198ddf766f0
Author: tvalentyn <tv...@users.noreply.github.com>
AuthorDate: Tue May 29 17:06:50 2018 -0700
Support installing Beam SDK from a wheel distribution in SDK containers. (#5446)
Support installing Beam SDK from a wheel file.
---
sdks/python/container/boot.go | 12 ++-
sdks/python/container/piputil.go | 214 ++++++++++++++++++++++-----------------
2 files changed, 130 insertions(+), 96 deletions(-)
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 14e2619..9001868 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -27,8 +27,8 @@ import (
"strings"
"github.com/apache/beam/sdks/go/pkg/beam/artifact"
- pbpipeline "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
pbjob "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+ pbpipeline "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/go/pkg/beam/provision"
"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
@@ -36,6 +36,8 @@ import (
)
var (
+ acceptableWhlSpecs = []string{"cp27-cp27mu-manylinux1_x86_64.whl"}
+
// Contract: https://s.apache.org/beam-fn-api-container-contract.
id = flag.String("id", "", "Local identifier (required).")
@@ -48,10 +50,10 @@ var (
const (
sdkHarnessEntrypoint = "apache_beam.runners.worker.sdk_worker_main"
- // Please keep these names in sync with setup dependency.py
+ // Please keep these names in sync with stager.py
workflowFile = "workflow.tar.gz"
requirementsFile = "requirements.txt"
- sdkFile = "dataflow_python_sdk.tar"
+ sdkSrcFile = "dataflow_python_sdk.tar"
extraPackagesFile = "extra_packages.txt"
)
@@ -100,7 +102,7 @@ func main() {
// TODO(herohde): the packages to install should be specified explicitly. It
// would also be possible to install the SDK in the Dockerfile.
if setupErr := installSetupPackages(files, dir); setupErr != nil {
- log.Fatalf("Failed to install SDK: %v", setupErr)
+ log.Fatalf("Failed to install required packages: %v", setupErr)
}
// (3) Invoke python
@@ -133,7 +135,7 @@ func installSetupPackages(mds []*pbjob.ArtifactMetadata, workDir string) error {
// Install the Dataflow Python SDK and worker packages.
// We install the extra requirements in case of using the beam sdk. These are ignored by pip
// if the user is using an SDK that does not provide these.
- if err := pipInstallPackage(files, workDir, sdkFile, false, false, []string{"gcp"}); err != nil {
+ if err := installSdk(files, workDir, sdkSrcFile, acceptableWhlSpecs); err != nil {
return fmt.Errorf("failed to install SDK: %v", err)
}
// The staged files will not disappear due to restarts because workDir is a
diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go
index b227774..b8d837a 100644
--- a/sdks/python/container/piputil.go
+++ b/sdks/python/container/piputil.go
@@ -16,116 +16,148 @@
package main
import (
- "bufio"
- "bytes"
- "errors"
- "fmt"
- "io/ioutil"
- "log"
- "path/filepath"
- "strings"
+ "bufio"
+ "bytes"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "path/filepath"
+ "strings"
- "github.com/apache/beam/sdks/go/pkg/beam/util/execx"
+ "github.com/apache/beam/sdks/go/pkg/beam/util/execx"
)
const (
- pip = "/usr/local/bin/pip"
+ pip = "/usr/local/bin/pip"
)
// pipInstallRequirements installs the given requirement, if present.
func pipInstallRequirements(files []string, dir, name string) error {
- for _, file := range files {
- if file == name {
- // We run the install process in two rounds in order to avoid as much
- // as possible PyPI downloads. In the first round the --find-links
- // option will make sure that only things staged in the worker will be
- // used without following their dependencies.
- args := []string{"install", "-r", filepath.Join(dir, name), "--no-index", "--no-deps", "--find-links", dir}
- if err := execx.Execute(pip, args...); err != nil {
- return err
- }
- // The second install round opens up the search for packages on PyPI and
- // also installs dependencies. The key is that if all the packages have
- // been installed in the first round then this command will be a no-op.
- args = []string{"install", "-r", filepath.Join(dir, name), "--find-links", dir}
- return execx.Execute(pip, args...)
- }
- }
- return nil
+ for _, file := range files {
+ if file == name {
+ // We run the install process in two rounds in order to avoid as much
+ // as possible PyPI downloads. In the first round the --find-links
+ // option will make sure that only things staged in the worker will be
+ // used without following their dependencies.
+ args := []string{"install", "-r", filepath.Join(dir, name), "--no-index", "--no-deps", "--find-links", dir}
+ if err := execx.Execute(pip, args...); err != nil {
+ return err
+ }
+ // The second install round opens up the search for packages on PyPI and
+ // also installs dependencies. The key is that if all the packages have
+ // been installed in the first round then this command will be a no-op.
+ args = []string{"install", "-r", filepath.Join(dir, name), "--find-links", dir}
+ return execx.Execute(pip, args...)
+ }
+ }
+ return nil
}
// pipInstallPackage installs the given package, if present.
func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error {
- for _, file := range files {
- if file == name {
- var packageSpec = name
- if extras != nil {
- packageSpec += "[" + strings.Join(extras, ",") + "]"
- }
- if force {
- // We only use force reinstallation for packages specified using the
- // --extra_package flag. In this case, we always want to use the
- // user-specified package, overwriting any existing package already
- // installed. At the same time, we want to avoid reinstalling any
- // dependencies. The "pip install" command doesn't have a clean way to do
- // this, so we do this in two steps.
- //
- // First, we use the three flags "--upgrade --force-reinstall --no-deps"
- // to "pip install" so as to force the package to be reinstalled, while
- // avoiding reinstallation of dependencies. Note now that if any needed
- // dependencies were not installed, they will still be missing.
- //
- // Next, we run "pip install" on the package without any flags. Since the
- // installed version will match the package specified, the package itself
- // will not be reinstalled, but its dependencies will now be resolved and
- // installed if necessary. This achieves our goal outlined above.
- args := []string{"install", "--upgrade", "--force-reinstall", "--no-deps",
- filepath.Join(dir, packageSpec)}
- err := execx.Execute(pip, args...)
- if err != nil {
- return err
- }
- args = []string{"install", filepath.Join(dir, packageSpec)}
- return execx.Execute(pip, args...)
- }
+ for _, file := range files {
+ if file == name {
+ var packageSpec = name
+ if extras != nil {
+ packageSpec += "[" + strings.Join(extras, ",") + "]"
+ }
+ if force {
+ // We only use force reinstallation for packages specified using the
+ // --extra_package flag. In this case, we always want to use the
+ // user-specified package, overwriting any existing package already
+ // installed. At the same time, we want to avoid reinstalling any
+ // dependencies. The "pip install" command doesn't have a clean way to do
+ // this, so we do this in two steps.
+ //
+ // First, we use the three flags "--upgrade --force-reinstall --no-deps"
+ // to "pip install" so as to force the package to be reinstalled, while
+ // avoiding reinstallation of dependencies. Note now that if any needed
+ // dependencies were not installed, they will still be missing.
+ //
+ // Next, we run "pip install" on the package without any flags. Since the
+ // installed version will match the package specified, the package itself
+ // will not be reinstalled, but its dependencies will now be resolved and
+ // installed if necessary. This achieves our goal outlined above.
+ args := []string{"install", "--upgrade", "--force-reinstall", "--no-deps",
+ filepath.Join(dir, packageSpec)}
+ err := execx.Execute(pip, args...)
+ if err != nil {
+ return err
+ }
+ args = []string{"install", filepath.Join(dir, packageSpec)}
+ return execx.Execute(pip, args...)
+ }
- // Case when we do not perform a forced reinstall.
- args := []string{"install", filepath.Join(dir, packageSpec)}
- return execx.Execute(pip, args...)
- }
- }
- if optional {
- return nil
- }
- return errors.New("package '" + name + "' not found")
+ // Case when we do not perform a forced reinstall.
+ args := []string{"install", filepath.Join(dir, packageSpec)}
+ return execx.Execute(pip, args...)
+ }
+ }
+ if optional {
+ return nil
+ }
+ return errors.New("package '" + name + "' not found")
}
// installExtraPackages installs all the packages declared in the extra
// packages manifest file.
func installExtraPackages(files []string, extraPackagesFile, dir string) error {
- // First check that extra packages manifest file is present.
- for _, file := range files {
- if file != extraPackagesFile {
- continue
- }
+ // First check that extra packages manifest file is present.
+ for _, file := range files {
+ if file != extraPackagesFile {
+ continue
+ }
- // Found the manifest. Install extra packages.
- manifest, err := ioutil.ReadFile(filepath.Join(dir, extraPackagesFile))
- if err != nil {
- return fmt.Errorf("failed to read extra packages manifest file: %v", err)
- }
+ // Found the manifest. Install extra packages.
+ manifest, err := ioutil.ReadFile(filepath.Join(dir, extraPackagesFile))
+ if err != nil {
+ return fmt.Errorf("failed to read extra packages manifest file: %v", err)
+ }
- s := bufio.NewScanner(bytes.NewReader(manifest))
- s.Split(bufio.ScanLines)
+ s := bufio.NewScanner(bytes.NewReader(manifest))
+ s.Split(bufio.ScanLines)
- for s.Scan() {
- extraPackage := s.Text()
- log.Printf("Installing extra package: %s", extraPackage)
- if err = pipInstallPackage(files, dir, extraPackage, true, false, nil); err != nil {
- return fmt.Errorf("failed to install extra package %s: %v", extraPackage, err)
- }
- }
- return nil
- }
- return nil
+ for s.Scan() {
+ extraPackage := s.Text()
+ log.Printf("Installing extra package: %s", extraPackage)
+ if err = pipInstallPackage(files, dir, extraPackage, true, false, nil); err != nil {
+ return fmt.Errorf("failed to install extra package %s: %v", extraPackage, err)
+ }
+ }
+ return nil
+ }
+ return nil
+}
+
+func findBeamSdkWhl(files []string, acceptableWhlSpecs []string) string {
+ for _, file := range files {
+ if strings.HasPrefix(file, "apache_beam") {
+ for _, s := range acceptableWhlSpecs {
+ if strings.HasSuffix(file, s) {
+ log.Printf("Found Apache Beam SDK wheel: %v", file)
+ return file
+ }
+ }
+ }
+ }
+ return ""
+}
+
+// installSdk installs the Beam SDK: First, we try to find a compiled
+// wheel distribution of Apache Beam among staged files. If we find it, we
+// assume that the pipeline was started with the Beam SDK found in the wheel
+// file, and we try to install it. If not successful, we fall back to installing
+// the SDK from the source tarball provided in sdkSrcFile.
+func installSdk(files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string) error {
+ sdkWhlFile := findBeamSdkWhl(files, acceptableWhlSpecs)
+ if sdkWhlFile != "" {
+ err := pipInstallPackage(files, workDir, sdkWhlFile, false, false, []string{"gcp"})
+ if err == nil {
+ return nil
+ }
+ log.Printf("Could not install Apache Beam SDK from a wheel: %v, proceeding to install SDK from source tarball.", err)
+ }
+ err := pipInstallPackage(files, workDir, sdkSrcFile, false, false, []string{"gcp"})
+ return err
}
--
To stop receiving notification emails like this one, please contact
altay@apache.org.