You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2023/06/25 22:18:33 UTC

[beam] branch master updated: Updates Python ExternalTransform to use the transform service when needed (#27228)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d545243b30d Updates Python ExternalTransform to use the transform service when needed (#27228)
d545243b30d is described below

commit d545243b30d1192b76379659b2345fd349ec456d
Author: Chamikara Jayalath <ch...@gmail.com>
AuthorDate: Sun Jun 25 15:18:27 2023 -0700

    Updates Python ExternalTransform to use the transform service when needed (#27228)
    
    * Updates Python ExternalTransform to use the transform service when needed
    
    * Addressing reviewer comments
    
    * Fix yapf
    
    * Fix lint
    
    * Fix yapf
---
 .../python/apache_beam/options/pipeline_options.py |   7 +
 sdks/python/apache_beam/transforms/external.py     |  74 +++++-
 .../utils/transform_service_launcher.py            | 261 +++++++++++++++++++++
 3 files changed, 341 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index dd162fcc098..d56b464e71c 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -533,6 +533,13 @@ class CrossLanguageOptions(PipelineOptions):
             'Should be a json mapping of gradle build targets to pre-built '
             'artifacts (e.g. jar files) expansion endpoints (e.g. host:port).'))
 
+    parser.add_argument(
+        '--use_transform_service',
+        default=False,
+        action='store_true',
+        help='Use the Docker-composed-based transform service when expanding '
+        'cross-language transforms.')
+
 
 def additional_option_ptransform_fn():
   beam.transforms.ptransform.ptransform_fn_typehints_enabled = True
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index 104fc4c7014..5182293ed59 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -23,7 +23,9 @@ import copy
 import functools
 import glob
 import logging
+import subprocess
 import threading
+import uuid
 from collections import OrderedDict
 from collections import namedtuple
 from typing import Dict
@@ -32,6 +34,7 @@ import grpc
 
 from apache_beam import pvalue
 from apache_beam.coders import RowCoder
+from apache_beam.options.pipeline_options import CrossLanguageOptions
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
 from apache_beam.portability.api import beam_expansion_api_pb2
@@ -51,6 +54,7 @@ from apache_beam.typehints.trivial_inference import instance_to_type
 from apache_beam.typehints.typehints import Union
 from apache_beam.typehints.typehints import UnionConstraint
 from apache_beam.utils import subprocess_server
+from apache_beam.utils import transform_service_launcher
 
 DEFAULT_EXPANSION_SERVICE = 'localhost:8097'
 
@@ -668,7 +672,10 @@ class ExternalTransform(ptransform.PTransform):
         transform=transform_proto,
         output_coder_requests=output_coders)
 
-    with ExternalTransform.service(self._expansion_service) as service:
+    expansion_service = _maybe_use_transform_service(
+        self._expansion_service, pipeline.options)
+
+    with ExternalTransform.service(expansion_service) as service:
       response = service.Expand(request)
       if response.error:
         raise RuntimeError(response.error)
@@ -973,6 +980,71 @@ class BeamJarExpansionService(JavaJarExpansionService):
         path_to_jar, extra_args, classpath=classpath, append_args=append_args)
 
 
+def _maybe_use_transform_service(provided_service=None, options=None):
+  """Returns the expansion service to use for a cross-language transform.
+
+  Only a ``JavaJarExpansionService`` may be replaced. Any other value (for
+  example the string address of an already running service, or None) is
+  returned unchanged. For a ``JavaJarExpansionService``, the provided service
+  is kept when a ``java`` executable is available and the
+  ``--use_transform_service`` option is not set. Otherwise, if a ``docker``
+  executable is available, a Docker Compose based transform service launcher
+  is returned instead.
+
+  Args:
+    provided_service: the expansion service configured on the transform.
+    options: the pipeline options. May be None, in which case
+      ``--use_transform_service`` is treated as unset.
+
+  Returns:
+    Either ``provided_service`` or a
+    ``transform_service_launcher.TransformServiceLauncher``.
+
+  Raises:
+    ValueError: if the transform service is needed but no ``docker``
+      executable is available.
+  """
+  # For anything other than 'JavaJarExpansionService' we just use the
+  # provided service. For example, string address of an already available
+  # service.
+  if not isinstance(provided_service, JavaJarExpansionService):
+    return provided_service
+
+  # Local import: only needed on this code path.
+  import shutil
+
+  def is_available(executable):
+    # Only checks that the executable can be found on the PATH. Unlike
+    # spawning '<executable> --version' without waiting for it, this leaves
+    # no un-reaped child process or open pipes behind.
+    return shutil.which(executable) is not None
+
+  use_transform_service = bool(
+      options is not None and
+      options.view_as(CrossLanguageOptions).use_transform_service)
+
+  # We try java and docker based expansion services in that order.
+  if is_available('java') and not use_transform_service:
+    return provided_service
+
+  if not is_available('docker'):
+    if use_transform_service:
+      raise ValueError(
+          'Cannot start the transform service since the Docker executable '
+          'is not available in the system.')
+    raise ValueError(
+        'Cannot start an expansion service since neither Java nor '
+        'Docker executables are available in the system.')
+
+  # provided_service is always set at this point (see the isinstance guard
+  # above), so Java being unavailable is the only other reason to be here.
+  if use_transform_service:
+    reason = 'it was explicitly requested'
+  else:
+    reason = 'the Java executable is not available in the system'
+
+  project_name = str(uuid.uuid4())
+  port = subprocess_server.pick_port(None)[0]
+
+  logging.info(
+      'Trying to expand the external transform using the Docker Compose '
+      'based transform service since %s. Transform service will be under '
+      'Docker Compose project name %s and will be made available at port %s.',
+      reason,
+      project_name,
+      port)
+
+  from apache_beam import version as beam_version_module
+
+  return transform_service_launcher.TransformServiceLauncher(
+      project_name, port, beam_version_module.__version__)
+
+
 def memoize(func):
   cache = {}
 
diff --git a/sdks/python/apache_beam/utils/transform_service_launcher.py b/sdks/python/apache_beam/utils/transform_service_launcher.py
new file mode 100644
index 00000000000..84f081e64ad
--- /dev/null
+++ b/sdks/python/apache_beam/utils/transform_service_launcher.py
@@ -0,0 +1,261 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import threading
+import time
+import zipfile
+from pathlib import Path
+
+import grpc
+
+from apache_beam.utils import subprocess_server
+
+_LOGGER = logging.getLogger(name=__name__)
+
+# Sub-commands accepted by the ``--command`` flag of this module's CLI.
+_COMMAND_POSSIBLE_VALUES = 'up down ps'.split()
+
+# Build target identifying the launcher jar that bundles the Docker Compose
+# configuration (docker-compose.yml and .env).
+_EXPANSION_SERVICE_LAUNCHER_JAR = ':sdks:java:transform-service:launcher:build'
+
+
+class TransformServiceLauncher(object):
+  """Starts, stops and queries a Docker Compose based Beam transform service.
+
+  The Docker Compose configuration (``docker-compose.yml`` and ``.env``) is
+  extracted from the transform service launcher jar into a temporary
+  directory named after the Docker Compose project. When used as a context
+  manager, the launcher starts the service and waits for it to accept
+  connections. It then returns an ``ExpansionAndArtifactRetrievalStub``
+  bound to the service. Leaving the context shuts the service down.
+  """
+  _DEFAULT_PROJECT_NAME = 'apache.beam.transform.service'
+  # Used by wait_till_up() when it is given a non-positive timeout.
+  _DEFAULT_START_WAIT_TIMEOUT = 25000  # milliseconds
+
+  # Launchers keyed by Docker Compose project name.
+  _launchers = {}  # type: ignore
+
+  # Maintaining a static list of launchers to prevent temporary resources
+  # from being created unnecessarily.
+  # NOTE(review): __init__ still runs on every construction, so a cached
+  # instance is re-initialized (possibly with a different port) each time.
+  def __new__(cls, project_name, port, beam_version=None):
+    if project_name not in TransformServiceLauncher._launchers:
+      TransformServiceLauncher._launchers[project_name] = super(
+          TransformServiceLauncher, cls).__new__(cls)
+    return TransformServiceLauncher._launchers[project_name]
+
+  def __init__(self, project_name, port, beam_version=None):
+    """Prepares the Docker Compose configuration for a transform service.
+
+    Args:
+      project_name: Docker Compose project name. It also names the temporary
+        directory holding the configuration.
+      port: local port the transform service is made available at.
+      beam_version: Beam version of the service containers, exported to
+        Docker Compose as ``BEAM_VERSION``. When None, the variable is not
+        set (presumably the extracted ``.env`` then supplies it; confirm).
+    """
+    _LOGGER.info('Initializing the Beam Transform Service %s.', project_name)
+
+    self._project_name = project_name
+    self._port = port
+    self._address = 'localhost:' + str(self._port)
+
+    self._launcher_lock = threading.RLock()
+
+    # Setting up Docker Compose configuration.
+
+    # We use Docker Compose project name as the name of the temporary directory
+    # to isolate different transform service instances that may be running in
+    # the same machine.
+    temp_dir = os.path.join(tempfile.gettempdir(), project_name)
+    os.makedirs(temp_dir, exist_ok=True)
+
+    # Get the jar with configs
+    path_to_local_jar = subprocess_server.JavaJarServer.local_jar(
+        subprocess_server.JavaJarServer.path_to_beam_jar(
+            _EXPANSION_SERVICE_LAUNCHER_JAR))
+
+    with zipfile.ZipFile(path_to_local_jar) as launcher_jar:
+      launcher_jar.extract('docker-compose.yml', path=temp_dir)
+      launcher_jar.extract('.env', path=temp_dir)
+
+    compose_file = os.path.join(temp_dir, 'docker-compose.yml')
+
+    credentials_dir = os.path.join(temp_dir, 'credentials_dir')
+    os.makedirs(credentials_dir, exist_ok=True)
+
+    _LOGGER.info('Copying the Google Application Default Credentials file.')
+
+    application_default_path_file = (
+        self._application_default_credentials_path())
+    application_default_path_copied = os.path.join(
+        credentials_dir, 'application_default_credentials.json')
+
+    if os.path.exists(application_default_path_file):
+      shutil.copyfile(
+          application_default_path_file, application_default_path_copied)
+    else:
+      _LOGGER.info(
+          'GCP credentials will not be available for the transform service '
+          'since could not find the Google Cloud application default '
+          'credentials file at the expected location %s.',
+          application_default_path_file)
+
+    self._environmental_variables = {}
+    self._environmental_variables['CREDENTIALS_VOLUME'] = credentials_dir
+    self._environmental_variables['TRANSFORM_SERVICE_PORT'] = str(port)
+    if beam_version is not None:
+      # Environment values must be strings. A None value would make every
+      # docker-compose invocation fail inside subprocess.Popen.
+      self._environmental_variables['BEAM_VERSION'] = beam_version
+
+    self._docker_compose_start_command_prefix = [
+        'docker-compose', '-p', project_name, '-f', compose_file
+    ]
+    # Public copy of the command prefix used to invoke Docker Compose.
+    self.docker_compose_command_prefix = list(
+        self._docker_compose_start_command_prefix)
+
+  @staticmethod
+  def _application_default_credentials_path():
+    """Returns the expected location of the gcloud ADC file."""
+    # os.name is 'nt' on Windows, where gcloud keeps its configuration under
+    # %APPDATA% rather than under ~/.config.
+    if os.name == 'nt':
+      config_root = os.environ.get('APPDATA', str(Path.home()))
+      return os.path.join(
+          config_root, 'gcloud', 'application_default_credentials.json')
+    return os.path.join(
+        str(Path.home()),
+        '.config',
+        'gcloud',
+        'application_default_credentials.json')
+
+  def _get_channel(self):
+    channel_options = [("grpc.max_receive_message_length", -1),
+                       ("grpc.max_send_message_length", -1)]
+    if hasattr(grpc, 'local_channel_credentials'):
+      # TODO: update this to support secure non-local channels.
+      return grpc.secure_channel(
+          self._address,
+          grpc.local_channel_credentials(),
+          options=channel_options)
+    else:
+      return grpc.insecure_channel(self._address, options=channel_options)
+
+  def __enter__(self):
+    self.start()
+    self.wait_till_up(-1)
+
+    self._channel = self._get_channel()
+
+    # The stub lives in apache_beam.transforms.external. It is imported lazily
+    # because that module imports this one.
+    from apache_beam.transforms import external
+    return external.ExpansionAndArtifactRetrievalStub(self._channel.__enter__())
+
+  def __exit__(self, *args):
+    try:
+      self.shutdown()
+    finally:
+      # Close the channel even if shutting the service down fails.
+      self._channel.__exit__(*args)
+
+  def _run_docker_compose_command(self, command, output_override=None):
+    """Runs ``docker-compose <command>`` for this project.
+
+    The combined stdout/stderr is written as bytes to ``output_override``
+    when one is given. Otherwise it is decoded and printed.
+
+    Returns:
+      The exit code of the docker-compose process.
+    """
+    cmd = self._docker_compose_start_command_prefix + list(command)
+
+    env = os.environ.copy()
+    env.update(self._environmental_variables)
+
+    process = subprocess.Popen(
+        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env)
+    std_out, _ = process.communicate()
+
+    if output_override:
+      output_override.write(std_out)
+    else:
+      print(std_out.decode(errors='backslashreplace'))
+
+    if process.returncode != 0:
+      # Surface failures instead of ignoring them. Otherwise a failed 'up'
+      # only shows up later as a wait_till_up() timeout.
+      _LOGGER.error(
+          'Docker Compose command %s failed with exit code %d.',
+          ' '.join(cmd),
+          process.returncode)
+    return process.returncode
+
+  def start(self):
+    with self._launcher_lock:
+      self._run_docker_compose_command(['up', '-d'])
+
+  def shutdown(self):
+    with self._launcher_lock:
+      self._run_docker_compose_command(['down'])
+
+  def status(self):
+    with self._launcher_lock:
+      self._run_docker_compose_command(['ps'])
+
+  def wait_till_up(self, timeout_ms):
+    """Blocks until the transform service accepts gRPC connections.
+
+    Args:
+      timeout_ms: maximum time to wait, in milliseconds. Non-positive values
+        select ``_DEFAULT_START_WAIT_TIMEOUT``.
+
+    Raises:
+      ValueError: if the service does not become ready in time.
+    """
+    timeout_ms = (
+        TransformServiceLauncher._DEFAULT_START_WAIT_TIMEOUT
+        if timeout_ms <= 0 else timeout_ms)
+
+    # This channel is only a readiness probe; it is closed before returning.
+    channel = self._get_channel()
+    try:
+      # Waiting till the service is up.
+      channel_ready = grpc.channel_ready_future(channel)
+      wait_secs = .1
+      start_time = time.time()
+      while True:
+        if (time.time() - start_time) * 1000 > timeout_ms:
+          raise ValueError(
+              'Transform service did not start in %s seconds.' %
+              (timeout_ms / 1000))
+        try:
+          channel_ready.result(timeout=wait_secs)
+          break
+        except (grpc.FutureTimeoutError, grpc.RpcError):
+          wait_secs *= 1.2
+          _LOGGER.log(
+              logging.WARNING if wait_secs > 1 else logging.DEBUG,
+              'Waiting for the transform service to be ready at %s.',
+              self._address)
+    finally:
+      channel.close()
+
+    _LOGGER.info('Transform service %s started.', self._project_name)
+
+  def _get_status(self):
+    """Writes ``docker-compose ps`` output to a temporary file.
+
+    Returns:
+      The path of the file. The file is not deleted automatically.
+    """
+    with tempfile.NamedTemporaryFile(delete=False) as tmp:
+      self._run_docker_compose_command(['ps'], tmp)
+    return tmp.name
+
+
+def main(argv):
+  """Command line entry point: runs ``--command`` against a transform service.
+
+  Args:
+    argv: command line arguments. Arguments that are not recognized are
+      ignored.
+  """
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--project_name',
+      default=TransformServiceLauncher._DEFAULT_PROJECT_NAME,
+      help='Docker Compose project name.')
+  parser.add_argument(
+      '--command',
+      required=True,
+      choices=_COMMAND_POSSIBLE_VALUES,
+      help='Command to run. Possible values are ' +
+      ', '.join(_COMMAND_POSSIBLE_VALUES))
+  parser.add_argument(
+      '--port',
+      type=int,
+      default=-1,
+      help='External visible port of the transform service. A free port is '
+      'picked when not specified.')
+  parser.add_argument(
+      '--beam_version',
+      required=True,
+      help='Beam version of the expansion service containers to be used.')
+
+  known_args, _ = parser.parse_known_args(argv)
+
+  port = known_args.port
+  if port < 0:
+    # A negative port would otherwise be handed to Docker Compose verbatim and
+    # used as the address to wait on (localhost:-1), so pick a free one.
+    port = subprocess_server.pick_port(None)[0]
+  logging.info('Starting the Beam Transform Service at port %s.', port)
+  launcher = TransformServiceLauncher(
+      known_args.project_name, port, known_args.beam_version)
+
+  if known_args.command == 'up':
+    launcher.start()
+    launcher.wait_till_up(-1)
+  elif known_args.command == 'down':
+    launcher.shutdown()
+  elif known_args.command == 'ps':
+    launcher.status()
+  else:
+    # Unreachable while argparse enforces ``choices``; kept as a safeguard.
+    raise ValueError(
+        'Unknown command %s possible values are %s' %
+        (known_args.command, ', '.join(_COMMAND_POSSIBLE_VALUES)))
+
+
+if __name__ == '__main__':
+  # Enable INFO level logging for the command line tool before running it.
+  root_logger = logging.getLogger()
+  root_logger.setLevel(logging.INFO)
+  main(sys.argv)