You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/03/22 20:18:43 UTC
[beam] 01/01: Revert "[BEAM-14038] Auto-startup for Python expansion service. (#17035)"
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch revert-17035-py-expansion-service
in repository https://gitbox.apache.org/repos/asf/beam.git
commit f5d62dd56b9d541c734ba9a142d24cdecfed784e
Author: Lukasz Cwik <lc...@google.com>
AuthorDate: Tue Mar 22 13:17:08 2022 -0700
Revert "[BEAM-14038] Auto-startup for Python expansion service. (#17035)"
This reverts commit 0510cecb8a6bf8950699bf7a2253ee82bf37ea39.
---
sdks/java/extensions/python/build.gradle | 34 ------
.../extensions/python/ExternalPythonTransform.java | 110 -----------------
.../beam/sdk/extensions/python/PythonService.java | 132 ---------------------
.../beam/sdk/extensions/python/package-info.java | 20 ----
.../sdk/extensions/python/bootstrap_beam_venv.py | 115 ------------------
.../python/ExternalPythonTransformTest.java | 49 --------
settings.gradle.kts | 1 -
7 files changed, 461 deletions(-)
diff --git a/sdks/java/extensions/python/build.gradle b/sdks/java/extensions/python/build.gradle
deleted file mode 100644
index c48518e..0000000
--- a/sdks/java/extensions/python/build.gradle
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-plugins { id 'org.apache.beam.module' }
-applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.extensions.python')
-
-description = "Apache Beam :: SDKs :: Java :: Extensions :: Python"
-
-dependencies {
- implementation library.java.vendored_guava_26_0_jre
- implementation library.java.vendored_grpc_1_43_2
- implementation library.java.slf4j_api
- implementation project(path: ":model:pipeline", configuration: "shadow")
- implementation project(path: ":runners:core-construction-java")
- implementation project(path: ":sdks:java:core", configuration: "shadow")
- testImplementation library.java.junit
- testImplementation library.java.hamcrest
- testRuntimeOnly library.java.slf4j_simple
-}
diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
deleted file mode 100644
index 163f873..0000000
--- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.python;
-
-import java.util.Set;
-import java.util.UUID;
-import org.apache.beam.model.pipeline.v1.ExternalTransforms;
-import org.apache.beam.runners.core.construction.External;
-import org.apache.beam.sdk.coders.RowCoder;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.SchemaTranslation;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-
-/** Wrapper for invoking external Python transforms. */
-public class ExternalPythonTransform<InputT extends PInput, OutputT extends POutput>
- extends PTransform<InputT, OutputT> {
- private final String fullyQualifiedName;
- private final Row args;
- private final Row kwargs;
-
- public ExternalPythonTransform(String fullyQualifiedName, Row args, Row kwargs) {
- this.fullyQualifiedName = fullyQualifiedName;
- this.args = args;
- this.kwargs = kwargs;
- }
-
- @Override
- public OutputT expand(InputT input) {
- int port;
- try {
- port = PythonService.findAvailablePort();
- PythonService service =
- new PythonService(
- "apache_beam.runners.portability.expansion_service_main",
- "--port",
- "" + port,
- "--fully_qualified_name_glob",
- "*");
- Schema payloadSchema =
- Schema.of(
- Schema.Field.of("constructor", Schema.FieldType.STRING),
- Schema.Field.of("args", Schema.FieldType.row(args.getSchema())),
- Schema.Field.of("kwargs", Schema.FieldType.row(kwargs.getSchema())));
- payloadSchema.setUUID(UUID.randomUUID());
- Row payloadRow =
- Row.withSchema(payloadSchema).addValues(fullyQualifiedName, args, kwargs).build();
- ExternalTransforms.ExternalConfigurationPayload payload =
- ExternalTransforms.ExternalConfigurationPayload.newBuilder()
- .setSchema(SchemaTranslation.schemaToProto(payloadSchema, true))
- .setPayload(
- ByteString.copyFrom(
- CoderUtils.encodeToByteArray(RowCoder.of(payloadSchema), payloadRow)))
- .build();
- try (AutoCloseable p = service.start()) {
- PythonService.waitForPort("localhost", port, 15000);
- PTransform<PInput, PCollectionTuple> transform =
- External.<PInput, Object>of(
- "beam:transforms:python:fully_qualified_named",
- payload.toByteArray(),
- "localhost:" + port)
- .withMultiOutputs();
- PCollectionTuple outputs;
- if (input instanceof PCollection) {
- outputs = ((PCollection<?>) input).apply(transform);
- } else if (input instanceof PCollectionTuple) {
- outputs = ((PCollectionTuple) input).apply(transform);
- } else if (input instanceof PBegin) {
- outputs = ((PBegin) input).apply(transform);
- } else {
- throw new RuntimeException("Unhandled input type " + input.getClass());
- }
- Set<TupleTag<?>> tags = outputs.getAll().keySet();
- if (tags.size() == 1) {
- return (OutputT) outputs.get(Iterables.getOnlyElement(tags));
- } else {
- return (OutputT) outputs;
- }
- }
- } catch (RuntimeException exn) {
- throw exn;
- } catch (Exception exn) {
- throw new RuntimeException(exn);
- }
- }
-}
diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java
deleted file mode 100644
index e4a5993..0000000
--- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.python;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Utility to bootstrap and start a Beam Python service. */
-public class PythonService {
- private static final Logger LOG = LoggerFactory.getLogger(PythonService.class);
-
- private final String module;
- private final List<String> args;
-
- public PythonService(String module, List<String> args) {
- this.module = module;
- this.args = args;
- }
-
- public PythonService(String module, String... args) {
- this(module, Arrays.asList(args));
- }
-
- @SuppressWarnings("argument.type.incompatible")
- public AutoCloseable start() throws IOException, InterruptedException {
- File bootstrapScript = File.createTempFile("bootstrap_beam_venv", ".py");
- bootstrapScript.deleteOnExit();
- try (FileOutputStream fout = new FileOutputStream(bootstrapScript.getAbsolutePath())) {
- ByteStreams.copy(getClass().getResourceAsStream("bootstrap_beam_venv.py"), fout);
- }
- List<String> bootstrapCommand = ImmutableList.of("python", bootstrapScript.getAbsolutePath());
- LOG.info("Running bootstrap command " + bootstrapCommand);
- Process bootstrap =
- new ProcessBuilder("python", bootstrapScript.getAbsolutePath())
- .redirectError(ProcessBuilder.Redirect.INHERIT)
- .start();
- bootstrap.getOutputStream().close();
- BufferedReader reader =
- new BufferedReader(new InputStreamReader(bootstrap.getInputStream(), Charsets.UTF_8));
- String lastLine = reader.readLine();
- String lastNonEmptyLine = lastLine;
- while (lastLine != null) {
- LOG.info(lastLine);
- if (lastLine.length() > 0) {
- lastNonEmptyLine = lastLine;
- }
- lastLine = reader.readLine();
- }
- reader.close(); // Make SpotBugs happy.
- int result = bootstrap.waitFor();
- if (result != 0) {
- throw new RuntimeException(
- "Python boostrap failed with error " + result + ", " + lastNonEmptyLine);
- }
- String pythonExecutable = lastNonEmptyLine;
- List<String> command = new ArrayList<>();
- command.add(pythonExecutable);
- command.add("-m");
- command.add(module);
- command.addAll(args);
- LOG.info("Starting python service with arguments " + command);
- Process p =
- new ProcessBuilder(command)
- .redirectError(ProcessBuilder.Redirect.INHERIT)
- .redirectOutput(ProcessBuilder.Redirect.INHERIT)
- .start();
- return p::destroy;
- }
-
- public static int findAvailablePort() throws IOException {
- ServerSocket s = new ServerSocket(0);
- try {
- return s.getLocalPort();
- } finally {
- s.close();
- try {
- // Some systems don't free the port for future use immediately.
- Thread.sleep(100);
- } catch (InterruptedException exn) {
- // ignore
- }
- }
- }
-
- public static void waitForPort(String host, int port, int timeoutMs)
- throws TimeoutException, InterruptedException {
- long start = System.currentTimeMillis();
- long duration = 10;
- while (System.currentTimeMillis() - start < timeoutMs) {
- try {
- new Socket(host, port).close();
- return;
- } catch (IOException exn) {
- Thread.sleep(duration);
- duration = (long) (duration * 1.2);
- }
- }
- throw new TimeoutException(
- "Timeout waiting for Python service startup after "
- + (System.currentTimeMillis() - start)
- + " seconds.");
- }
-}
diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/package-info.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/package-info.java
deleted file mode 100644
index d57dab3..0000000
--- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/** Extensions for invoking Python transforms from the Beam Java SDK. */
-package org.apache.beam.sdk.extensions.python;
diff --git a/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py b/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py
deleted file mode 100644
index 08120b8..0000000
--- a/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py
+++ /dev/null
@@ -1,115 +0,0 @@
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-"""A utility for bootstrapping a BeamPython install.
-
-This utility can be called with any version of Python, and attempts to create
-a Python virtual environment with the requested version of Beam, and any
-extra dependencies as required, installed.
-
-The virtual environment will be placed in Apache Beam's cache directory, and
-will be re-used if the parameters match.
-
-If this script exits successfully, the last line will be the full path to a
-suitable python executable.
-"""
-
-import argparse
-import distutils.version
-import hashlib
-import json
-import os
-import shutil
-import subprocess
-import sys
-
-
-def main():
- if sys.version_info < (3, ):
- # Run this script with Python 3.
- os.execlp('python3', 'python3', *sys.argv)
- return # In case windows returns...
- else:
- import urllib.request
-
- parser = argparse.ArgumentParser()
- parser.add_argument('--python_version', help="Python major version.")
- parser.add_argument('--beam_version',
- help="Beam version.",
- default="latest")
- parser.add_argument(
- '--extra_packages',
- help="Semi-colon delimited set of python dependencies.")
- parser.add_argument('--cache_dir',
- default=os.path.expanduser("~/.apache_beam/cache"))
-
- options = parser.parse_args()
- if options.python_version:
- py_version = options.python_version
- executable = 'python' + py_version
- else:
- py_version = '%s.%s' % sys.version_info[:2]
- executable = sys.executable
-
- if options.beam_version == 'latest':
- info = json.load(
- urllib.request.urlopen("https://pypi.org/pypi/apache_beam/json"))
-
- def maybe_strict_version(s):
- try:
- return distutils.version.StrictVersion(s)
- except:
- return distutils.version.StrictVersion('0.0')
-
- beam_version = max(info['releases'], key=maybe_strict_version)
- beam_package = 'apache_beam[gcp,aws,asure,dataframe]==' + beam_version
- elif (os.path.exists(options.beam_version)
- or options.beam_version.startswith('http://')
- or options.beam_version.startswith('https://')):
- # It's a path to a tarball.
- beam_version = os.path.basename(options.beam_version)
- beam_package = options.beam_version
- else:
- beam_version = options.beam_version
- beam_package = 'apache_beam[gcp,aws,asure,dataframe]==' + beam_version
-
- deps = options.extra_packages.split(';') if options.extra_packages else []
- venv_dir = os.path.join(
- options.cache_dir, 'venvs', 'py-%s-beam-%s-%s' %
- (py_version, beam_version,
- hashlib.sha1(';'.join(sorted(deps)).encode('utf-8')).hexdigest()))
- venv_python = os.path.join(venv_dir, 'bin', 'python')
-
- if not os.path.exists(venv_python):
- try:
- subprocess.run([executable, '-m', 'venv', venv_dir], check=True)
- # See https://issues.apache.org/jira/browse/BEAM-14092
- subprocess.run([
- venv_python, '-m', 'pip', 'install', beam_package,
- 'pyparsing==2.4.2'
- ],
- check=True)
- if deps:
- subprocess.run([venv_python, '-m', 'pip', 'install'] + deps,
- check=True)
- # Sanity check the installation.
- subprocess.run([venv_python, '-c', 'import apache_beam'],
- check=True)
- except:
- shutil.rmtree(venv_dir)
- raise
-
- print(venv_python)
-
-
-if __name__ == '__main__':
- main()
diff --git a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java
deleted file mode 100644
index 9bb25ad..0000000
--- a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.python;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptors;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class ExternalPythonTransformTest implements Serializable {
- @Test
- public void trivialPythonTransform() {
- Pipeline p = Pipeline.create();
- PCollection<String> output =
- p.apply(Create.of(KV.of("A", "x"), KV.of("A", "y"), KV.of("B", "z")))
- .apply(
- new ExternalPythonTransform<
- PCollection<KV<String, String>>, PCollection<KV<String, Iterable<String>>>>(
- "apache_beam.GroupByKey", Row.nullRow(Schema.of()), Row.nullRow(Schema.of())))
- .apply(MapElements.into(TypeDescriptors.strings()).via(kv -> kv.getKey()));
- PAssert.that(output).containsInAnyOrder("A", "B");
- // TODO: Run this on a multi-language supporting runner.
- }
-}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 7d3401a..55a0c85 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -129,7 +129,6 @@ include(":sdks:java:extensions:jackson")
include(":sdks:java:extensions:join-library")
include(":sdks:java:extensions:ml")
include(":sdks:java:extensions:protobuf")
-include(":sdks:java:extensions:python")
include("sdks:java:extensions:sbe")
include(":sdks:java:extensions:schemaio-expansion-service")
include(":sdks:java:extensions:sketching")