You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/09/08 20:55:45 UTC
[beam] branch release-2.42.0 updated: Use cloudpickle for Java Python transforms. (#23093)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch release-2.42.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.42.0 by this push:
new 3d64d63ea86 Use cloudpickle for Java Python transforms. (#23093)
3d64d63ea86 is described below
commit 3d64d63ea863f810f0e31ea80e36f48ca167f4e4
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Thu Sep 8 13:55:37 2022 -0700
Use cloudpickle for Java Python transforms. (#23093)
This works around bugs in dill regarding functions defined with
eval and exec, which are used for the PythonCallable types here.
Also fix the --default_pickler flag, which was placed on the wrong binary (local_job_service_main instead of expansion_service_main).
---
.../apache/beam/sdk/extensions/python/PythonExternalTransform.java | 3 ++-
.../python/apache_beam/runners/portability/expansion_service_main.py | 5 +++++
.../python/apache_beam/runners/portability/local_job_service_main.py | 5 -----
3 files changed, 7 insertions(+), 6 deletions(-)
diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
index 9e2aa3442f3..f9aea4dc47c 100644
--- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
+++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
@@ -449,7 +449,8 @@ public class PythonExternalTransform<InputT extends PInput, OutputT extends POut
} else {
int port = PythonService.findAvailablePort();
ImmutableList.Builder<String> args = ImmutableList.builder();
- args.add("--port", "" + port, "--fully_qualified_name_glob", "*");
+ args.add(
+ "--port=" + port, "--fully_qualified_name_glob=*", "--default_pickler=cloudpickle");
if (!extraPackages.isEmpty()) {
File requirementsFile = File.createTempFile("requirements", ".txt");
requirementsFile.deleteOnExit();
diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_main.py b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
index c9d4f45c823..ffa8df8b4d4 100644
--- a/sdks/python/apache_beam/runners/portability/expansion_service_main.py
+++ b/sdks/python/apache_beam/runners/portability/expansion_service_main.py
@@ -23,6 +23,7 @@ import sys
import grpc
+from apache_beam.internal import pickler
from apache_beam.pipeline import PipelineOptions
from apache_beam.portability.api import beam_artifact_api_pb2_grpc
from apache_beam.portability.api import beam_expansion_api_pb2_grpc
@@ -39,10 +40,14 @@ def main(argv):
parser.add_argument(
'-p', '--port', type=int, help='port on which to serve the job api')
parser.add_argument('--fully_qualified_name_glob', default=None)
+ parser.add_argument('--default_pickler')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(
pipeline_args + ["--experiments=beam_fn_api", "--sdk_location=container"])
+ if known_args.default_pickler:
+ pickler.set_library(known_args.default_pickler)
+
with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter(
known_args.fully_qualified_name_glob):
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service_main.py b/sdks/python/apache_beam/runners/portability/local_job_service_main.py
index edf2d831439..6d9d32f5f23 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service_main.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service_main.py
@@ -37,7 +37,6 @@ import subprocess
import sys
import time
-from apache_beam.internal import pickler
from apache_beam.runners.portability import local_job_service
_LOGGER = logging.getLogger(__name__)
@@ -55,7 +54,6 @@ def run(argv):
default=0,
help='port on which to serve the job api')
parser.add_argument('--staging_dir')
- parser.add_argument('--default_pickler')
parser.add_argument(
'--pid_file', help='File in which to store the process id of the server.')
parser.add_argument(
@@ -131,9 +129,6 @@ def run(argv):
print('Server started at port', port)
return
- if options.default_pickler:
- pickler.set_library(options.default_pickler)
-
if options.pid_file:
print('Writing process id to', options.pid_file)
os.makedirs(pathlib.PurePath(options.pid_file).parent, exist_ok=True)