You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Brian Hulette (Jira)" <ji...@apache.org> on 2020/10/19 22:53:00 UTC
[jira] [Commented] (BEAM-9527) apache_beam.runners.portability.fn_api_runner_test.FnApiRunnerSplitTest.test_split_crazy_sdf is flaky
[ https://issues.apache.org/jira/browse/BEAM-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17217148#comment-17217148 ]
Brian Hulette commented on BEAM-9527:
-------------------------------------
Based on the [history|https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/testReport/junit/apache_beam.runners.portability.fn_api_runner.fn_runner_test/FnApiRunnerSplitTest/test_split_crazy_sdf/history], this test no longer appears to be flaky. Please re-open the issue if the failure still occurs.
> apache_beam.runners.portability.fn_api_runner_test.FnApiRunnerSplitTest.test_split_crazy_sdf is flaky
> -----------------------------------------------------------------------------------------------------
>
> Key: BEAM-9527
> URL: https://issues.apache.org/jira/browse/BEAM-9527
> Project: Beam
> Issue Type: Bug
> Components: test-failures
> Reporter: Valentyn Tymofieiev
> Priority: P1
> Labels: beam-fixit, flake
>
> {noformat}
> self = <apache_beam.runners.portability.fn_api_runner.BundleManager object at 0x7fe494edb450>
> split_manager = <function split_manager at 0x7fe4c2ff0c80>
> inputs = {'ref_PCollection_PCollection_3_split/Read': ['\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x08V\xff\x80\x02capache_beam....\nOffsetRange\nq\x01)\x81q\x02}q\x03(U\x04stopq\x04K\x05U\x05startq\x05K\x00ub.\x01\x00@\x14\x00\x00\x00\x00\x00\x00']}
> process_bundle_id = 'bundle_2575'
> def _generate_splits_for_testing(self,
> split_manager,
> inputs, # type: Mapping[str, PartitionableBuffer]
> process_bundle_id):
> # type: (...) -> List[beam_fn_api_pb2.ProcessBundleSplitResponse]
> split_results = [] # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
> read_transform_id, buffer_data = only_element(inputs.items())
> byte_stream = b''.join(buffer_data)
> num_elements = len(
> list(
> self._get_input_coder_impl(read_transform_id).decode_all(
> byte_stream)))
>
> # Start the split manager in case it wants to set any breakpoints.
> split_manager_generator = split_manager(num_elements)
> try:
> split_fraction = next(split_manager_generator)
> done = False
> except StopIteration:
> done = True
>
> # Send all the data.
> self._send_input_to_worker(
> process_bundle_id, read_transform_id, [byte_stream])
>
> assert self._worker_handler is not None
>
> # Execute the requested splits.
> while not done:
> if split_fraction is None:
> split_result = None
> else:
> split_request = beam_fn_api_pb2.InstructionRequest(
> process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitRequest(
> instruction_id=process_bundle_id,
> desired_splits={
> read_transform_id: beam_fn_api_pb2.
> ProcessBundleSplitRequest.DesiredSplit(
> fraction_of_remainder=split_fraction,
> estimated_input_elements=num_elements)
> }))
> split_response = self._worker_handler.control_conn.push(
> split_request).get() # type: beam_fn_api_pb2.InstructionResponse
> for t in (0.05, 0.1, 0.2):
> waiting = ('Instruction not running', 'not yet scheduled')
> if any(msg in split_response.error for msg in waiting):
> time.sleep(t)
> split_response = self._worker_handler.control_conn.push(
> split_request).get()
> if 'Unknown process bundle' in split_response.error:
> # It may have finished too fast.
> split_result = None
> elif split_response.error:
> > raise RuntimeError(split_response.error)
> E RuntimeError: Traceback (most recent call last):
> E File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 190, in _execute
> E response = task()
> E File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 229, in <lambda>
> E lambda: self.create_worker().do_instruction(request), request)
> E File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 416, in do_instruction
> E getattr(request, request_type), request.instruction_id)
> E File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 479, in process_bundle_split
> E process_bundle_split=processor.try_split(request))
> E File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 882, in try_split
> E desired_split.estimated_input_elements)
> E File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 250, in try_split
> E keep_of_element_remainder
> E File "apache_beam/runners/worker/operations.py", line 202, in apache_beam.runners.worker.operations.SingletonConsumerSet.try_split
> E return self.consumer.try_split(fraction_of_remainder)
> E File "apache_beam/runners/worker/operations.py", line 804, in apache_beam.runners.worker.operations.SdfProcessSizedElements.try_split
> E split = self.dofn_runner.try_split(fraction_of_remainder)
> E File "apache_beam/runners/common.py", line 973, in apache_beam.runners.common.DoFnRunner.try_split
> E return self.do_fn_invoker.try_split(fraction)
> E File "apache_beam/runners/common.py", line 839, in apache_beam.runners.common.PerWindowInvoker.try_split
> E self.threadsafe_watermark_estimator.current_watermark())
> E AttributeError: 'NoneType' object has no attribute 'current_watermark'
> {noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)