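You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy; the original post is in the commits@beam.apache.org mailing-list archive.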
Posted to commits@beam.apache.org by lc...@apache.org on 2022/03/22 20:18:43 UTC

[beam] 01/01: Revert "[BEAM-14038] Auto-startup for Python expansion service. (#17035)"

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch revert-17035-py-expansion-service
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f5d62dd56b9d541c734ba9a142d24cdecfed784e
Author: Lukasz Cwik <lc...@google.com>
AuthorDate: Tue Mar 22 13:17:08 2022 -0700

    Revert "[BEAM-14038] Auto-startup for Python expansion service. (#17035)"
    
    This reverts commit 0510cecb8a6bf8950699bf7a2253ee82bf37ea39.
---
 sdks/java/extensions/python/build.gradle           |  34 ------
 .../extensions/python/ExternalPythonTransform.java | 110 -----------------
 .../beam/sdk/extensions/python/PythonService.java  | 132 ---------------------
 .../beam/sdk/extensions/python/package-info.java   |  20 ----
 .../sdk/extensions/python/bootstrap_beam_venv.py   | 115 ------------------
 .../python/ExternalPythonTransformTest.java        |  49 --------
 settings.gradle.kts                                |   1 -
 7 files changed, 461 deletions(-)

diff --git a/sdks/java/extensions/python/build.gradle b/sdks/java/extensions/python/build.gradle
deleted file mode 100644
index c48518e..0000000
--- a/sdks/java/extensions/python/build.gradle
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-plugins { id 'org.apache.beam.module' }
-applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.extensions.python')
-
-description = "Apache Beam :: SDKs :: Java :: Extensions :: Python"
-
-dependencies {
-    implementation library.java.vendored_guava_26_0_jre
-    implementation library.java.vendored_grpc_1_43_2
-    implementation library.java.slf4j_api
-    implementation project(path: ":model:pipeline", configuration: "shadow")
-    implementation project(path: ":runners:core-construction-java")
-    implementation project(path: ":sdks:java:core", configuration: "shadow")
-    testImplementation library.java.junit
-    testImplementation library.java.hamcrest
-    testRuntimeOnly library.java.slf4j_simple
-}
diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
deleted file mode 100644
index 163f873..0000000
--- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.python;
-
-import java.util.Set;
-import java.util.UUID;
-import org.apache.beam.model.pipeline.v1.ExternalTransforms;
-import org.apache.beam.runners.core.construction.External;
-import org.apache.beam.sdk.coders.RowCoder;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.SchemaTranslation;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-
-/** Wrapper for invoking external Python transforms. */
-public class ExternalPythonTransform<InputT extends PInput, OutputT extends POutput>
-    extends PTransform<InputT, OutputT> {
-  private final String fullyQualifiedName;
-  private final Row args;
-  private final Row kwargs;
-
-  public ExternalPythonTransform(String fullyQualifiedName, Row args, Row kwargs) {
-    this.fullyQualifiedName = fullyQualifiedName;
-    this.args = args;
-    this.kwargs = kwargs;
-  }
-
-  @Override
-  public OutputT expand(InputT input) {
-    int port;
-    try {
-      port = PythonService.findAvailablePort();
-      PythonService service =
-          new PythonService(
-              "apache_beam.runners.portability.expansion_service_main",
-              "--port",
-              "" + port,
-              "--fully_qualified_name_glob",
-              "*");
-      Schema payloadSchema =
-          Schema.of(
-              Schema.Field.of("constructor", Schema.FieldType.STRING),
-              Schema.Field.of("args", Schema.FieldType.row(args.getSchema())),
-              Schema.Field.of("kwargs", Schema.FieldType.row(kwargs.getSchema())));
-      payloadSchema.setUUID(UUID.randomUUID());
-      Row payloadRow =
-          Row.withSchema(payloadSchema).addValues(fullyQualifiedName, args, kwargs).build();
-      ExternalTransforms.ExternalConfigurationPayload payload =
-          ExternalTransforms.ExternalConfigurationPayload.newBuilder()
-              .setSchema(SchemaTranslation.schemaToProto(payloadSchema, true))
-              .setPayload(
-                  ByteString.copyFrom(
-                      CoderUtils.encodeToByteArray(RowCoder.of(payloadSchema), payloadRow)))
-              .build();
-      try (AutoCloseable p = service.start()) {
-        PythonService.waitForPort("localhost", port, 15000);
-        PTransform<PInput, PCollectionTuple> transform =
-            External.<PInput, Object>of(
-                    "beam:transforms:python:fully_qualified_named",
-                    payload.toByteArray(),
-                    "localhost:" + port)
-                .withMultiOutputs();
-        PCollectionTuple outputs;
-        if (input instanceof PCollection) {
-          outputs = ((PCollection<?>) input).apply(transform);
-        } else if (input instanceof PCollectionTuple) {
-          outputs = ((PCollectionTuple) input).apply(transform);
-        } else if (input instanceof PBegin) {
-          outputs = ((PBegin) input).apply(transform);
-        } else {
-          throw new RuntimeException("Unhandled input type " + input.getClass());
-        }
-        Set<TupleTag<?>> tags = outputs.getAll().keySet();
-        if (tags.size() == 1) {
-          return (OutputT) outputs.get(Iterables.getOnlyElement(tags));
-        } else {
-          return (OutputT) outputs;
-        }
-      }
-    } catch (RuntimeException exn) {
-      throw exn;
-    } catch (Exception exn) {
-      throw new RuntimeException(exn);
-    }
-  }
-}
diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java
deleted file mode 100644
index e4a5993..0000000
--- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonService.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.python;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Utility to bootstrap and start a Beam Python service. */
-public class PythonService {
-  private static final Logger LOG = LoggerFactory.getLogger(PythonService.class);
-
-  private final String module;
-  private final List<String> args;
-
-  public PythonService(String module, List<String> args) {
-    this.module = module;
-    this.args = args;
-  }
-
-  public PythonService(String module, String... args) {
-    this(module, Arrays.asList(args));
-  }
-
-  @SuppressWarnings("argument.type.incompatible")
-  public AutoCloseable start() throws IOException, InterruptedException {
-    File bootstrapScript = File.createTempFile("bootstrap_beam_venv", ".py");
-    bootstrapScript.deleteOnExit();
-    try (FileOutputStream fout = new FileOutputStream(bootstrapScript.getAbsolutePath())) {
-      ByteStreams.copy(getClass().getResourceAsStream("bootstrap_beam_venv.py"), fout);
-    }
-    List<String> bootstrapCommand = ImmutableList.of("python", bootstrapScript.getAbsolutePath());
-    LOG.info("Running bootstrap command " + bootstrapCommand);
-    Process bootstrap =
-        new ProcessBuilder("python", bootstrapScript.getAbsolutePath())
-            .redirectError(ProcessBuilder.Redirect.INHERIT)
-            .start();
-    bootstrap.getOutputStream().close();
-    BufferedReader reader =
-        new BufferedReader(new InputStreamReader(bootstrap.getInputStream(), Charsets.UTF_8));
-    String lastLine = reader.readLine();
-    String lastNonEmptyLine = lastLine;
-    while (lastLine != null) {
-      LOG.info(lastLine);
-      if (lastLine.length() > 0) {
-        lastNonEmptyLine = lastLine;
-      }
-      lastLine = reader.readLine();
-    }
-    reader.close(); // Make SpotBugs happy.
-    int result = bootstrap.waitFor();
-    if (result != 0) {
-      throw new RuntimeException(
-          "Python boostrap failed with error " + result + ", " + lastNonEmptyLine);
-    }
-    String pythonExecutable = lastNonEmptyLine;
-    List<String> command = new ArrayList<>();
-    command.add(pythonExecutable);
-    command.add("-m");
-    command.add(module);
-    command.addAll(args);
-    LOG.info("Starting python service with arguments " + command);
-    Process p =
-        new ProcessBuilder(command)
-            .redirectError(ProcessBuilder.Redirect.INHERIT)
-            .redirectOutput(ProcessBuilder.Redirect.INHERIT)
-            .start();
-    return p::destroy;
-  }
-
-  public static int findAvailablePort() throws IOException {
-    ServerSocket s = new ServerSocket(0);
-    try {
-      return s.getLocalPort();
-    } finally {
-      s.close();
-      try {
-        // Some systems don't free the port for future use immediately.
-        Thread.sleep(100);
-      } catch (InterruptedException exn) {
-        // ignore
-      }
-    }
-  }
-
-  public static void waitForPort(String host, int port, int timeoutMs)
-      throws TimeoutException, InterruptedException {
-    long start = System.currentTimeMillis();
-    long duration = 10;
-    while (System.currentTimeMillis() - start < timeoutMs) {
-      try {
-        new Socket(host, port).close();
-        return;
-      } catch (IOException exn) {
-        Thread.sleep(duration);
-        duration = (long) (duration * 1.2);
-      }
-    }
-    throw new TimeoutException(
-        "Timeout waiting for Python service startup after "
-            + (System.currentTimeMillis() - start)
-            + " seconds.");
-  }
-}
diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/package-info.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/package-info.java
deleted file mode 100644
index d57dab3..0000000
--- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/** Extensions for invoking Python transforms from the Beam Java SDK. */
-package org.apache.beam.sdk.extensions.python;
diff --git a/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py b/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py
deleted file mode 100644
index 08120b8..0000000
--- a/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py
+++ /dev/null
@@ -1,115 +0,0 @@
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-"""A utility for bootstrapping a BeamPython install.
-
-This utility can be called with any version of Python, and attempts to create
-a Python virtual environment with the requested version of Beam, and any
-extra dependencies as required, installed.
-
-The virtual environment will be placed in Apache Beam's cache directory, and
-will be re-used if the parameters match.
-
-If this script exits successfully, the last line will be the full path to a
-suitable python executable.
-"""
-
-import argparse
-import distutils.version
-import hashlib
-import json
-import os
-import shutil
-import subprocess
-import sys
-
-
-def main():
-    if sys.version_info < (3, ):
-        # Run this script with Python 3.
-        os.execlp('python3', 'python3', *sys.argv)
-        return  # In case windows returns...
-    else:
-        import urllib.request
-
-    parser = argparse.ArgumentParser()
-    parser.add_argument('--python_version', help="Python major version.")
-    parser.add_argument('--beam_version',
-                        help="Beam version.",
-                        default="latest")
-    parser.add_argument(
-        '--extra_packages',
-        help="Semi-colon delimited set of python dependencies.")
-    parser.add_argument('--cache_dir',
-                        default=os.path.expanduser("~/.apache_beam/cache"))
-
-    options = parser.parse_args()
-    if options.python_version:
-        py_version = options.python_version
-        executable = 'python' + py_version
-    else:
-        py_version = '%s.%s' % sys.version_info[:2]
-        executable = sys.executable
-
-    if options.beam_version == 'latest':
-        info = json.load(
-            urllib.request.urlopen("https://pypi.org/pypi/apache_beam/json"))
-
-        def maybe_strict_version(s):
-            try:
-                return distutils.version.StrictVersion(s)
-            except:
-                return distutils.version.StrictVersion('0.0')
-
-        beam_version = max(info['releases'], key=maybe_strict_version)
-        beam_package = 'apache_beam[gcp,aws,asure,dataframe]==' + beam_version
-    elif (os.path.exists(options.beam_version)
-          or options.beam_version.startswith('http://')
-          or options.beam_version.startswith('https://')):
-        # It's a path to a tarball.
-        beam_version = os.path.basename(options.beam_version)
-        beam_package = options.beam_version
-    else:
-        beam_version = options.beam_version
-        beam_package = 'apache_beam[gcp,aws,asure,dataframe]==' + beam_version
-
-    deps = options.extra_packages.split(';') if options.extra_packages else []
-    venv_dir = os.path.join(
-        options.cache_dir, 'venvs', 'py-%s-beam-%s-%s' %
-        (py_version, beam_version,
-         hashlib.sha1(';'.join(sorted(deps)).encode('utf-8')).hexdigest()))
-    venv_python = os.path.join(venv_dir, 'bin', 'python')
-
-    if not os.path.exists(venv_python):
-        try:
-            subprocess.run([executable, '-m', 'venv', venv_dir], check=True)
-            # See https://issues.apache.org/jira/browse/BEAM-14092
-            subprocess.run([
-                venv_python, '-m', 'pip', 'install', beam_package,
-                'pyparsing==2.4.2'
-            ],
-                           check=True)
-            if deps:
-                subprocess.run([venv_python, '-m', 'pip', 'install'] + deps,
-                               check=True)
-            # Sanity check the installation.
-            subprocess.run([venv_python, '-c', 'import apache_beam'],
-                           check=True)
-        except:
-            shutil.rmtree(venv_dir)
-            raise
-
-    print(venv_python)
-
-
-if __name__ == '__main__':
-    main()
diff --git a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java b/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java
deleted file mode 100644
index 9bb25ad..0000000
--- a/sdks/java/extensions/python/src/test/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransformTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.python;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptors;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class ExternalPythonTransformTest implements Serializable {
-  @Test
-  public void trivialPythonTransform() {
-    Pipeline p = Pipeline.create();
-    PCollection<String> output =
-        p.apply(Create.of(KV.of("A", "x"), KV.of("A", "y"), KV.of("B", "z")))
-            .apply(
-                new ExternalPythonTransform<
-                    PCollection<KV<String, String>>, PCollection<KV<String, Iterable<String>>>>(
-                    "apache_beam.GroupByKey", Row.nullRow(Schema.of()), Row.nullRow(Schema.of())))
-            .apply(MapElements.into(TypeDescriptors.strings()).via(kv -> kv.getKey()));
-    PAssert.that(output).containsInAnyOrder("A", "B");
-    // TODO: Run this on a multi-language supporting runner.
-  }
-}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 7d3401a..55a0c85 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -129,7 +129,6 @@ include(":sdks:java:extensions:jackson")
 include(":sdks:java:extensions:join-library")
 include(":sdks:java:extensions:ml")
 include(":sdks:java:extensions:protobuf")
-include(":sdks:java:extensions:python")
 include("sdks:java:extensions:sbe")
 include(":sdks:java:extensions:schemaio-expansion-service")
 include(":sdks:java:extensions:sketching")