You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2022/09/02 15:24:39 UTC

[beam] branch master updated: [BEAM-22859] Allow the specification of extra packages for external Python transforms. (#22991)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b46ef40289 [BEAM-22859] Allow the specification of extra packages for external Python transforms. (#22991)
4b46ef40289 is described below

commit 4b46ef40289ddf33aac1aac0ca6741d96407bd3b
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Fri Sep 2 08:24:29 2022 -0700

    [BEAM-22859] Allow the specification of extra packages for external Python transforms. (#22991)
    
    
    This fixes #22859.
---
 .../extensions/python/PythonExternalTransform.java | 49 +++++++++++++++++++---
 .../beam/sdk/extensions/python/PythonService.java  | 31 ++++++++++++--
 .../python/PythonExternalTransformTest.java        | 23 ++++++++++
 3 files changed, 95 insertions(+), 8 deletions(-)

diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
index 879cc8dbb5d..4efbf62fe25 100644
--- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
+++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
@@ -17,8 +17,14 @@
  */
 package org.apache.beam.sdk.extensions.python;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
@@ -51,8 +57,11 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -65,6 +74,7 @@ public class PythonExternalTransform<InputT extends PInput, OutputT extends POut
   private String fullyQualifiedName;
 
   private String expansionService;
+  private List<String> extraPackages;
 
   // We preseve the order here since Schema's care about order of fields but the order will not
   // matter when applying kwargs at the Python side.
@@ -79,6 +89,7 @@ public class PythonExternalTransform<InputT extends PInput, OutputT extends POut
   private PythonExternalTransform(String fullyQualifiedName, String expansionService) {
     this.fullyQualifiedName = fullyQualifiedName;
     this.expansionService = expansionService;
+    this.extraPackages = new ArrayList<>();
     this.kwargsMap = new TreeMap<>();
     this.typeHints = new HashMap<>();
     // TODO(https://github.com/apache/beam/issues/21567): remove a default type hint for
@@ -266,6 +277,22 @@ public class PythonExternalTransform<InputT extends PInput, OutputT extends POut
     return this;
   }
 
+  /**
+   * Specifies that the given Python packages are required for this transform, which will cause them
+   * to be installed in both the construction-time and execution-time environments.
+   *
+   * @param extraPackages a list of pip-installable package specifications, such as would be found
+   *     in a requirements file.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public PythonExternalTransform<InputT, OutputT> withExtraPackages(List<String> extraPackages) {
+    Preconditions.checkState(
+        Strings.isNullOrEmpty(expansionService),
+        "Extra packages only apply to auto-started expansion service.");
+    this.extraPackages = extraPackages;
+    return this;
+  }
+
   @VisibleForTesting
   Row buildOrGetKwargsRow() {
     if (providedKwargsRow != null) {
@@ -418,13 +445,25 @@ public class PythonExternalTransform<InputT extends PInput, OutputT extends POut
         return apply(input, expansionService, payload);
       } else {
         int port = PythonService.findAvailablePort();
+        ImmutableList.Builder<String> args = ImmutableList.builder();
+        args.add("--port", "" + port, "--fully_qualified_name_glob", "*");
+        if (!extraPackages.isEmpty()) {
+          File requirementsFile = File.createTempFile("requirements", ".txt");
+          requirementsFile.deleteOnExit();
+          try (Writer fout =
+              new OutputStreamWriter(
+                  new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) {
+            for (String pkg : extraPackages) {
+              fout.write(pkg);
+              fout.write('\n');
+            }
+          }
+          args.add("--requirements_file=" + requirementsFile.getAbsolutePath());
+        }
         PythonService service =
             new PythonService(
-                "apache_beam.runners.portability.expansion_service_main",
-                "--port",
-                "" + port,
-                "--fully_qualified_name_glob",
-                "*");
+                    "apache_beam.runners.portability.expansion_service_main", args.build())
+                .withExtraPackages(extraPackages);
         try (AutoCloseable p = service.start()) {
           PythonService.waitForPort("localhost", port, 15000);
           return apply(input, String.format("localhost:%s", port), payload);
diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java
index 346227df1a9..7ececfb94bf 100644
--- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java
+++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java
@@ -40,16 +40,36 @@ public class PythonService {
 
   private final String module;
   private final List<String> args;
+  private final List<String> extraPackages;
 
-  public PythonService(String module, List<String> args) {
+  public PythonService(String module, List<String> args, List<String> extraPackages) {
     this.module = module;
     this.args = args;
+    this.extraPackages = extraPackages;
+  }
+
+  public PythonService(String module, List<String> args) {
+    this(module, args, ImmutableList.of());
   }
 
   public PythonService(String module, String... args) {
     this(module, Arrays.asList(args));
   }
 
+  /**
+   * Specifies that the given Python packages should be installed for this service environment.
+   *
+   * @param extraPackages a list of pip-installable package specifications, such as would be found
+   *     in a requirements file.
+   * @return a new {@code PythonService} that additionally ensures these dependencies are available.
+   */
+  public PythonService withExtraPackages(List<String> extraPackages) {
+    return new PythonService(
+        module,
+        args,
+        ImmutableList.<String>builder().addAll(this.extraPackages).addAll(extraPackages).build());
+  }
+
   @SuppressWarnings("argument.type.incompatible")
   public AutoCloseable start() throws IOException, InterruptedException {
     File bootstrapScript = File.createTempFile("bootstrap_beam_venv", ".py");
@@ -57,8 +77,13 @@ public class PythonService {
     try (FileOutputStream fout = new FileOutputStream(bootstrapScript.getAbsolutePath())) {
       ByteStreams.copy(getClass().getResourceAsStream("bootstrap_beam_venv.py"), fout);
     }
-    List<String> bootstrapCommand =
-        ImmutableList.of(whichPython(), bootstrapScript.getAbsolutePath());
+    List<String> bootstrapCommand = new ArrayList<>();
+    bootstrapCommand.add(whichPython());
+    bootstrapCommand.add(bootstrapScript.getAbsolutePath());
+    // TODO(BEAM-22856): If the version is a non .dev version, pass it here as --beam_version.
+    if (!extraPackages.isEmpty()) {
+      bootstrapCommand.add("--extra_packages=" + String.join(";", extraPackages));
+    }
     LOG.info("Running bootstrap command " + bootstrapCommand);
     Process bootstrap =
         new ProcessBuilder(bootstrapCommand).redirectError(ProcessBuilder.Redirect.INHERIT).start();
diff --git a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java
index 822708cfef3..72cbeae1d27 100644
--- a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java
+++ b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java
@@ -26,10 +26,13 @@ import java.time.Instant;
 import java.util.Map;
 import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaTranslation;
 import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.UsesPythonExpansionService;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.util.PythonCallableSource;
@@ -37,9 +40,11 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -47,6 +52,7 @@ import org.junit.runners.JUnit4;
 public class PythonExternalTransformTest implements Serializable {
   @Ignore("https://github.com/apache/beam/issues/21561")
   @Test
+  @Category({ValidatesRunner.class, UsesPythonExpansionService.class})
   public void trivialPythonTransform() {
     Pipeline p = Pipeline.create();
     PCollection<String> output =
@@ -60,6 +66,23 @@ public class PythonExternalTransformTest implements Serializable {
     // TODO: Run this on a multi-language supporting runner.
   }
 
+  @Ignore("https://github.com/apache/beam/issues/21561")
+  @Test
+  @Category({ValidatesRunner.class, UsesPythonExpansionService.class})
+  public void pythonTransformWithDependencies() {
+    Pipeline p = Pipeline.create();
+    PCollection<String> output =
+        p.apply(Create.of("elephant", "mouse", "sheep"))
+            .apply(
+                PythonExternalTransform.<PCollection<String>, PCollection<String>>from(
+                        "apache_beam.Map")
+                    .withArgs(PythonCallableSource.of("import inflection\ninflection.pluralize"))
+                    .withExtraPackages(ImmutableList.of("inflection"))
+                    .withOutputCoder(StringUtf8Coder.of()));
+    PAssert.that(output).containsInAnyOrder("elephants", "mice", "sheep");
+    // TODO: Run this on a multi-language supporting runner.
+  }
+
   @Test
   public void generateArgsEmpty() {
     PythonExternalTransform<?, ?> transform =