Posted to user@beam.apache.org by Guenther Starnberger <gs...@yelp.com> on 2019/10/30 06:37:57 UTC

Re: [External] Re: Pipeline AttributeError on Python3

This looks exactly like a race condition that we've encountered on Python
3.7.1: There's a bug in some older 3.7.x releases that breaks the
thread-safety of the unpickler, as concurrent unpickle threads can access a
module before it has been fully imported. See
https://bugs.python.org/issue34572 for more information.

The traceback shows a Python 3.6 venv so this could be a different issue
(the unpickle bug was introduced in version 3.7). If it's the same bug then
upgrading to Python 3.7.3 or higher should fix that issue. One potential
workaround is to ensure that all of the modules get imported during the
initialization of the sdk_worker, as this bug only affects imports done by
the unpickler.
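
A rough sketch of that workaround, in case it helps. The helper name
preload_modules and the module list are made up for illustration, and where
you call it from depends on how your workers start up; this is not a Beam
API. The idea is only to import everything eagerly in a single thread before
any bundle is processed, so the unpickler never triggers an import itself.

```python
import importlib

# Hypothetical list: replace with the modules your pickled DoFns refer to,
# e.g. "pricingrealtime.aggregation.aggregation_transform" from the traceback.
MODULES_TO_PRELOAD = ["json", "collections"]


def preload_modules(module_names):
    """Import each module in the calling thread; return the names that failed."""
    failed = []
    for name in module_names:
        try:
            importlib.import_module(name)
        except ImportError:
            # Collect failures instead of aborting so one bad name is easy to spot.
            failed.append(name)
    return failed


# Assumed to run once during worker startup, before any unpickling happens.
failed = preload_modules(MODULES_TO_PRELOAD)
if failed:
    print("could not preload:", failed)
```

Once a module is fully initialized in sys.modules, a later lookup from the
unpickler should find the finished module rather than a partially imported
one.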

- Guenther


On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <va...@google.com>
wrote:

> +user@, bcc: dev@
> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
> this issue, although we have seen instances of that bug in exactly the
> opposite scenario: when the pipeline was defined *in one file*, not
> across multiple files.
>
> Could you try replacing instances of super() in aggregation_transform.py
> as done in https://github.com/apache/beam/pull/9513 and see if this issue
> is still reproducible?
>
> If that doesn't work, I would try to get a dump of serialized_fn and
> try to reproduce the issue in an isolated environment, such as:
>
> from apache_beam.internal import pickler
> serialized_fn = "..content.."
> pickler.loads(serialized_fn)
>
> Then I would try to trim the DoFn down to a minimal reproducible
> example. It could be another issue with the dill dependency.
>
>
> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com> wrote:
>
>> Hi All,
>>
>> We have noticed a weird intermittent issue on Python 3 that we don't run
>> into on Python 2. Sometimes when we try to submit the pipeline, we get an
>> AttributeError (see the stack trace below). We have double-checked that
>> the attributes/methods are present in the right module and in the right
>> place, but the pipeline still complains about them. In some cases, we
>> refer to methods before their definition. We tried reordering the method
>> definitions, but that didn't help at all.
>>
>> We don't see the same issue when the entire pipeline is defined in one
>> file. Also, note that this doesn't happen every time we submit the
>> pipeline, so I feel it is some kind of race condition. When we enable the
>> worker recycle logic, it happens most of the time when the SDK worker is
>> recycled.
>>
>> Some more information about the environment:
>> Python version: 3
>> Beam version: 2.16
>> Flink version: 1.8
>>
>> *Stack trace:*
>>
>> TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>> at
>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>> at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>> at
>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>> ... 7 more
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.RuntimeException: Error received from SDK harness for instruction
>> 6: Traceback (most recent call last):
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 307, in get
>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>> IndexError: pop from empty list
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>> line 261, in loads
>>     return dill.loads(s)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 317, in loads
>>     return load(file, ignore)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 305, in load
>>     obj = pik.load()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 474, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
>> 'pricingrealtime.aggregation.aggregation_transform' from
>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 165, in _execute
>>     response = task()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 198, in <lambda>
>>     self._execute(lambda: worker.do_instruction(work), work)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 351, in do_instruction
>>     request.instruction_id)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 371, in process_bundle
>>     instruction_id, request.process_bundle_descriptor_reference)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 313, in get
>>     self.data_channel_factory)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 576, in __init__
>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 620, in create_execution_tree
>>     descriptor.transforms, key=topological_height, reverse=True)])
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 619, in <listcomp>
>>     for transform_id in sorted(
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 603, in get_operation
>>     in descriptor.transforms[transform_id].outputs.items()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 602, in <dictcomp>
>>     for tag, pcoll_id
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 601, in <listcomp>
>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 603, in get_operation
>>     in descriptor.transforms[transform_id].outputs.items()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 602, in <dictcomp>
>>     for tag, pcoll_id
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 601, in <listcomp>
>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 606, in get_operation
>>     transform_id, transform_consumers)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 865, in create_operation
>>     return creator(self, transform_id, transform_proto, payload,
>> consumers)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 1108, in create
>>     serialized_fn, parameter)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 1146, in _create_pardo_operation
>>     dofn_data = pickler.loads(serialized_fn)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>> line 265, in loads
>>     return dill.loads(s)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 317, in loads
>>     return load(file, ignore)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 305, in load
>>     obj = pik.load()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 474, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
>> 'pricingrealtime.aggregation.aggregation_transform' from
>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>
>>