You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/09/09 18:23:05 UTC

[beam] branch release-2.42.0 updated: [release-2.42.0] Use existing pickle_library flag in expansion service. (#23118)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch release-2.42.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.42.0 by this push:
     new 31370815283 [release-2.42.0] Use existing pickle_library flag in expansion service. (#23118)
31370815283 is described below

commit 31370815283195adad3725ad062ef4e7e08494db
Author: tvalentyn <tv...@users.noreply.github.com>
AuthorDate: Fri Sep 9 11:22:57 2022 -0700

    [release-2.42.0] Use existing pickle_library flag in expansion service. (#23118)
    
    * Use existing pickle_library flag in expansion service.
    
    * Apply spotless formatting.
    
    Co-authored-by: Robert Bradshaw <ro...@gmail.com>
---
 .../beam/sdk/extensions/python/PythonExternalTransform.java      | 3 +--
 .../apache_beam/runners/portability/expansion_service_main.py    | 9 +++++----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
index f9aea4dc47c..b3feb6df16d 100644
--- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
+++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
@@ -449,8 +449,7 @@ public class PythonExternalTransform<InputT extends PInput, OutputT extends POut
       } else {
         int port = PythonService.findAvailablePort();
         ImmutableList.Builder<String> args = ImmutableList.builder();
-        args.add(
-            "--port=" + port, "--fully_qualified_name_glob=*", "--default_pickler=cloudpickle");
+        args.add("--port=" + port, "--fully_qualified_name_glob=*", "--pickle_library=cloudpickle");
         if (!extraPackages.isEmpty()) {
           File requirementsFile = File.createTempFile("requirements", ".txt");
           requirementsFile.deleteOnExit();
diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_main.py b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
index ffa8df8b4d4..9444f678441 100644
--- a/sdks/python/apache_beam/runners/portability/expansion_service_main.py
+++ b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
@@ -24,7 +24,8 @@ import sys
 import grpc
 
 from apache_beam.internal import pickler
-from apache_beam.pipeline import PipelineOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
 from apache_beam.portability.api import beam_expansion_api_pb2_grpc
 from apache_beam.runners.portability import artifact_service
@@ -40,13 +41,13 @@ def main(argv):
   parser.add_argument(
       '-p', '--port', type=int, help='port on which to serve the job api')
   parser.add_argument('--fully_qualified_name_glob', default=None)
-  parser.add_argument('--default_pickler')
   known_args, pipeline_args = parser.parse_known_args(argv)
   pipeline_options = PipelineOptions(
       pipeline_args + ["--experiments=beam_fn_api", "--sdk_location=container"])
 
-  if known_args.default_pickler:
-    pickler.set_library(known_args.default_pickler)
+  # Set this before any pipeline construction occurs.
+  # See https://github.com/apache/beam/issues/21615
+  pickler.set_library(pipeline_options.view_as(SetupOptions).pickle_library)
 
   with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter(
       known_args.fully_qualified_name_glob):