You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/09/06 17:07:01 UTC

[jira] [Commented] (BEAM-9527) apache_beam.runners.portability.fn_api_runner_test.FnApiRunnerSplitTest.test_split_crazy_sdf is flaky

    [ https://issues.apache.org/jira/browse/BEAM-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17191337#comment-17191337 ] 

Beam JIRA Bot commented on BEAM-9527:
-------------------------------------

This issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign yourself so someone else may work on it. In 7 days, the issue will be automatically unassigned.

> apache_beam.runners.portability.fn_api_runner_test.FnApiRunnerSplitTest.test_split_crazy_sdf is flaky
> -----------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-9527
>                 URL: https://issues.apache.org/jira/browse/BEAM-9527
>             Project: Beam
>          Issue Type: Bug
>          Components: test-failures
>            Reporter: Valentyn Tymofieiev
>            Assignee: Boyuan Zhang
>            Priority: P1
>              Labels: beam-fixit, flake, stale-assigned
>
> {noformat}
> self = <apache_beam.runners.portability.fn_api_runner.BundleManager object at 0x7fe494edb450>
> split_manager = <function split_manager at 0x7fe4c2ff0c80>
> inputs = {'ref_PCollection_PCollection_3_split/Read': ['\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x08V\xff\x80\x02capache_beam....\nOffsetRange\nq\x01)\x81q\x02}q\x03(U\x04stopq\x04K\x05U\x05startq\x05K\x00ub.\x01\x00@\x14\x00\x00\x00\x00\x00\x00']}
> process_bundle_id = 'bundle_2575'
>     def _generate_splits_for_testing(self,
>                                      split_manager,
>                                      inputs,  # type: Mapping[str, PartitionableBuffer]
>                                      process_bundle_id):
>       # type: (...) -> List[beam_fn_api_pb2.ProcessBundleSplitResponse]
>       split_results = []  # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
>       read_transform_id, buffer_data = only_element(inputs.items())
>       byte_stream = b''.join(buffer_data)
>       num_elements = len(
>           list(
>               self._get_input_coder_impl(read_transform_id).decode_all(
>                   byte_stream)))
>     
>       # Start the split manager in case it wants to set any breakpoints.
>       split_manager_generator = split_manager(num_elements)
>       try:
>         split_fraction = next(split_manager_generator)
>         done = False
>       except StopIteration:
>         done = True
>     
>       # Send all the data.
>       self._send_input_to_worker(
>           process_bundle_id, read_transform_id, [byte_stream])
>     
>       assert self._worker_handler is not None
>     
>       # Execute the requested splits.
>       while not done:
>         if split_fraction is None:
>           split_result = None
>         else:
>           split_request = beam_fn_api_pb2.InstructionRequest(
>               process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitRequest(
>                   instruction_id=process_bundle_id,
>                   desired_splits={
>                       read_transform_id: beam_fn_api_pb2.
>                       ProcessBundleSplitRequest.DesiredSplit(
>                           fraction_of_remainder=split_fraction,
>                           estimated_input_elements=num_elements)
>                   }))
>           split_response = self._worker_handler.control_conn.push(
>               split_request).get()  # type: beam_fn_api_pb2.InstructionResponse
>           for t in (0.05, 0.1, 0.2):
>             waiting = ('Instruction not running', 'not yet scheduled')
>             if any(msg in split_response.error for msg in waiting):
>               time.sleep(t)
>               split_response = self._worker_handler.control_conn.push(
>                   split_request).get()
>           if 'Unknown process bundle' in split_response.error:
>             # It may have finished too fast.
>             split_result = None
>           elif split_response.error:
> >           raise RuntimeError(split_response.error)
> E           RuntimeError: Traceback (most recent call last):
> E             File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 190, in _execute
> E               response = task()
> E             File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 229, in <lambda>
> E               lambda: self.create_worker().do_instruction(request), request)
> E             File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 416, in do_instruction
> E               getattr(request, request_type), request.instruction_id)
> E             File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 479, in process_bundle_split
> E               process_bundle_split=processor.try_split(request))
> E             File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 882, in try_split
> E               desired_split.estimated_input_elements)
> E             File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 250, in try_split
> E               keep_of_element_remainder
> E             File "apache_beam/runners/worker/operations.py", line 202, in apache_beam.runners.worker.operations.SingletonConsumerSet.try_split
> E               return self.consumer.try_split(fraction_of_remainder)
> E             File "apache_beam/runners/worker/operations.py", line 804, in apache_beam.runners.worker.operations.SdfProcessSizedElements.try_split
> E               split = self.dofn_runner.try_split(fraction_of_remainder)
> E             File "apache_beam/runners/common.py", line 973, in apache_beam.runners.common.DoFnRunner.try_split
> E               return self.do_fn_invoker.try_split(fraction)
> E             File "apache_beam/runners/common.py", line 839, in apache_beam.runners.common.PerWindowInvoker.try_split
> E               self.threadsafe_watermark_estimator.current_watermark())
> E           AttributeError: 'NoneType' object has no attribute 'current_watermark'
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)