Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/30 23:45:32 UTC

[GitHub] [beam] robertwb commented on a change in pull request #11557: [BEAM-9845] Stage artifacts over expansion service.

robertwb commented on a change in pull request #11557:
URL: https://github.com/apache/beam/pull/11557#discussion_r418348695



##########
File path: sdks/python/apache_beam/utils/subprocess_server.py
##########
@@ -98,7 +98,9 @@ def log_stdout():
         t.daemon = True
         t.start()
         wait_secs = .1
-        channel = grpc.insecure_channel(endpoint)
+        channel_options = [("grpc.max_receive_message_length", -1),
+                           ("grpc.max_send_message_length", -1)]

Review comment:
       Yep, the default block size (4 MB) produced messages just over gRPC's default message size limit (also 4 MB). I figured setting these options here was safer, since the message size is chosen by the other side. We have similar settings elsewhere (e.g. on the data channel).
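
       For illustration, a minimal sketch of how these options are passed when opening a channel. The helper names `unlimited_message_options` and `open_channel` are hypothetical and not part of Beam; the option keys and the value -1 (meaning no limit) are the ones from the diff above.

```python
def unlimited_message_options():
    """Channel options that lift gRPC's default message size limits.

    A value of -1 means "no limit" for both directions.
    """
    return [
        ("grpc.max_receive_message_length", -1),
        ("grpc.max_send_message_length", -1),
    ]


def open_channel(endpoint):
    """Open an insecure channel to `endpoint` with the limits lifted.

    Hypothetical helper; requires the third-party `grpcio` package,
    imported lazily so the options helper can be used without it.
    """
    import grpc

    return grpc.insecure_channel(endpoint, options=unlimited_message_options())
```

       Setting the limits on the receiving side means the client does not depend on the sender choosing a block size that stays under the default.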

##########
File path: sdks/python/apache_beam/pipeline.py
##########
@@ -211,6 +211,8 @@ def __init__(self, runner=None, options=None, argv=None):
         experiments.append('beam_fn_api')
         self._options.view_as(DebugOptions).experiments = experiments
 
+    self.local_tempdir = tempfile.mkdtemp(prefix='beam-pipeline-temp')

Review comment:
       Did you have something in mind? I don't think there's a good name here we can use. The system will of course ensure this directory is unique per pipeline. 
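
       As a quick illustration of the uniqueness point: `tempfile.mkdtemp` creates a fresh directory on every call, so a fixed prefix does not cause collisions. The helper name `make_pipeline_tempdir` is hypothetical; only the prefix comes from the diff.

```python
import os
import shutil
import tempfile


def make_pipeline_tempdir(prefix="beam-pipeline-temp"):
    """Create and return a new, uniquely named temporary directory."""
    return tempfile.mkdtemp(prefix=prefix)


# Two calls with the same prefix still yield two different directories.
first = make_pipeline_tempdir()
second = make_pipeline_tempdir()
try:
    distinct = first != second
    both_exist = os.path.isdir(first) and os.path.isdir(second)
    first_name = os.path.basename(first)
finally:
    # mkdtemp does not clean up after itself; the caller must remove the dirs.
    shutil.rmtree(first, ignore_errors=True)
    shutil.rmtree(second, ignore_errors=True)
```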

##########
File path: sdks/python/apache_beam/transforms/external.py
##########
@@ -317,13 +317,17 @@ def expand(self, pvalueish):
         transform=transform_proto)
 
     with self._service() as service:
+      print(type(service))

Review comment:
       Oops. Removed.

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DefaultArtifactResolver.java
##########
@@ -62,6 +63,24 @@ public void register(ResolutionFn fn) {
     fns.add(fn);
   }
 
+  @Override
+  public List<RunnerApi.ArtifactInformation> resolveArtifacts(
+      List<RunnerApi.ArtifactInformation> artifacts) {
+    for (ResolutionFn fn : Lists.reverse(fns)) {

Review comment:
       Yes, this is documented in the class's docstring. 
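
       A rough Python sketch of the reverse-order iteration, for readers unfamiliar with the pattern. The diff only shows the loop over `Lists.reverse(fns)`; everything else here is an assumption made for illustration: the class name `ReverseOrderResolver`, the convention that a function returns `None` when it does not handle an artifact, and the first-match-wins behavior.

```python
from typing import Callable, List, Optional

# Hypothetical signature: return a list of resolved artifacts,
# or None if this function does not handle the given artifact.
ResolutionFn = Callable[[str], Optional[List[str]]]


class ReverseOrderResolver:
    """Tries resolution functions in reverse registration order."""

    def __init__(self) -> None:
        self._fns: List[ResolutionFn] = []

    def register(self, fn: ResolutionFn) -> None:
        self._fns.append(fn)

    def resolve(self, artifact: str) -> List[str]:
        # Later registrations are tried first, so they take precedence
        # over earlier ones when both handle the same artifact.
        for fn in reversed(self._fns):
            resolved = fn(artifact)
            if resolved is not None:
                return resolved
        # Assumption: an artifact nobody handles is returned unchanged.
        return [artifact]


resolver = ReverseOrderResolver()
resolver.register(lambda a: ["early:" + a])
resolver.register(lambda a: ["late:" + a] if a.endswith(".jar") else None)
```

       With these two registrations, `resolver.resolve("x.jar")` is handled by the later function, while `resolver.resolve("x.txt")` falls through to the earlier one.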



