You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2022/09/02 15:24:39 UTC
[beam] branch master updated: [BEAM-22859] Allow the specification of extra packages for external Python transforms. (#22991)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4b46ef40289 [BEAM-22859] Allow the specification of extra packages for external Python transforms. (#22991)
4b46ef40289 is described below
commit 4b46ef40289ddf33aac1aac0ca6741d96407bd3b
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Fri Sep 2 08:24:29 2022 -0700
[BEAM-22859] Allow the specification of extra packages for external Python transforms. (#22991)
This fixes #22859.
---
.../extensions/python/PythonExternalTransform.java | 49 +++++++++++++++++++---
.../beam/sdk/extensions/python/PythonService.java | 31 ++++++++++++--
.../python/PythonExternalTransformTest.java | 23 ++++++++++
3 files changed, 95 insertions(+), 8 deletions(-)
diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
index 879cc8dbb5d..4efbf62fe25 100644
--- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
+++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
@@ -17,8 +17,14 @@
*/
package org.apache.beam.sdk.extensions.python;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
@@ -51,8 +57,11 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -65,6 +74,7 @@ public class PythonExternalTransform<InputT extends PInput, OutputT extends POut
private String fullyQualifiedName;
private String expansionService;
+ private List<String> extraPackages;
// We preserve the order here since schemas care about the order of fields, but the order will not
// matter when applying kwargs at the Python side.
@@ -79,6 +89,7 @@ public class PythonExternalTransform<InputT extends PInput, OutputT extends POut
private PythonExternalTransform(String fullyQualifiedName, String expansionService) {
this.fullyQualifiedName = fullyQualifiedName;
this.expansionService = expansionService;
+ this.extraPackages = new ArrayList<>();
this.kwargsMap = new TreeMap<>();
this.typeHints = new HashMap<>();
// TODO(https://github.com/apache/beam/issues/21567): remove a default type hint for
@@ -266,6 +277,22 @@ public class PythonExternalTransform<InputT extends PInput, OutputT extends POut
return this;
}
+ /**
+ * Specifies that the given Python packages are required for this transform, which will cause them
+ * to be installed in both the construction-time and execution-time environments.
+ *
+ * @param extraPackages a list of pip-installable package specifications, such as would be found
+ * in a requirements file.
+ * @return updated wrapper for the cross-language transform.
+ */
+ public PythonExternalTransform<InputT, OutputT> withExtraPackages(List<String> extraPackages) {
+ Preconditions.checkState(
+ Strings.isNullOrEmpty(expansionService),
+ "Extra packages only apply to auto-started expansion service.");
+ this.extraPackages = extraPackages;
+ return this;
+ }
+
@VisibleForTesting
Row buildOrGetKwargsRow() {
if (providedKwargsRow != null) {
@@ -418,13 +445,25 @@ public class PythonExternalTransform<InputT extends PInput, OutputT extends POut
return apply(input, expansionService, payload);
} else {
int port = PythonService.findAvailablePort();
+ ImmutableList.Builder<String> args = ImmutableList.builder();
+ args.add("--port", "" + port, "--fully_qualified_name_glob", "*");
+ if (!extraPackages.isEmpty()) {
+ File requirementsFile = File.createTempFile("requirements", ".txt");
+ requirementsFile.deleteOnExit();
+ try (Writer fout =
+ new OutputStreamWriter(
+ new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) {
+ for (String pkg : extraPackages) {
+ fout.write(pkg);
+ fout.write('\n');
+ }
+ }
+ args.add("--requirements_file=" + requirementsFile.getAbsolutePath());
+ }
PythonService service =
new PythonService(
- "apache_beam.runners.portability.expansion_service_main",
- "--port",
- "" + port,
- "--fully_qualified_name_glob",
- "*");
+ "apache_beam.runners.portability.expansion_service_main", args.build())
+ .withExtraPackages(extraPackages);
try (AutoCloseable p = service.start()) {
PythonService.waitForPort("localhost", port, 15000);
return apply(input, String.format("localhost:%s", port), payload);
diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java
index 346227df1a9..7ececfb94bf 100644
--- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java
+++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java
@@ -40,16 +40,36 @@ public class PythonService {
private final String module;
private final List<String> args;
+ private final List<String> extraPackages;
- public PythonService(String module, List<String> args) {
+ public PythonService(String module, List<String> args, List<String> extraPackages) {
this.module = module;
this.args = args;
+ this.extraPackages = extraPackages;
+ }
+
+ public PythonService(String module, List<String> args) {
+ this(module, args, ImmutableList.of());
}
public PythonService(String module, String... args) {
this(module, Arrays.asList(args));
}
+ /**
+ * Specifies that the given Python packages should be installed for this service environment.
+ *
+ * @param extraPackages a list of pip-installable package specifications, such as would be found
+ * in a requirements file.
+ * @return a Python Service object that will ensure these dependencies are available.
+ */
+ public PythonService withExtraPackages(List<String> extraPackages) {
+ return new PythonService(
+ module,
+ args,
+ ImmutableList.<String>builder().addAll(this.extraPackages).addAll(extraPackages).build());
+ }
+
@SuppressWarnings("argument.type.incompatible")
public AutoCloseable start() throws IOException, InterruptedException {
File bootstrapScript = File.createTempFile("bootstrap_beam_venv", ".py");
@@ -57,8 +77,13 @@ public class PythonService {
try (FileOutputStream fout = new FileOutputStream(bootstrapScript.getAbsolutePath())) {
ByteStreams.copy(getClass().getResourceAsStream("bootstrap_beam_venv.py"), fout);
}
- List<String> bootstrapCommand =
- ImmutableList.of(whichPython(), bootstrapScript.getAbsolutePath());
+ List<String> bootstrapCommand = new ArrayList<>();
+ bootstrapCommand.add(whichPython());
+ bootstrapCommand.add(bootstrapScript.getAbsolutePath());
+ // TODO(BEAM-22856): If the version is not a .dev version, pass it here as --beam_version.
+ if (!extraPackages.isEmpty()) {
+ bootstrapCommand.add("--extra_packages=" + String.join(";", extraPackages));
+ }
LOG.info("Running bootstrap command " + bootstrapCommand);
Process bootstrap =
new ProcessBuilder(bootstrapCommand).redirectError(ProcessBuilder.Redirect.INHERIT).start();
diff --git a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java
index 822708cfef3..72cbeae1d27 100644
--- a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java
+++ b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/PythonExternalTransformTest.java
@@ -26,10 +26,13 @@ import java.time.Instant;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.UsesPythonExpansionService;
+import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.util.PythonCallableSource;
@@ -37,9 +40,11 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.junit.Ignore;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -47,6 +52,7 @@ import org.junit.runners.JUnit4;
public class PythonExternalTransformTest implements Serializable {
@Ignore("https://github.com/apache/beam/issues/21561")
@Test
+ @Category({ValidatesRunner.class, UsesPythonExpansionService.class})
public void trivialPythonTransform() {
Pipeline p = Pipeline.create();
PCollection<String> output =
@@ -60,6 +66,23 @@ public class PythonExternalTransformTest implements Serializable {
// TODO: Run this on a multi-language supporting runner.
}
+ @Ignore("https://github.com/apache/beam/issues/21561")
+ @Test
+ @Category({ValidatesRunner.class, UsesPythonExpansionService.class})
+ public void pythonTransformWithDependencies() {
+ Pipeline p = Pipeline.create();
+ PCollection<String> output =
+ p.apply(Create.of("elephant", "mouse", "sheep"))
+ .apply(
+ PythonExternalTransform.<PCollection<String>, PCollection<String>>from(
+ "apache_beam.Map")
+ .withArgs(PythonCallableSource.of("import inflection\ninflection.pluralize"))
+ .withExtraPackages(ImmutableList.of("inflection"))
+ .withOutputCoder(StringUtf8Coder.of()));
+ PAssert.that(output).containsInAnyOrder("elephants", "mice", "sheep");
+ // TODO: Run this on a multi-language supporting runner.
+ }
+
@Test
public void generateArgsEmpty() {
PythonExternalTransform<?, ?> transform =