You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/05/15 17:58:02 UTC

[jira] [Updated] (BEAM-9527) apache_beam.runners.portability.fn_api_runner_test.FnApiRunnerSplitTest.test_split_crazy_sdf is flaky

     [ https://issues.apache.org/jira/browse/BEAM-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-9527:
----------------------------------
    Resolution: Fixed
        Status: Resolved  (was: Resolved)

Hello! Due to a bug in our Jira configuration, this issue had status:Resolved but resolution:Unresolved.

I am bulk editing these issues to have resolution:Fixed

If a different resolution is appropriate, please change it. To do this, click the "Resolve" button (you can do this even for closed issues) and set the Resolution field to the right value.

> apache_beam.runners.portability.fn_api_runner_test.FnApiRunnerSplitTest.test_split_crazy_sdf is flaky
> -----------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-9527
>                 URL: https://issues.apache.org/jira/browse/BEAM-9527
>             Project: Beam
>          Issue Type: Bug
>          Components: test-failures
>            Reporter: Valentyn Tymofieiev
>            Priority: P1
>              Labels: beam-fixit, flake
>             Fix For: Not applicable
>
>
> {noformat}
> self = <apache_beam.runners.portability.fn_api_runner.BundleManager object at 0x7fe494edb450>
> split_manager = <function split_manager at 0x7fe4c2ff0c80>
> inputs = {'ref_PCollection_PCollection_3_split/Read': ['\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x08V\xff\x80\x02capache_beam....\nOffsetRange\nq\x01)\x81q\x02}q\x03(U\x04stopq\x04K\x05U\x05startq\x05K\x00ub.\x01\x00@\x14\x00\x00\x00\x00\x00\x00']}
> process_bundle_id = 'bundle_2575'
>     def _generate_splits_for_testing(self,
>                                      split_manager,
>                                      inputs,  # type: Mapping[str, PartitionableBuffer]
>                                      process_bundle_id):
>       # type: (...) -> List[beam_fn_api_pb2.ProcessBundleSplitResponse]
>       split_results = []  # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
>       read_transform_id, buffer_data = only_element(inputs.items())
>       byte_stream = b''.join(buffer_data)
>       num_elements = len(
>           list(
>               self._get_input_coder_impl(read_transform_id).decode_all(
>                   byte_stream)))
>     
>       # Start the split manager in case it wants to set any breakpoints.
>       split_manager_generator = split_manager(num_elements)
>       try:
>         split_fraction = next(split_manager_generator)
>         done = False
>       except StopIteration:
>         done = True
>     
>       # Send all the data.
>       self._send_input_to_worker(
>           process_bundle_id, read_transform_id, [byte_stream])
>     
>       assert self._worker_handler is not None
>     
>       # Execute the requested splits.
>       while not done:
>         if split_fraction is None:
>           split_result = None
>         else:
>           split_request = beam_fn_api_pb2.InstructionRequest(
>               process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitRequest(
>                   instruction_id=process_bundle_id,
>                   desired_splits={
>                       read_transform_id: beam_fn_api_pb2.
>                       ProcessBundleSplitRequest.DesiredSplit(
>                           fraction_of_remainder=split_fraction,
>                           estimated_input_elements=num_elements)
>                   }))
>           split_response = self._worker_handler.control_conn.push(
>               split_request).get()  # type: beam_fn_api_pb2.InstructionResponse
>           for t in (0.05, 0.1, 0.2):
>             waiting = ('Instruction not running', 'not yet scheduled')
>             if any(msg in split_response.error for msg in waiting):
>               time.sleep(t)
>               split_response = self._worker_handler.control_conn.push(
>                   split_request).get()
>           if 'Unknown process bundle' in split_response.error:
>             # It may have finished too fast.
>             split_result = None
>           elif split_response.error:
> >           raise RuntimeError(split_response.error)
> E           RuntimeError: Traceback (most recent call last):
> E             File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 190, in _execute
> E               response = task()
> E             File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 229, in <lambda>
> E               lambda: self.create_worker().do_instruction(request), request)
> E             File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 416, in do_instruction
> E               getattr(request, request_type), request.instruction_id)
> E             File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 479, in process_bundle_split
> E               process_bundle_split=processor.try_split(request))
> E             File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 882, in try_split
> E               desired_split.estimated_input_elements)
> E             File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Phrase/src/sdks/python/test-suites/tox/py2/build/srcs/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 250, in try_split
> E               keep_of_element_remainder
> E             File "apache_beam/runners/worker/operations.py", line 202, in apache_beam.runners.worker.operations.SingletonConsumerSet.try_split
> E               return self.consumer.try_split(fraction_of_remainder)
> E             File "apache_beam/runners/worker/operations.py", line 804, in apache_beam.runners.worker.operations.SdfProcessSizedElements.try_split
> E               split = self.dofn_runner.try_split(fraction_of_remainder)
> E             File "apache_beam/runners/common.py", line 973, in apache_beam.runners.common.DoFnRunner.try_split
> E               return self.do_fn_invoker.try_split(fraction)
> E             File "apache_beam/runners/common.py", line 839, in apache_beam.runners.common.PerWindowInvoker.try_split
> E               self.threadsafe_watermark_estimator.current_watermark())
> E           AttributeError: 'NoneType' object has no attribute 'current_watermark'
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)