Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/11 19:04:55 UTC

[GitHub] [beam] damccorm commented on a diff in pull request #17338: Improvements to dataflow job service for non-Python jobs.

damccorm commented on code in PR #17338:
URL: https://github.com/apache/beam/pull/17338#discussion_r847647711


##########
sdks/python/apache_beam/runners/dataflow/internal/apiclient.py:
##########
@@ -307,6 +305,14 @@ def __init__(
         container_image.capabilities.append(capability)
       pool.sdkHarnessContainerImages.append(container_image)
 
+    if not _use_fnapi(options):
+      pool.workerHarnessContainerImage = (
+          get_container_image_from_options(options))
+    elif len(pool.sdkHarnessContainerImages) == 1:

Review Comment:
   I'm not super familiar with this flow. Could you explain what we're trying to accomplish here, and why we don't need this set in the fn_api case?



##########
sdks/python/apache_beam/runners/portability/local_job_service.py:
##########
@@ -274,8 +274,9 @@ def _run_job(self):
         result = self._invoke_runner()
         _LOGGER.info(
             'Successfully completed job in %s seconds.', time.time() - start)
-        self.set_state(beam_job_api_pb2.JobState.DONE)
         self.result = result
+        result.wait_until_finish()

Review Comment:
   Should this come before the _LOGGER.info call? Presumably the job hasn't finished yet when that message is logged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.