You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2022/09/09 17:23:46 UTC
[beam] branch master updated: Use existing pickle_library flag in expansion service. (#23111)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 973475fde19 Use existing pickle_library flag in expansion service. (#23111)
973475fde19 is described below
commit 973475fde1942a7c6d87d1cd39a7529baa234b6c
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Fri Sep 9 10:23:34 2022 -0700
Use existing pickle_library flag in expansion service. (#23111)
---
.../beam/sdk/extensions/python/PythonExternalTransform.java | 3 +--
.../apache_beam/runners/portability/expansion_service_main.py | 9 +++++----
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
index f9aea4dc47c..b3feb6df16d 100644
--- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
+++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
@@ -449,8 +449,7 @@ public class PythonExternalTransform<InputT extends PInput, OutputT extends POut
} else {
int port = PythonService.findAvailablePort();
ImmutableList.Builder<String> args = ImmutableList.builder();
- args.add(
- "--port=" + port, "--fully_qualified_name_glob=*", "--default_pickler=cloudpickle");
+ args.add("--port=" + port, "--fully_qualified_name_glob=*", "--pickle_library=cloudpickle");
if (!extraPackages.isEmpty()) {
File requirementsFile = File.createTempFile("requirements", ".txt");
requirementsFile.deleteOnExit();
diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_main.py b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
index ffa8df8b4d4..9444f678441 100644
--- a/sdks/python/apache_beam/runners/portability/expansion_service_main.py
+++ b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
@@ -24,7 +24,8 @@ import sys
import grpc
from apache_beam.internal import pickler
-from apache_beam.pipeline import PipelineOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.portability.api import beam_artifact_api_pb2_grpc
from apache_beam.portability.api import beam_expansion_api_pb2_grpc
from apache_beam.runners.portability import artifact_service
@@ -40,13 +41,13 @@ def main(argv):
parser.add_argument(
'-p', '--port', type=int, help='port on which to serve the job api')
parser.add_argument('--fully_qualified_name_glob', default=None)
- parser.add_argument('--default_pickler')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(
pipeline_args + ["--experiments=beam_fn_api", "--sdk_location=container"])
- if known_args.default_pickler:
- pickler.set_library(known_args.default_pickler)
+ # Set this before any pipeline construction occurs.
+ # See https://github.com/apache/beam/issues/21615
+ pickler.set_library(pipeline_options.view_as(SetupOptions).pickle_library)
with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter(
known_args.fully_qualified_name_glob):