You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/16 22:38:42 UTC
[GitHub] [beam] robertwb commented on a change in pull request #13060: [BEAM-7746] Fix typing in runners
robertwb commented on a change in pull request #13060:
URL: https://github.com/apache/beam/pull/13060#discussion_r506748112
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -539,6 +548,7 @@ def merge_results(last_result):
else:
data_input = deferred_inputs
input_timers = fired_timers
+ # FIXME: this seems unused, and produces an attr-defined error
bundle_manager._registered = True
Review comment:
Yes, this should not be needed anymore.
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
##########
@@ -1062,34 +1104,41 @@ def close(self):
class ControlFuture(object):
- def __init__(self, instruction_id, response=None):
+ def __init__(self,
+ instruction_id, # type: str
+ response=None # type: Optional[beam_fn_api_pb2.InstructionResponse]
+ ):
+ # type: (...) -> None
self.instruction_id = instruction_id
- if response:
- self._response = response
- else:
- self._response = None
+ self._response = response
+ if response is None:
self._condition = threading.Condition()
- self._exception = None
+ self._exception = None # type: Optional[Exception]
def is_done(self):
+ # type: () -> bool
return self._response is not None
def set(self, response):
+ # type: (beam_fn_api_pb2.InstructionResponse) -> None
with self._condition:
self._response = response
self._condition.notify_all()
def get(self, timeout=None):
+ # type: (Optional[float]) -> beam_fn_api_pb2.InstructionResponse
if not self._response and not self._exception:
with self._condition:
if not self._response and not self._exception:
self._condition.wait(timeout)
if self._exception:
raise self._exception
else:
+ assert self._response is not None
Review comment:
Correct: either an exception will be raised, a timeout will be raised, or the response is non-None.
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -227,12 +251,24 @@ def __iter__(self):
"""
return itertools.chain(*self.partition(1))
+ # these should never be accessed, but they allow this class to meet the
Review comment:
How about making them raise errors rather than being no-ops?
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -307,6 +308,9 @@ def executable_stage_transform(
for side in side_inputs
},
main_input=main_input_id)
+ # at this point we should have resolved an environment, as the key of
+ # components.environments cannot be None.
+ assert self.environment is not None
Review comment:
Yes, that looks safe.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org