You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/05/30 00:07:10 UTC

[beam] branch master updated: Support installing Beam SDK from a wheel distribution in SDK containers. (#5446)

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1abeee9  Support installing Beam SDK from a wheel distribution in SDK containers. (#5446)
1abeee9 is described below

commit 1abeee9ee0da16d3a9ab713d9b0c4198ddf766f0
Author: tvalentyn <tv...@users.noreply.github.com>
AuthorDate: Tue May 29 17:06:50 2018 -0700

    Support installing Beam SDK from a wheel distribution in SDK containers. (#5446)
    
    Support installing Beam SDK from a wheel file.
---
 sdks/python/container/boot.go    |  12 ++-
 sdks/python/container/piputil.go | 214 ++++++++++++++++++++++-----------------
 2 files changed, 130 insertions(+), 96 deletions(-)

diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 14e2619..9001868 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -27,8 +27,8 @@ import (
 	"strings"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
-	pbpipeline "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	pbjob "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pbpipeline "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/provision"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
@@ -36,6 +36,8 @@ import (
 )
 
 var (
+	acceptableWhlSpecs = []string{"cp27-cp27mu-manylinux1_x86_64.whl"}
+
 	// Contract: https://s.apache.org/beam-fn-api-container-contract.
 
 	id                = flag.String("id", "", "Local identifier (required).")
@@ -48,10 +50,10 @@ var (
 
 const (
 	sdkHarnessEntrypoint = "apache_beam.runners.worker.sdk_worker_main"
-	// Please keep these names in sync with setup dependency.py
+	// Please keep these names in sync with stager.py
 	workflowFile      = "workflow.tar.gz"
 	requirementsFile  = "requirements.txt"
-	sdkFile           = "dataflow_python_sdk.tar"
+	sdkSrcFile        = "dataflow_python_sdk.tar"
 	extraPackagesFile = "extra_packages.txt"
 )
 
@@ -100,7 +102,7 @@ func main() {
 	// TODO(herohde): the packages to install should be specified explicitly. It
 	// would also be possible to install the SDK in the Dockerfile.
 	if setupErr := installSetupPackages(files, dir); setupErr != nil {
-		log.Fatalf("Failed to install SDK: %v", setupErr)
+		log.Fatalf("Failed to install required packages: %v", setupErr)
 	}
 
 	// (3) Invoke python
@@ -133,7 +135,7 @@ func installSetupPackages(mds []*pbjob.ArtifactMetadata, workDir string) error {
 	// Install the Dataflow Python SDK and worker packages.
 	// We install the extra requirements in case of using the beam sdk. These are ignored by pip
 	// if the user is using an SDK that does not provide these.
-	if err := pipInstallPackage(files, workDir, sdkFile, false, false, []string{"gcp"}); err != nil {
+	if err := installSdk(files, workDir, sdkSrcFile, acceptableWhlSpecs); err != nil {
 		return fmt.Errorf("failed to install SDK: %v", err)
 	}
 	// The staged files will not disappear due to restarts because workDir is a
diff --git a/sdks/python/container/piputil.go b/sdks/python/container/piputil.go
index b227774..b8d837a 100644
--- a/sdks/python/container/piputil.go
+++ b/sdks/python/container/piputil.go
@@ -16,116 +16,148 @@
 package main
 
 import (
-  "bufio"
-  "bytes"
-  "errors"
-  "fmt"
-  "io/ioutil"
-  "log"
-  "path/filepath"
-  "strings"
+	"bufio"
+	"bytes"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"path/filepath"
+	"strings"
 
-  "github.com/apache/beam/sdks/go/pkg/beam/util/execx"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
 )
 
 const (
-  pip = "/usr/local/bin/pip"
+	pip = "/usr/local/bin/pip"
 )
 
 // pipInstallRequirements installs the given requirement, if present.
 func pipInstallRequirements(files []string, dir, name string) error {
-  for _, file := range files {
-    if file == name {
-      // We run the install process in two rounds in order to avoid as much
-      // as possible PyPI downloads. In the first round the --find-links
-      // option will make sure that only things staged in the worker will be
-      // used without following their dependencies.
-      args := []string{"install", "-r", filepath.Join(dir, name), "--no-index", "--no-deps", "--find-links", dir}
-      if err := execx.Execute(pip, args...); err != nil {
-        return err
-      }
-      // The second install round opens up the search for packages on PyPI and
-      // also installs dependencies. The key is that if all the packages have
-      // been installed in the first round then this command will be a no-op.
-      args = []string{"install", "-r", filepath.Join(dir, name), "--find-links", dir}
-      return execx.Execute(pip, args...)
-    }
-  }
-  return nil
+	for _, file := range files {
+		if file == name {
+			// We run the install process in two rounds in order to avoid as much
+			// as possible PyPI downloads. In the first round the --find-links
+			// option will make sure that only things staged in the worker will be
+			// used without following their dependencies.
+			args := []string{"install", "-r", filepath.Join(dir, name), "--no-index", "--no-deps", "--find-links", dir}
+			if err := execx.Execute(pip, args...); err != nil {
+				return err
+			}
+			// The second install round opens up the search for packages on PyPI and
+			// also installs dependencies. The key is that if all the packages have
+			// been installed in the first round then this command will be a no-op.
+			args = []string{"install", "-r", filepath.Join(dir, name), "--find-links", dir}
+			return execx.Execute(pip, args...)
+		}
+	}
+	return nil
 }
 
 // pipInstallPackage installs the given package, if present.
 func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error {
-  for _, file := range files {
-    if file == name {
-      var packageSpec = name
-      if extras != nil {
-        packageSpec += "[" + strings.Join(extras, ",") + "]"
-      }
-      if force {
-        // We only use force reinstallation for packages specified using the
-        // --extra_package flag.  In this case, we always want to use the
-        // user-specified package, overwriting any existing package already
-        // installed.  At the same time, we want to avoid reinstalling any
-        // dependencies.  The "pip install" command doesn't have a clean way to do
-        // this, so we do this in two steps.
-        //
-        // First, we use the three flags "--upgrade --force-reinstall --no-deps"
-        // to "pip install" so as to force the package to be reinstalled, while
-        // avoiding reinstallation of dependencies.  Note now that if any needed
-        // dependencies were not installed, they will still be missing.
-        //
-        // Next, we run "pip install" on the package without any flags.  Since the
-        // installed version will match the package specified, the package itself
-        // will not be reinstalled, but its dependencies will now be resolved and
-        // installed if necessary.  This achieves our goal outlined above.
-        args := []string{"install", "--upgrade", "--force-reinstall", "--no-deps",
-          filepath.Join(dir, packageSpec)}
-        err := execx.Execute(pip, args...)
-        if err != nil {
-          return err
-        }
-        args = []string{"install", filepath.Join(dir, packageSpec)}
-        return execx.Execute(pip, args...)
-      }
+	for _, file := range files {
+		if file == name {
+			var packageSpec = name
+			if extras != nil {
+				packageSpec += "[" + strings.Join(extras, ",") + "]"
+			}
+			if force {
+				// We only use force reinstallation for packages specified using the
+				// --extra_package flag.  In this case, we always want to use the
+				// user-specified package, overwriting any existing package already
+				// installed.  At the same time, we want to avoid reinstalling any
+				// dependencies.  The "pip install" command doesn't have a clean way to do
+				// this, so we do this in two steps.
+				//
+				// First, we use the three flags "--upgrade --force-reinstall --no-deps"
+				// to "pip install" so as to force the package to be reinstalled, while
+				// avoiding reinstallation of dependencies.  Note now that if any needed
+				// dependencies were not installed, they will still be missing.
+				//
+				// Next, we run "pip install" on the package without any flags.  Since the
+				// installed version will match the package specified, the package itself
+				// will not be reinstalled, but its dependencies will now be resolved and
+				// installed if necessary.  This achieves our goal outlined above.
+				args := []string{"install", "--upgrade", "--force-reinstall", "--no-deps",
+					filepath.Join(dir, packageSpec)}
+				err := execx.Execute(pip, args...)
+				if err != nil {
+					return err
+				}
+				args = []string{"install", filepath.Join(dir, packageSpec)}
+				return execx.Execute(pip, args...)
+			}
 
-      // Case when we do not perform a forced reinstall.
-      args := []string{"install", filepath.Join(dir, packageSpec)}
-      return execx.Execute(pip, args...)
-    }
-  }
-  if optional {
-    return nil
-  }
-  return errors.New("package '" + name + "' not found")
+			// Case when we do not perform a forced reinstall.
+			args := []string{"install", filepath.Join(dir, packageSpec)}
+			return execx.Execute(pip, args...)
+		}
+	}
+	if optional {
+		return nil
+	}
+	return errors.New("package '" + name + "' not found")
 }
 
 // installExtraPackages installs all the packages declared in the extra
 // packages manifest file.
 func installExtraPackages(files []string, extraPackagesFile, dir string) error {
-  // First check that extra packages manifest file is present.
-  for _, file := range files {
-    if file != extraPackagesFile {
-      continue
-    }
+	// First check that extra packages manifest file is present.
+	for _, file := range files {
+		if file != extraPackagesFile {
+			continue
+		}
 
-    // Found the manifest. Install extra packages.
-    manifest, err := ioutil.ReadFile(filepath.Join(dir, extraPackagesFile))
-    if err != nil {
-      return fmt.Errorf("failed to read extra packages manifest file: %v", err)
-    }
+		// Found the manifest. Install extra packages.
+		manifest, err := ioutil.ReadFile(filepath.Join(dir, extraPackagesFile))
+		if err != nil {
+			return fmt.Errorf("failed to read extra packages manifest file: %v", err)
+		}
 
-    s := bufio.NewScanner(bytes.NewReader(manifest))
-    s.Split(bufio.ScanLines)
+		s := bufio.NewScanner(bytes.NewReader(manifest))
+		s.Split(bufio.ScanLines)
 
-    for s.Scan() {
-      extraPackage := s.Text()
-      log.Printf("Installing extra package: %s", extraPackage)
-      if err = pipInstallPackage(files, dir, extraPackage, true, false, nil); err != nil {
-        return fmt.Errorf("failed to install extra package %s: %v", extraPackage, err)
-      }
-    }
-    return nil
-  }
-  return nil
+		for s.Scan() {
+			extraPackage := s.Text()
+			log.Printf("Installing extra package: %s", extraPackage)
+			if err = pipInstallPackage(files, dir, extraPackage, true, false, nil); err != nil {
+				return fmt.Errorf("failed to install extra package %s: %v", extraPackage, err)
+			}
+		}
+		return nil
+	}
+	return nil
+}
+
+func findBeamSdkWhl(files []string, acceptableWhlSpecs []string) string {
+	for _, file := range files {
+		if strings.HasPrefix(file, "apache_beam") {
+			for _, s := range acceptableWhlSpecs {
+				if strings.HasSuffix(file, s) {
+					log.Printf("Found Apache Beam SDK wheel: %v", file)
+					return file
+				}
+			}
+		}
+	}
+	return ""
+}
+
+// installSdk installs Beam SDK: First, we try to find a compiled
+// wheel distribution of Apache Beam among staged files. If we find it, we
+// assume that the pipeline was started with the Beam SDK found in the wheel
+// file, and we try to install it. If not successful, we fall back to installing
+// SDK from source tarball provided in sdkSrcFile.
+func installSdk(files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string) error {
+	sdkWhlFile := findBeamSdkWhl(files, acceptableWhlSpecs)
+	if sdkWhlFile != "" {
+		err := pipInstallPackage(files, workDir, sdkWhlFile, false, false, []string{"gcp"})
+		if err == nil {
+			return nil
+		}
+		log.Printf("Could not install Apache Beam SDK from a wheel: %v, proceeding to install SDK from source tarball.", err)
+	}
+	err := pipInstallPackage(files, workDir, sdkSrcFile, false, false, []string{"gcp"})
+	return err
 }

-- 
To stop receiving notification emails like this one, please contact
altay@apache.org.