You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/09/08 20:55:45 UTC

[beam] branch release-2.42.0 updated: Use cloudpickle for Java Python transforms. (#23093)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch release-2.42.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.42.0 by this push:
     new 3d64d63ea86 Use cloudpickle for Java Python transforms. (#23093)
3d64d63ea86 is described below

commit 3d64d63ea863f810f0e31ea80e36f48ca167f4e4
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Thu Sep 8 13:55:37 2022 -0700

    Use cloudpickle for Java Python transforms. (#23093)
    
    This works around bugs in dill when pickling functions defined with
    eval and exec, which are used for the PythonCallable types here.
    
    Also move the --default_pickler flag, which was placed on the wrong
    binary (local_job_service_main), to expansion_service_main.
---
 .../apache/beam/sdk/extensions/python/PythonExternalTransform.java   | 3 ++-
 .../python/apache_beam/runners/portability/expansion_service_main.py | 5 +++++
 .../python/apache_beam/runners/portability/local_job_service_main.py | 5 -----
 3 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
index 9e2aa3442f3..f9aea4dc47c 100644
--- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
+++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
@@ -449,7 +449,8 @@ public class PythonExternalTransform<InputT extends PInput, OutputT extends POut
       } else {
         int port = PythonService.findAvailablePort();
         ImmutableList.Builder<String> args = ImmutableList.builder();
-        args.add("--port", "" + port, "--fully_qualified_name_glob", "*");
+        args.add(
+            "--port=" + port, "--fully_qualified_name_glob=*", "--default_pickler=cloudpickle");
         if (!extraPackages.isEmpty()) {
           File requirementsFile = File.createTempFile("requirements", ".txt");
           requirementsFile.deleteOnExit();
diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_main.py b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
index c9d4f45c823..ffa8df8b4d4 100644
--- a/sdks/python/apache_beam/runners/portability/expansion_service_main.py
+++ b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
@@ -23,6 +23,7 @@ import sys
 
 import grpc
 
+from apache_beam.internal import pickler
 from apache_beam.pipeline import PipelineOptions
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
 from apache_beam.portability.api import beam_expansion_api_pb2_grpc
@@ -39,10 +40,14 @@ def main(argv):
   parser.add_argument(
       '-p', '--port', type=int, help='port on which to serve the job api')
   parser.add_argument('--fully_qualified_name_glob', default=None)
+  parser.add_argument('--default_pickler')
   known_args, pipeline_args = parser.parse_known_args(argv)
   pipeline_options = PipelineOptions(
       pipeline_args + ["--experiments=beam_fn_api", "--sdk_location=container"])
 
+  if known_args.default_pickler:
+    pickler.set_library(known_args.default_pickler)
+
   with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter(
       known_args.fully_qualified_name_glob):
 
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service_main.py b/sdks/python/apache_beam/runners/portability/local_job_service_main.py
index edf2d831439..6d9d32f5f23 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service_main.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service_main.py
@@ -37,7 +37,6 @@ import subprocess
 import sys
 import time
 
-from apache_beam.internal import pickler
 from apache_beam.runners.portability import local_job_service
 
 _LOGGER = logging.getLogger(__name__)
@@ -55,7 +54,6 @@ def run(argv):
       default=0,
       help='port on which to serve the job api')
   parser.add_argument('--staging_dir')
-  parser.add_argument('--default_pickler')
   parser.add_argument(
       '--pid_file', help='File in which to store the process id of the server.')
   parser.add_argument(
@@ -131,9 +129,6 @@ def run(argv):
     print('Server started at port', port)
     return
 
-  if options.default_pickler:
-    pickler.set_library(options.default_pickler)
-
   if options.pid_file:
     print('Writing process id to', options.pid_file)
     os.makedirs(pathlib.PurePath(options.pid_file).parent, exist_ok=True)