You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/16 22:38:42 UTC

[GitHub] [beam] robertwb commented on a change in pull request #13060: [BEAM-7746] Fix typing in runners

robertwb commented on a change in pull request #13060:
URL: https://github.com/apache/beam/pull/13060#discussion_r506748112



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -539,6 +548,7 @@ def merge_results(last_result):
       else:
         data_input = deferred_inputs
         input_timers = fired_timers
+        # FIXME: this seems unused, and produces an attr-defined error
         bundle_manager._registered = True

Review comment:
       Yes, this should not be needed anymore. 

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
##########
@@ -1062,34 +1104,41 @@ def close(self):
 
 
 class ControlFuture(object):
-  def __init__(self, instruction_id, response=None):
+  def __init__(self,
+               instruction_id,  # type: str
+               response=None  # type: Optional[beam_fn_api_pb2.InstructionResponse]
+              ):
+    # type: (...) -> None
     self.instruction_id = instruction_id
-    if response:
-      self._response = response
-    else:
-      self._response = None
+    self._response = response
+    if response is None:
       self._condition = threading.Condition()
-    self._exception = None
+    self._exception = None  # type: Optional[Exception]
 
   def is_done(self):
+    # type: () -> bool
     return self._response is not None
 
   def set(self, response):
+    # type: (beam_fn_api_pb2.InstructionResponse) -> None
     with self._condition:
       self._response = response
       self._condition.notify_all()
 
   def get(self, timeout=None):
+    # type: (Optional[float]) -> beam_fn_api_pb2.InstructionResponse
     if not self._response and not self._exception:
       with self._condition:
         if not self._response and not self._exception:
           self._condition.wait(timeout)
     if self._exception:
       raise self._exception
     else:
+      assert self._response is not None

Review comment:
       Correct: either an exception will be raised, a timeout error will be raised, or the response will be non-None.

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -227,12 +251,24 @@ def __iter__(self):
     """
     return itertools.chain(*self.partition(1))
 
+  # these should never be accessed, but they allow this class to meet the

Review comment:
       How about making them raise errors rather than being no-ops?

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -307,6 +308,9 @@ def executable_stage_transform(
           for side in side_inputs
       },
                           main_input=main_input_id)
+      # at this point we should have resolved an environment, as the key of
+      # components.environments cannot be None.
+      assert self.environment is not None

Review comment:
       Yes, that looks safe. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org