You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2023/06/25 22:18:33 UTC
[beam] branch master updated: Updates Python ExternalTransform to use the transform service when needed (#27228)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d545243b30d Updates Python ExternalTransform to use the transform service when needed (#27228)
d545243b30d is described below
commit d545243b30d1192b76379659b2345fd349ec456d
Author: Chamikara Jayalath <ch...@gmail.com>
AuthorDate: Sun Jun 25 15:18:27 2023 -0700
Updates Python ExternalTransform to use the transform service when needed (#27228)
* Updates Python ExternalTransform to use the transform service when needed
* Addressing reviewer comments
* Fix yapf
* Fix lint
* Fix yapf
---
.../python/apache_beam/options/pipeline_options.py | 7 +
sdks/python/apache_beam/transforms/external.py | 74 +++++-
.../utils/transform_service_launcher.py | 261 +++++++++++++++++++++
3 files changed, 341 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index dd162fcc098..d56b464e71c 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -533,6 +533,13 @@ class CrossLanguageOptions(PipelineOptions):
'Should be a json mapping of gradle build targets to pre-built '
'artifacts (e.g. jar files) expansion endpoints (e.g. host:port).'))
+ parser.add_argument(
+ '--use_transform_service',
+ default=False,
+ action='store_true',
+ help='Use the Docker-composed-based transform service when expanding '
+ 'cross-language transforms.')
+
def additional_option_ptransform_fn():
beam.transforms.ptransform.ptransform_fn_typehints_enabled = True
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index 104fc4c7014..5182293ed59 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -23,7 +23,9 @@ import copy
import functools
import glob
import logging
+import subprocess
import threading
+import uuid
from collections import OrderedDict
from collections import namedtuple
from typing import Dict
@@ -32,6 +34,7 @@ import grpc
from apache_beam import pvalue
from apache_beam.coders import RowCoder
+from apache_beam.options.pipeline_options import CrossLanguageOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_artifact_api_pb2_grpc
from apache_beam.portability.api import beam_expansion_api_pb2
@@ -51,6 +54,7 @@ from apache_beam.typehints.trivial_inference import instance_to_type
from apache_beam.typehints.typehints import Union
from apache_beam.typehints.typehints import UnionConstraint
from apache_beam.utils import subprocess_server
+from apache_beam.utils import transform_service_launcher
DEFAULT_EXPANSION_SERVICE = 'localhost:8097'
@@ -668,7 +672,10 @@ class ExternalTransform(ptransform.PTransform):
transform=transform_proto,
output_coder_requests=output_coders)
- with ExternalTransform.service(self._expansion_service) as service:
+ expansion_service = _maybe_use_transform_service(
+ self._expansion_service, pipeline.options)
+
+ with ExternalTransform.service(expansion_service) as service:
response = service.Expand(request)
if response.error:
raise RuntimeError(response.error)
@@ -973,6 +980,71 @@ class BeamJarExpansionService(JavaJarExpansionService):
path_to_jar, extra_args, classpath=classpath, append_args=append_args)
def _maybe_use_transform_service(provided_service=None, options=None):
  """Picks the expansion service used to expand a cross-language transform.

  Anything other than a ``JavaJarExpansionService`` (for example the string
  address of an already running service) is returned unchanged. For a
  ``JavaJarExpansionService``, the provided service is kept when a Java
  executable is available and the transform service was not explicitly
  requested through ``CrossLanguageOptions.use_transform_service``. Otherwise,
  if Docker is available, a Docker Compose based
  ``transform_service_launcher.TransformServiceLauncher`` is returned instead.

  Args:
    provided_service: the expansion service configured for the transform.
    options: the pipeline's ``PipelineOptions``; ``None`` is treated as "the
      transform service was not requested".

  Returns:
    The expansion service (or transform service launcher) to expand with.

  Raises:
    ValueError: if the transform service is required but Docker is not
      available in the system.
  """
  # For anything other than 'JavaJarExpansionService' we just use the
  # provided service. For example, string address of an already available
  # service.
  if not isinstance(provided_service, JavaJarExpansionService):
    return provided_service

  def is_executable_available(executable):
    # Only checks that '<executable> --version' can be launched; the exit code
    # is intentionally ignored. `run` waits for the child so that no zombie
    # process is left behind.
    try:
      subprocess.run([executable, '--version'],
                     stdout=subprocess.DEVNULL,
                     stderr=subprocess.DEVNULL,
                     check=False)
    except (OSError, subprocess.SubprocessError):
      return False
    return True

  use_transform_service = bool(
      options is not None and
      options.view_as(CrossLanguageOptions).use_transform_service)

  # We try java and docker based expansion services in that order.
  java_available = is_executable_available('java')
  if java_available and provided_service and not use_transform_service:
    return provided_service

  # Docker is only probed when the transform service is actually needed.
  if not is_executable_available('docker'):
    if use_transform_service:
      raise ValueError(
          'The transform service was explicitly requested but the Docker '
          'executable is not available in the system.')
    raise ValueError(
        'Cannot start an expansion service since neither Java nor '
        'Docker executables are available in the system.')

  if use_transform_service:
    reason = 'it was explicitly requested'
  elif not java_available:
    reason = 'the Java executable is not available in the system'
  else:
    reason = 'a Java expansion service was not provided'

  project_name = str(uuid.uuid4())
  port = subprocess_server.pick_port(None)[0]

  logging.info(
      'Trying to expand the external transform using the Docker Compose '
      'based transform service since %s. Transform service will be under '
      'Docker Compose project name %s and will be made available at port %s.',
      reason,
      project_name,
      port)

  from apache_beam import version as beam_version_module

  return transform_service_launcher.TransformServiceLauncher(
      project_name, port, beam_version_module.__version__)
+
+
def memoize(func):
cache = {}
diff --git a/sdks/python/apache_beam/utils/transform_service_launcher.py b/sdks/python/apache_beam/utils/transform_service_launcher.py
new file mode 100644
index 00000000000..84f081e64ad
--- /dev/null
+++ b/sdks/python/apache_beam/utils/transform_service_launcher.py
@@ -0,0 +1,261 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse
+import logging
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import threading
+import time
+import zipfile
+from pathlib import Path
+
+import grpc
+
+from apache_beam.utils import subprocess_server
+
# Logger for this module.
_LOGGER = logging.getLogger(__name__)

# Values accepted by the `--command` flag of `main`; each one maps to the
# Docker Compose sub-command of the same name.
_COMMAND_POSSIBLE_VALUES = 'up down ps'.split()

# Beam build target of the launcher jar that bundles the Docker Compose
# configuration (docker-compose.yml and .env) used by TransformServiceLauncher.
_EXPANSION_SERVICE_LAUNCHER_JAR = ':sdks:java:transform-service:launcher:build'
+
+
class TransformServiceLauncher(object):
  """Manages a Docker Compose based Beam transform service.

  On construction, the Docker Compose configuration (``docker-compose.yml``
  and ``.env``) is extracted from the transform service launcher jar into a
  temporary directory named after the Docker Compose project. If the Google
  Cloud application default credentials file is present, it is copied into a
  directory that is exposed to Docker Compose as ``CREDENTIALS_VOLUME``.

  When used as a context manager, the service is started and awaited, and an
  expansion/artifact-retrieval stub is returned. On exit, the gRPC channel is
  closed and the service is shut down.
  """
  _DEFAULT_PROJECT_NAME = 'apache.beam.transform.service'
  _DEFAULT_START_WAIT_TIMEOUT = 25000  # milliseconds

  _launchers = {}  # type: ignore

  # Maintaining a static list of launchers to prevent temporary resources
  # from being created unnecessarily.
  def __new__(cls, project_name, port, beam_version=None):
    if project_name not in TransformServiceLauncher._launchers:
      TransformServiceLauncher._launchers[project_name] = super(
          TransformServiceLauncher, cls).__new__(cls)
    return TransformServiceLauncher._launchers[project_name]

  def __init__(self, project_name, port, beam_version=None):
    """Prepares the Docker Compose configuration for ``project_name``.

    Args:
      project_name: Docker Compose project name. It is also the name of the
        temporary directory that holds this instance's configuration.
      port: local port at which the transform service is made available.
      beam_version: Beam version of the service containers. Defaults to the
        version of the installed Beam SDK.
    """
    # NOTE: __new__ may hand back a cached instance, but __init__ still runs
    # on every construction and repeats the setup below.
    _LOGGER.info('Initializing the Beam Transform Service %s.', project_name)

    if beam_version is None:
      # A None value cannot be placed in a subprocess environment, so fall
      # back to the installed SDK version.
      from apache_beam import version as beam_version_module
      beam_version = beam_version_module.__version__

    self._project_name = project_name
    self._port = port
    self._address = 'localhost:' + str(self._port)

    self._launcher_lock = threading.RLock()

    # We use Docker Compose project name as the name of the temporary directory
    # to isolate different transform service instances that may be running in
    # the same machine.
    temp_dir = os.path.join(tempfile.gettempdir(), project_name)
    os.makedirs(temp_dir, exist_ok=True)

    # Get the jar with configs and extract the Docker Compose configuration.
    path_to_local_jar = subprocess_server.JavaJarServer.local_jar(
        subprocess_server.JavaJarServer.path_to_beam_jar(
            _EXPANSION_SERVICE_LAUNCHER_JAR))
    with zipfile.ZipFile(path_to_local_jar) as launcher_jar:
      launcher_jar.extract('docker-compose.yml', path=temp_dir)
      launcher_jar.extract('.env', path=temp_dir)

    compose_file = os.path.join(temp_dir, 'docker-compose.yml')

    credentials_dir = os.path.join(temp_dir, 'credentials_dir')
    os.makedirs(credentials_dir, exist_ok=True)
    self._copy_application_default_credentials(credentials_dir)

    self._environmental_variables = {
        'CREDENTIALS_VOLUME': credentials_dir,
        'TRANSFORM_SERVICE_PORT': str(port),
        'BEAM_VERSION': beam_version,
    }

    self._docker_compose_start_command_prefix = [
        'docker-compose', '-p', project_name, '-f', compose_file
    ]
    # Public copy of the command prefix, kept for backwards compatibility.
    self.docker_compose_command_prefix = list(
        self._docker_compose_start_command_prefix)

  @staticmethod
  def _application_default_credentials_file():
    """Returns the path where gcloud stores application default credentials."""
    # gcloud honours CLOUDSDK_CONFIG as an override of its config directory.
    config_dir = os.environ.get('CLOUDSDK_CONFIG')
    if not config_dir:
      if os.name == 'nt':
        # On Windows gcloud keeps its configuration under %APPDATA%.
        config_dir = os.path.join(
            os.environ.get('APPDATA', str(Path.home())), 'gcloud')
      else:
        config_dir = os.path.join(str(Path.home()), '.config', 'gcloud')
    return os.path.join(config_dir, 'application_default_credentials.json')

  @staticmethod
  def _copy_application_default_credentials(credentials_dir):
    """Copies the gcloud credentials file, if it exists, to credentials_dir."""
    _LOGGER.info('Copying the Google Application Default Credentials file.')
    source_file = (
        TransformServiceLauncher._application_default_credentials_file())
    if os.path.exists(source_file):
      shutil.copyfile(
          source_file,
          os.path.join(credentials_dir, 'application_default_credentials.json'))
    else:
      _LOGGER.info(
          'GCP credentials will not be available for the transform service '
          'since could not find the Google Cloud application default '
          'credentials file at the expected location %s.',
          source_file)

  def _get_channel(self):
    """Creates a gRPC channel to the transform service address."""
    channel_options = [("grpc.max_receive_message_length", -1),
                       ("grpc.max_send_message_length", -1)]
    if hasattr(grpc, 'local_channel_credentials'):
      # TODO: update this to support secure non-local channels.
      return grpc.secure_channel(
          self._address,
          grpc.local_channel_credentials(),
          options=channel_options)
    else:
      return grpc.insecure_channel(self._address, options=channel_options)

  def __enter__(self):
    self.start()
    try:
      self.wait_till_up(-1)
    except Exception:
      # Do not leave the containers running if the service never came up.
      self.shutdown()
      raise

    self._channel = self._get_channel()

    from apache_beam.transforms import external
    return external.ExpansionAndArtifactRetrievalStub(self._channel.__enter__())

  def __exit__(self, *args):
    # Close the client channel first; always shut the service down.
    try:
      self._channel.__exit__(*args)
    finally:
      self.shutdown()

  def _run_docker_compose_command(self, command, output_override=None):
    """Runs a docker-compose sub-command for this project.

    Args:
      command: list of docker-compose arguments, e.g. ``['up', '-d']``.
      output_override: optional binary file object that receives the combined
        stdout/stderr bytes; when omitted, the output is printed.
    """
    cmd = self._docker_compose_start_command_prefix + list(command)
    env = dict(os.environ)
    env.update(self._environmental_variables)

    completed = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        env=env,
        check=False)

    if completed.returncode != 0:
      _LOGGER.warning(
          'Docker Compose command %s exited with code %d.',
          ' '.join(cmd),
          completed.returncode)

    if output_override is not None:
      output_override.write(completed.stdout)
    else:
      print(completed.stdout.decode(errors='backslashreplace'))

  def start(self):
    with self._launcher_lock:
      self._run_docker_compose_command(['up', '-d'])

  def shutdown(self):
    with self._launcher_lock:
      self._run_docker_compose_command(['down'])

  def status(self):
    with self._launcher_lock:
      self._run_docker_compose_command(['ps'])

  def wait_till_up(self, timeout_ms):
    """Blocks until the transform service accepts gRPC connections.

    Args:
      timeout_ms: maximum time to wait in milliseconds; values <= 0 select
        ``_DEFAULT_START_WAIT_TIMEOUT``.

    Raises:
      ValueError: if the service is not reachable within the timeout.
    """
    if timeout_ms <= 0:
      timeout_ms = TransformServiceLauncher._DEFAULT_START_WAIT_TIMEOUT

    channel = self._get_channel()
    try:
      channel_ready = grpc.channel_ready_future(channel)
      wait_secs = .1
      start_time = time.time()
      while True:
        if (time.time() - start_time) * 1000 > timeout_ms:
          raise ValueError(
              'Transform service did not start in %s seconds.' %
              (timeout_ms / 1000))
        try:
          channel_ready.result(timeout=wait_secs)
          break
        except (grpc.FutureTimeoutError, grpc.RpcError):
          # Back off gradually; only warn once the waits become long.
          wait_secs *= 1.2
          _LOGGER.log(
              logging.WARNING if wait_secs > 1 else logging.DEBUG,
              'Waiting for the transform service to be ready at %s.',
              self._address)
    finally:
      # This channel is only a readiness probe; release it in every case.
      channel.close()

    _LOGGER.info('Transform service %s started.', self._project_name)

  def _get_status(self):
    """Writes `docker-compose ps` output to a temp file and returns its path."""
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
      self._run_docker_compose_command(['ps'], tmp)
    return tmp.name
+
+
def main(argv):
  """Command-line entry point for managing a Beam transform service.

  Args:
    argv: command-line arguments. Unrecognized arguments are ignored, which
      includes the program name when called as ``main(sys.argv)``.

  Raises:
    ValueError: for an unknown ``--command`` value. This is a defensive check,
      since argparse already restricts the accepted choices.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('--project_name', help='Docker Compose project name.')
  parser.add_argument(
      '--command',
      required=True,
      choices=_COMMAND_POSSIBLE_VALUES,
      help='Command to run. Possible values are ' +
      ', '.join(_COMMAND_POSSIBLE_VALUES))
  parser.add_argument(
      '--port',
      type=int,
      default=-1,
      help='External visible port of the transform service. If negative, a '
      'free local port is picked.')
  parser.add_argument(
      '--beam_version',
      required=True,
      help='Beam version of the expansion service containers to be used.')

  known_args, _ = parser.parse_known_args(argv)

  project_name = (
      TransformServiceLauncher._DEFAULT_PROJECT_NAME
      if known_args.project_name is None else known_args.project_name)

  port = known_args.port
  if port < 0:
    # A negative port would otherwise be exported verbatim as
    # TRANSFORM_SERVICE_PORT and the service probed at 'localhost:-1'.
    port = subprocess_server.pick_port(None)[0]

  _LOGGER.info(
      'Running command %r for the Beam Transform Service %s at port %s.',
      known_args.command,
      project_name,
      port)
  launcher = TransformServiceLauncher(
      project_name, port, known_args.beam_version)

  if known_args.command == 'up':
    launcher.start()
    launcher.wait_till_up(-1)
  elif known_args.command == 'down':
    launcher.shutdown()
  elif known_args.command == 'ps':
    launcher.status()
  else:
    raise ValueError(
        'Unknown command %s possible values are %s' %
        (known_args.command, ', '.join(_COMMAND_POSSIBLE_VALUES)))
+
+
if __name__ == '__main__':
  # Surface the launcher's INFO-level progress messages when run as a script.
  logging.root.setLevel(logging.INFO)
  main(sys.argv)