Posted to user@beam.apache.org by Valentyn Tymofieiev <va...@google.com> on 2019/10/28 23:59:49 UTC

Re: Pipeline AttributeError on Python3

+user@, bcc: dev@
https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to this
issue, although we have seen this bug in exactly the opposite scenario:
when the pipeline was defined *in one file*, but not when it was split
across multiple files.

Could you try replacing instances of super() in aggregation_transform.py
as done in https://github.com/apache/beam/pull/9513 and see if this issue
is still reproducible?
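
For illustration only, here is a minimal sketch of that kind of change. It
assumes the workaround is to call the base class initializer directly instead
of going through super(); the diff in the PR itself is not reproduced here.
BaseFn and AggregateFn are hypothetical names, with BaseFn standing in for a
base class such as beam.DoFn.

```python
class BaseFn:  # hypothetical stand-in for a base class such as beam.DoFn
    def __init__(self):
        self.initialized = True


class AggregateFn(BaseFn):  # hypothetical name, not taken from the thread
    def __init__(self, window_size):
        # Before: super().__init__() or super(AggregateFn, self).__init__()
        # After: call the base initializer directly, with no super() lookup.
        BaseFn.__init__(self)
        self.window_size = window_size


fn = AggregateFn(window_size=60)
```

The behaviour of the class is unchanged; only the way the base initializer is
reached differs.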

If that doesn't work, I would try to get a dump of serialized_fn and
reproduce the issue in an isolated environment, for example:

from apache_beam.internal import pickler
serialized_fn = "..content.."  # paste the dumped serialized_fn here
pickler.loads(serialized_fn)

Then I would try to trim the DoFn down to a minimal reproducible example.
It could be another issue with the dill dependency.
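
As a rough illustration of the failure mode, the sketch below uses the
standard library pickle module rather than dill or Beam's pickler, so it is
only an analogy for what the worker does. The module name demo_transform is
hypothetical. A function is pickled by reference (module name plus attribute
name), and unpickling fails with AttributeError when the module that gets
imported at load time does not have that attribute.

```python
import pickle
import sys
import types

# Hypothetical stand-in module; this is not the real aggregation_transform.py.
mod = types.ModuleType("demo_transform")
sys.modules["demo_transform"] = mod


def _timestamp_keyed_result(x):
    return x


# Make the function look as if it had been defined inside demo_transform.
_timestamp_keyed_result.__module__ = "demo_transform"
_timestamp_keyed_result.__qualname__ = "_timestamp_keyed_result"
mod._timestamp_keyed_result = _timestamp_keyed_result

# Functions are pickled by reference: only the module and name are stored.
payload = pickle.dumps(_timestamp_keyed_result)

# Simulate a process whose copy of the module lacks the attribute.
del mod._timestamp_keyed_result


def try_load(data):
    """Return the unpickled object, or the AttributeError that was raised."""
    try:
        return pickle.loads(data)
    except AttributeError as exc:
        return exc


result = try_load(payload)
```

Here result ends up holding an AttributeError, because the lookup of the name
on the module fails during loading.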


On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com> wrote:

> Hi All,
>
> We have noticed a weird intermittent issue on Python 3 that we don't run
> into on Python 2. Sometimes when we try to submit the pipeline, we get an
> AttributeError (see the stack trace below). We have double-checked that
> the attributes/methods are present in the right module and in the right
> place, but the pipeline still complains about them. In some cases, we
> refer to methods before their definition. We tried reordering the method
> definitions, but that didn't help at all.
>
> We don't see the same issue when the entire pipeline is defined in one
> file. Also, note that this doesn't happen every time we submit the
> pipeline, so I suspect it is some kind of race condition. When we enable
> the worker recycle logic, it happens most of the time when the SDK worker
> is recycled.
>
> Some more information about the environment:
> Python version: 3
> Beam version: 2.16
> Flink version: 1.8
>
> *Stack trace:*
>
> TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
> at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
> at org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
> at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
> at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
> ... 7 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 6: Traceback (most recent call last):
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 307, in get
>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
> IndexError: pop from empty list
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
> line 261, in loads
>     return dill.loads(s)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
> line 317, in loads
>     return load(file, ignore)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
> line 305, in load
>     obj = pik.load()
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
> line 474, in find_class
>     return StockUnpickler.find_class(self, module, name)
> *AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
> 'pricingrealtime.aggregation.aggregation_transform' from
> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 165, in _execute
>     response = task()
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 198, in <lambda>
>     self._execute(lambda: worker.do_instruction(work), work)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 351, in do_instruction
>     request.instruction_id)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 371, in process_bundle
>     instruction_id, request.process_bundle_descriptor_reference)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 313, in get
>     self.data_channel_factory)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 576, in __init__
>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 620, in create_execution_tree
>     descriptor.transforms, key=topological_height, reverse=True)])
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 619, in <listcomp>
>     for transform_id in sorted(
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 544, in wrapper
>     result = cache[args] = func(*args)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 603, in get_operation
>     in descriptor.transforms[transform_id].outputs.items()
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 602, in <dictcomp>
>     for tag, pcoll_id
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 601, in <listcomp>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 544, in wrapper
>     result = cache[args] = func(*args)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 603, in get_operation
>     in descriptor.transforms[transform_id].outputs.items()
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 602, in <dictcomp>
>     for tag, pcoll_id
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 601, in <listcomp>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 544, in wrapper
>     result = cache[args] = func(*args)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 606, in get_operation
>     transform_id, transform_consumers)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 865, in create_operation
>     return creator(self, transform_id, transform_proto, payload, consumers)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1108, in create
>     serialized_fn, parameter)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1146, in _create_pardo_operation
>     dofn_data = pickler.loads(serialized_fn)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
> line 265, in loads
>     return dill.loads(s)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
> line 317, in loads
>     return load(file, ignore)
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
> line 305, in load
>     obj = pik.load()
>   File
> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
> line 474, in find_class
>     return StockUnpickler.find_class(self, module, name)
> AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
> 'pricingrealtime.aggregation.aggregation_transform' from
> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>
>

Re: Pipeline AttributeError on Python3

Posted by Valentyn Tymofieiev <va...@google.com>.
> Could you try replacing instances of super() in aggregation_transform.py
> as done in https://github.com/apache/beam/pull/9513 and see if this issue
> is still reproducible?


If you are using --save_main_session, then instead of modifying
aggregation_transform.py, try modifying the pipeline file itself.
Alternatively, move any Transforms/DoFns that you define in your pipeline
launcher file into a separate file; then you don't need to pass
--save_main_session.
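
To show why a separate, importable file helps, here is a small sketch using
the standard library pickle module (dill and Beam's pickler may treat objects
from the main module differently, so this is an analogy only). The module name
my_transforms and the class ExtractKey are hypothetical. An instance of a
class that lives in an importable module is pickled by reference to that
module, so any process that can import the module can rebuild the object
without the launcher's main session.

```python
import pickle
import sys
import types

# Hypothetical module standing in for a separate file such as my_transforms.py.
transforms = types.ModuleType("my_transforms")
sys.modules["my_transforms"] = transforms


class ExtractKey:  # hypothetical stand-in for a DoFn subclass
    def process(self, element):
        yield element[0]


# Make the class look as if it had been defined inside my_transforms.
ExtractKey.__module__ = "my_transforms"
ExtractKey.__qualname__ = "ExtractKey"
transforms.ExtractKey = ExtractKey

payload = pickle.dumps(ExtractKey())

# The payload names the module and class instead of embedding the class body.
names_recorded = b"my_transforms" in payload and b"ExtractKey" in payload

# Loading only needs my_transforms to be importable in the current process.
restored = pickle.loads(payload)
keys = list(restored.process(("a", 1)))
```

In a real pipeline the same idea would mean importing the DoFn from its own
module in the launcher file and making sure that module is installed on the
workers.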

Re: Pipeline AttributeError on Python3

Posted by Valentyn Tymofieiev <va...@google.com>.
Could you confirm whether or not you are using --save_main_session when you
launch your pipeline?

On Mon, Oct 28, 2019 at 4:59 PM Valentyn Tymofieiev <va...@google.com>
wrote:

> +user@, bcc: dev@
> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
> this issue, although we have seen this bug in exactly the opposite
> scenario: when the pipeline was defined *in one file*, but not when it
> was split across multiple files.
>
> Could you try replacing instances of super() in aggregation_transform.py
> as done in https://github.com/apache/beam/pull/9513 and see if this issue
> is still reproducible?
>
> If that doesn't work, I would try to get a dump of serialized_fn and
> reproduce the issue in an isolated environment, for example:
>
> from apache_beam.internal import pickler
> serialized_fn = "..content.."  # paste the dumped serialized_fn here
> pickler.loads(serialized_fn)
>
> Then I would try to trim the DoFn down to a minimal reproducible example.
> It could be another issue with the dill dependency.
>
>
> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com> wrote:
>
>> Hi All,
>>
>> We have noticed a weird intermittent issue on Python 3 that we don't run
>> into on Python 2. Sometimes when we try to submit the pipeline, we get an
>> AttributeError (see the stack trace below). We have double-checked that
>> the attributes/methods are present in the right module and in the right
>> place, but the pipeline still complains about them. In some cases, we
>> refer to methods before their definition. We tried reordering the method
>> definitions, but that didn't help at all.
>>
>> We don't see the same issue when the entire pipeline is defined in one
>> file. Also, note that this doesn't happen every time we submit the
>> pipeline, so I suspect it is some kind of race condition. When we enable
>> the worker recycle logic, it happens most of the time when the SDK worker
>> is recycled.
>>
>> Some more information about the environment:
>> Python version: 3
>> Beam version: 2.16
>> Flink version: 1.8
>>
>> *Stack trace:*
>>
>> TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>> at
>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>> at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>> at
>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>> ... 7 more
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.RuntimeException: Error received from SDK harness for instruction
>> 6: Traceback (most recent call last):
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 307, in get
>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>> IndexError: pop from empty list
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>> line 261, in loads
>>     return dill.loads(s)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 317, in loads
>>     return load(file, ignore)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 305, in load
>>     obj = pik.load()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 474, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
>> 'pricingrealtime.aggregation.aggregation_transform' from
>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 165, in _execute
>>     response = task()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 198, in <lambda>
>>     self._execute(lambda: worker.do_instruction(work), work)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 351, in do_instruction
>>     request.instruction_id)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 371, in process_bundle
>>     instruction_id, request.process_bundle_descriptor_reference)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 313, in get
>>     self.data_channel_factory)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 576, in __init__
>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 620, in create_execution_tree
>>     descriptor.transforms, key=topological_height, reverse=True)])
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 619, in <listcomp>
>>     for transform_id in sorted(
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 603, in get_operation
>>     in descriptor.transforms[transform_id].outputs.items()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 602, in <dictcomp>
>>     for tag, pcoll_id
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 601, in <listcomp>
>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 603, in get_operation
>>     in descriptor.transforms[transform_id].outputs.items()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 602, in <dictcomp>
>>     for tag, pcoll_id
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 601, in <listcomp>
>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 606, in get_operation
>>     transform_id, transform_consumers)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 865, in create_operation
>>     return creator(self, transform_id, transform_proto, payload,
>> consumers)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 1108, in create
>>     serialized_fn, parameter)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 1146, in _create_pardo_operation
>>     dofn_data = pickler.loads(serialized_fn)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>> line 265, in loads
>>     return dill.loads(s)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 317, in loads
>>     return load(file, ignore)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 305, in load
>>     obj = pik.load()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 474, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
>> 'pricingrealtime.aggregation.aggregation_transform' from
>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>
>>

Re: [External] Re: Pipeline AttributeError on Python3

Posted by Guenther Starnberger <gs...@yelp.com>.
This looks exactly like a race condition that we've encountered on Python
3.7.1: There's a bug in some older 3.7.x releases that breaks the
thread-safety of the unpickler, as concurrent unpickle threads can access a
module before it has been fully imported. See
https://bugs.python.org/issue34572 for more information.
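
To illustrate the failure mode in general terms: one way to rule out concurrent unpickling is to funnel every unpickle call through a single lock. The following is only a minimal sketch using the standard-library pickle module. The names locked_loads, unpickle_concurrently and _UNPICKLE_LOCK are hypothetical, and nothing here is taken from Beam, dill, or the CPython fix.

```python
import pickle
import threading

# Hypothetical module-level lock; reentrant so that nested unpickling
# on the same thread cannot deadlock.
_UNPICKLE_LOCK = threading.RLock()


def locked_loads(data):
    """Unpickle `data` while holding the lock, so only one thread unpickles at a time."""
    with _UNPICKLE_LOCK:
        return pickle.loads(data)


def unpickle_concurrently(data, num_threads=4):
    """Unpickle the same bytes from several threads and collect the results."""
    results = []

    def worker():
        results.append(locked_loads(data))

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Serializing the calls trades some parallelism during worker start-up for the guarantee that no thread can observe a module that another unpickling thread is still importing.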

The traceback shows a Python 3.6 venv so this could be a different issue
(the unpickle bug was introduced in version 3.7). If it's the same bug then
upgrading to Python 3.7.3 or higher should fix that issue. One potential
workaround is to ensure that all of the modules get imported during the
initialization of the sdk_worker, as this bug only affects imports done by
the unpickler.
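
A minimal sketch of that workaround, assuming it runs once on a single thread before any worker threads start unpickling. The helper name preload_modules is hypothetical, and where exactly to call it during sdk_worker start-up is not specified in this thread. The standard-library module names below are stand-ins; in a real pipeline the list would name the pipeline's own modules, such as the one shown in the traceback.

```python
import importlib
import sys


def preload_modules(module_names):
    """Import each named module eagerly and return the names that were loaded."""
    loaded = []
    for name in module_names:
        # import_module fully executes the module body before returning,
        # so later lookups find a completely initialized module.
        importlib.import_module(name)
        loaded.append(name)
    return loaded


# Stand-in module names for illustration only.
preload_modules(["json", "decimal"])
```

Once a module is fully imported and present in sys.modules, the unpickler only has to look up attributes on it and no longer triggers the import itself.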

- Guenther


On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <va...@google.com>
wrote:

> +user@, bcc: dev@
> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
> this issue, although we saw instances of this bug in exactly opposite
> scenarios - when pipeline was defined *in one file*, but not in multiple
> files.
>
> Could you try replacing instances of super() in aggregation_transform.py
> as done in https://github.com/apache/beam/pull/9513 and see if this issue
> is still reproducible?
>
> If that doesn't work, I would try to get the dump of serialized_fn, and
> try to reproduce the issue in isolated environment, such as:
>
> from apache_beam.internal import pickler
> serialized_fn = "..content.."
> pickler.loads(serialized_fn)
>
> then I would try to trim the doFn in the example to a
> minimally-reproducible example. It could be another issue with dill
> dependency.
>
>
> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com> wrote:
>
>> Hi All,
>>
>> We have noticed a weird intermittent issue on Python 3 that we don't run
>> into on Python 2. Sometimes when we try to submit the pipeline, we get
>> an AttributeError (see the stack trace below). We have double-checked
>> that the attributes/methods are present in the right module and in the
>> right place, but the pipeline still complains about them. In some cases,
>> we refer to methods before their definition. We tried to reorder the
>> method definitions, but that didn't help at all.
>>
>> We don't see the same issue when the entire pipeline is defined in one
>> file. Also, note that this doesn't happen all the time when we submit the
>> pipeline, so I feel it is some kind of race condition. When we enable the
>> worker recycle logic it happens most of the time when sdk worker is
>> recycled.
>>
>> Some more information about the environment:
>> Python version: 3
>> Beam version: 2.16
>> Flink version: 1.8
>>
>> *Stack trace:*
>>
>> TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>> at
>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>> at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>> at
>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>> ... 7 more
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.RuntimeException: Error received from SDK harness for instruction
>> 6: Traceback (most recent call last):
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 307, in get
>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>> IndexError: pop from empty list
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>> line 261, in loads
>>     return dill.loads(s)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 317, in loads
>>     return load(file, ignore)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 305, in load
>>     obj = pik.load()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 474, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
>> 'pricingrealtime.aggregation.aggregation_transform' from
>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 165, in _execute
>>     response = task()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 198, in <lambda>
>>     self._execute(lambda: worker.do_instruction(work), work)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 351, in do_instruction
>>     request.instruction_id)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 371, in process_bundle
>>     instruction_id, request.process_bundle_descriptor_reference)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 313, in get
>>     self.data_channel_factory)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 576, in __init__
>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 620, in create_execution_tree
>>     descriptor.transforms, key=topological_height, reverse=True)])
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 619, in <listcomp>
>>     for transform_id in sorted(
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 603, in get_operation
>>     in descriptor.transforms[transform_id].outputs.items()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 602, in <dictcomp>
>>     for tag, pcoll_id
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 601, in <listcomp>
>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 603, in get_operation
>>     in descriptor.transforms[transform_id].outputs.items()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 602, in <dictcomp>
>>     for tag, pcoll_id
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 601, in <listcomp>
>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 606, in get_operation
>>     transform_id, transform_consumers)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 865, in create_operation
>>     return creator(self, transform_id, transform_proto, payload,
>> consumers)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 1108, in create
>>     serialized_fn, parameter)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 1146, in _create_pardo_operation
>>     dofn_data = pickler.loads(serialized_fn)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>> line 265, in loads
>>     return dill.loads(s)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 317, in loads
>>     return load(file, ignore)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 305, in load
>>     obj = pik.load()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 474, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
>> 'pricingrealtime.aggregation.aggregation_transform' from
>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>
>>

Re: Pipeline AttributeError on Python3

Posted by Thomas Weise <th...@apache.org>.
We have not seen the issue with Python 3.6 on 2.16+ after applying this
patch. 🎉

Thanks!

On Thu, Nov 21, 2019 at 4:41 PM Thomas Weise <th...@apache.org> wrote:

> We are currently verifying the patch. Will report back tomorrow.
>
> On Thu, Nov 21, 2019 at 8:40 AM Valentyn Tymofieiev <va...@google.com>
> wrote:
>
>> That would be helpful, thanks a lot! It should be a straightforward patch.
>> Also, thanks Guenther, for sharing your investigation on
>> https://bugs.python.org/issue34572, it was very helpful.
>>
>> On Thu, Nov 21, 2019 at 8:25 AM Thomas Weise <th...@apache.org> wrote:
>>
>>> Valentyn, thanks a lot for following up on this.
>>>
>>> If the change can be cherry picked in isolation, we should be able to
>>> verify this soon (with 2.16).
>>>
>>>
>>> On Thu, Nov 21, 2019 at 8:12 AM Valentyn Tymofieiev <va...@google.com>
>>> wrote:
>>>
>>>> To close the loop here: To my knowledge this issue affects all Python 3
>>>> users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
>>>> including users on Python 3.7.3 and newer versions.
>>>>
>>>> The issue is addressed on Beam master, and we have a cherry-pick out
>>>> for Beam 2.17.0.
>>>>
>>>> Workaround options for users on 2.16.0 and earlier SDKs:
>>>>
>>>> - Patch the SDK you are using with
>>>> https://github.com/apache/beam/pull/10167.
>>>> - Temporarily switch to Python 2 until 2.17.0. We have not seen the
>>>> issue on Python 2, so it may be rare or non-existent on Python 2.
>>>> - Pass --experiments worker_threads=1. This option may work only for
>>>> some, but not all pipelines.
>>>>
>>>> See BEAM-8651 <https://issues.apache.org/jira/browse/BEAM-8651> for
>>>> details on the issue.
>>>>
>>>> On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev <
>>>> valentyn@google.com> wrote:
>>>>
>>>>> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to
>>>>> track this issue and any recommendation for the users that will come out of
>>>>> it.
>>>>>
>>>>> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <
>>>>> valentyn@google.com> wrote:
>>>>>
>>>>>> I think we have heard of this issue from the same source:
>>>>>>
>>>>>>> This looks exactly like a race condition that we've encountered on
>>>>>>> Python 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>>>>>>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>>>>>>> module before it has been fully imported. See
>>>>>>> https://bugs.python.org/issue34572 for more information.
>>>>>>>
>>>>>>> The traceback shows a Python 3.6 venv so this could be a different
>>>>>>> issue (the unpickle bug was introduced in version 3.7). If it's the same
>>>>>>> bug then upgrading to Python 3.7.3 or higher should fix that issue. One
>>>>>>> potential workaround is to ensure that all of the modules get imported
>>>>>>> during the initialization of the sdk_worker, as this bug only affects
>>>>>>> imports done by the unpickler.
>>>>>>
>>>>>>
>>>>>> The symptoms do sound similar, so I would try to reproduce your issue
>>>>>> on 3.7.3 and see if it is gone, or try to reproduce
>>>>>> https://bugs.python.org/issue34572 in the version of interpreter you
>>>>>> use. If this doesn't help, you can try to reproduce the race using your
>>>>>> input.
>>>>>>
>>>>>> To get the serialized DoFn, you could do the following:
>>>>>> 1. Apply the patch from https://github.com/apache/beam/pull/10036.
>>>>>> 2. Set the logging level to DEBUG, see:
>>>>>> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
>>>>>> 3. Check the log output for the payload of your transform; it may look
>>>>>> like:
>>>>>>
>>>>>>     transforms {
>>>>>>       key:
>>>>>> "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>>>>>       value {
>>>>>>         spec {
>>>>>>           urn: "beam:transform:pardo:v1"
>>>>>>           payload: "\n\347\006\n\275\006\n
>>>>>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>>>>>> ....
>>>>>>
>>>>>> Then you can extract the pickled fn from the payload:
>>>>>>
>>>>>> from apache_beam.utils import proto_utils
>>>>>> from apache_beam.portability.api import beam_runner_api_pb2
>>>>>> from apache_beam.internal import pickler
>>>>>>
>>>>>> payload = b'\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>>>>>> pardo_payload = proto_utils.parse_Bytes(
>>>>>>     payload, beam_runner_api_pb2.ParDoPayload)
>>>>>> pickled_fn = pardo_payload.do_fn.spec.payload
>>>>>>
>>>>>> # Presumably the race happens here when unpickling one of your
>>>>>> # transforms (pricingrealtime.aggregation.aggregation_transform).
>>>>>> pickler.loads(pickled_fn)
>>>>>>
>>>>>>
>>>>>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <ra...@lyft.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Valentyn,
>>>>>>>
>>>>>>> aggregation_transform.py doesn't have any transform that extends
>>>>>>> beam.DoFn. We are using plain Python functions, which we pass to
>>>>>>> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
>>>>>>> please let me know the process?
>>>>>>>
>>>>>>> I also heard that some people ran into this issue on Python 3.7.1
>>>>>>> but the same issue is not present on Python 3.7.3. Can you confirm this?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <
>>>>>>> valentyn@google.com> wrote:
>>>>>>>
>>>>>>>> +user@, bcc: dev@
>>>>>>>> https://issues.apache.org/jira/browse/BEAM-6158 may be
>>>>>>>> contributing to this issue, although we saw instances of this bug in
>>>>>>>> exactly opposite scenarios - when pipeline was defined *in one
>>>>>>>> file*, but not in multiple files.
>>>>>>>>
>>>>>>>> Could you try replacing instances of super() in
>>>>>>>> aggregation_transform.py  as done in
>>>>>>>> https://github.com/apache/beam/pull/9513 and see if this issue is
>>>>>>>> still reproducible?
>>>>>>>>
>>>>>>>> If that doesn't work, I would try to get the dump of serialized_fn,
>>>>>>>> and try to reproduce the issue in isolated environment, such as:
>>>>>>>>
>>>>>>>> from apache_beam.internal import pickler
>>>>>>>> serialized_fn = "..content.."
>>>>>>>> pickler.loads(serialized_fn)
>>>>>>>>
>>>>>>>> then I would try to trim the doFn in the example to a
>>>>>>>> minimally-reproducible example. It could be another issue with dill
>>>>>>>> dependency.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> We have noticed a weird intermittent issue on Python 3 that we
>>>>>>>>> don't run into on Python 2. Sometimes when we try to submit the
>>>>>>>>> pipeline, we get an AttributeError (see the stack trace below). We have
>>>>>>>>> double-checked that the attributes/methods are present in the right
>>>>>>>>> module and in the right place, but the pipeline still complains about
>>>>>>>>> them. In some cases, we refer to methods before their definition. We
>>>>>>>>> tried to reorder the method definitions, but that didn't help at all.
>>>>>>>>>
>>>>>>>>> We don't see the same issue when the entire pipeline is defined in
>>>>>>>>> one file. Also, note that this doesn't happen all the time when we submit
>>>>>>>>> the pipeline, so I feel it is some kind of race condition. When we enable
>>>>>>>>> the worker recycle logic it happens most of the time when sdk worker is
>>>>>>>>> recycled.
>>>>>>>>>
>>>>>>>>> Some more information about the environment:
>>>>>>>>> Python version: 3
>>>>>>>>> Beam version: 2.16
>>>>>>>>> Flink version: 1.8
>>>>>>>>>
>>>>>>>>> *Stack trace:*
>>>>>>>>>
>>>>>>>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>>>>>>>> bundle}
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>> Caused by: java.lang.RuntimeException: Failed to finish remote
>>>>>>>>> bundle
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>>>>>>> ... 7 more
>>>>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>>>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>>>>>>>> 6: Traceback (most recent call last):
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 307, in get
>>>>>>>>>     processor =
>>>>>>>>> self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>>>>>>> IndexError: pop from empty list
>>>>>>>>>
>>>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>>>
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>>>>> line 261, in loads
>>>>>>>>>     return dill.loads(s)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 317, in loads
>>>>>>>>>     return load(file, ignore)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 305, in load
>>>>>>>>>     obj = pik.load()
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 474, in find_class
>>>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>>>>>>>
>>>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>>>
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 165, in _execute
>>>>>>>>>     response = task()
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 198, in <lambda>
>>>>>>>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 351, in do_instruction
>>>>>>>>>     request.instruction_id)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 371, in process_bundle
>>>>>>>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 313, in get
>>>>>>>>>     self.data_channel_factory)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 576, in __init__
>>>>>>>>>     self.ops =
>>>>>>>>> self.create_execution_tree(self.process_bundle_descriptor)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 620, in create_execution_tree
>>>>>>>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 619, in <listcomp>
>>>>>>>>>     for transform_id in sorted(
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 544, in wrapper
>>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 603, in get_operation
>>>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 602, in <dictcomp>
>>>>>>>>>     for tag, pcoll_id
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 601, in <listcomp>
>>>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 544, in wrapper
>>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 603, in get_operation
>>>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 602, in <dictcomp>
>>>>>>>>>     for tag, pcoll_id
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 601, in <listcomp>
>>>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 544, in wrapper
>>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 606, in get_operation
>>>>>>>>>     transform_id, transform_consumers)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 865, in create_operation
>>>>>>>>>     return creator(self, transform_id, transform_proto, payload,
>>>>>>>>> consumers)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 1108, in create
>>>>>>>>>     serialized_fn, parameter)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 1146, in _create_pardo_operation
>>>>>>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>>>>> line 265, in loads
>>>>>>>>>     return dill.loads(s)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 317, in loads
>>>>>>>>>     return load(file, ignore)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 305, in load
>>>>>>>>>     obj = pik.load()
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 474, in find_class
>>>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>>>>>>>
>>>>>>>>>

Re: Pipeline AttributeError on Python3

Posted by Thomas Weise <th...@apache.org>.
We have not seen the issue with Python 3.6 on 2.16+ after applying this
patch. 🎉

Thanks!

On Thu, Nov 21, 2019 at 4:41 PM Thomas Weise <th...@apache.org> wrote:

> We are currently verifying the patch. Will report back tomorrow.
>
> On Thu, Nov 21, 2019 at 8:40 AM Valentyn Tymofieiev <va...@google.com>
> wrote:
>
>> That would be helpful, thanks a lot! It should be a straightforward patch.
>> Also, thanks Guenther, for sharing your investigation on
>> https://bugs.python.org/issue34572, it was very helpful.
>>
>> On Thu, Nov 21, 2019 at 8:25 AM Thomas Weise <th...@apache.org> wrote:
>>
>>> Valentyn, thanks a lot for following up on this.
>>>
>>> If the change can be cherry picked in isolation, we should be able to
>>> verify this soon (with 2.16).
>>>
>>>
>>> On Thu, Nov 21, 2019 at 8:12 AM Valentyn Tymofieiev <va...@google.com>
>>> wrote:
>>>
>>>> To close the loop here: To my knowledge this issue affects all Python 3
>>>> users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
>>>> including users on Python 3.7.3 and newer versions.
>>>>
>>>> The issue is addressed on Beam master, and we have a cherry-pick out
>>>> for Beam 2.17.0.
>>>>
>>>> Workaround options for users on 2.16.0 and earlier SDKs:
>>>>
>>>> - Patch the SDK you are using with
>>>> https://github.com/apache/beam/pull/10167.
>>>> - Temporarily switch to Python 2 until 2.17.0. We have not seen the
>>>> issue on Python 2, so it may be rare or non-existent on Python 2.
>>>> - Pass --experiments worker_threads=1 . This option may work only for
>>>> some, but not all pipelines.
>>>>
>>>> See BEAM-8651 <https://issues.apache.org/jira/browse/BEAM-8651> for
>>>> details on the issue.
>>>>
>>>> On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev <
>>>> valentyn@google.com> wrote:
>>>>
>>>>> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to
>>>>> track this issue and any recommendation for the users that will come out of
>>>>> it.
>>>>>
>>>>> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <
>>>>> valentyn@google.com> wrote:
>>>>>
>>>>>>  I think we have heard of this issue from the same source:
>>>>>>
>>>>>> This looks exactly like a race condition that we've encountered on
>>>>>>> Python 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>>>>>>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>>>>>>> module before it has been fully imported. See
>>>>>>> https://bugs.python.org/issue34572 for more information.
>>>>>>>
>>>>>>> The traceback shows a Python 3.6 venv so this could be a different
>>>>>>> issue (the unpickle bug was introduced in version 3.7). If it's the same
>>>>>>> bug then upgrading to Python 3.7.3 or higher should fix that issue. One
>>>>>>> potential workaround is to ensure that all of the modules get imported
>>>>>>> during the initialization of the sdk_worker, as this bug only affects
>>>>>>> imports done by the unpickler.
>>>>>>
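The workaround quoted above (importing all modules up front so that the unpickler never has to trigger an import) could be sketched roughly as follows. This is an illustration only, not Beam code: preimport_modules is a hypothetical helper, and where to call it during sdk_worker initialization is an assumption left open here. The stdlib module names in the usage line stand in for the pipeline's own modules, such as pricingrealtime.aggregation.aggregation_transform.

```python
import importlib
import sys


def preimport_modules(module_names):
    """Import each named module eagerly and return the names that failed.

    Hypothetical helper: the idea is to call it from the main thread before
    any worker threads start unpickling, so that the modules are already
    fully initialized in sys.modules when the unpickler looks them up.
    """
    failed = []
    for name in module_names:
        try:
            importlib.import_module(name)
        except ImportError:
            failed.append(name)
    return failed


# Stand-in module names; a real pipeline would list its own modules here.
failed = preimport_modules(["json", "collections.abc", "no_such_module_xyz"])
```

Returning the failed names instead of raising lets the caller decide whether a missing module should abort worker startup.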
>>>>>>
>>>>>> The symptoms do sound similar, so I would try to reproduce your issue
>>>>>> on 3.7.3 and see if it is gone, or try to reproduce
>>>>>> https://bugs.python.org/issue34572 in the version of interpreter you
>>>>>> use. If this doesn't help, you can try to reproduce the race using your
>>>>>> input.
>>>>>>
>>>>>> To get the serialized DoFn, you could do the following:
>>>>>> 1. Apply the patch from https://github.com/apache/beam/pull/10036.
>>>>>> 2. Set the logging level to DEBUG, see:
>>>>>> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
>>>>>> 3. Check the log output for the payload of your transform; it may look
>>>>>> like:
>>>>>>
>>>>>>     transforms {
>>>>>>       key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>>>>>       value {
>>>>>>         spec {
>>>>>>           urn: "beam:transform:pardo:v1"
>>>>>>           payload: "\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>>>>>> ....
>>>>>>
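If the payload is read from a log file rather than pasted by hand, the escaped text has to be turned back into bytes before it can be parsed. A minimal sketch, assuming the log prints the payload with C-style backslash escapes (octal escapes such as \347, plus \n) as in the sample above, and that all other characters are plain ASCII; unescape_payload is a hypothetical helper name, not part of Beam:

```python
def unescape_payload(escaped_text):
    """Convert escaped log text such as r'\n\347\006' into raw bytes."""
    # unicode_escape interprets \n and the octal escapes; for byte-valued
    # escapes every resulting code point is below 256, so latin-1 maps each
    # one back to a single byte.
    return (
        escaped_text.encode("ascii")
        .decode("unicode_escape")
        .encode("latin-1")
    )


# Shortened stand-in for the escaped text copied from a log line.
raw = unescape_payload(r"\n\347\006\n\275\006")
```

Pasting the escaped text directly into a Python bytes literal gives the same result, so this helper only matters when the extraction is scripted.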
>>>>>> Then you can extract the output of pickled fn:
>>>>>>
>>>>>> from apache_beam.utils import proto_utils
>>>>>> from apache_beam.portability.api import beam_runner_api_pb2
>>>>>> from apache_beam.internal import pickler
>>>>>>
>>>>>> # Paste the payload from the log output as a bytes literal.
>>>>>> payload = b'\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>>>>>> pardo_payload = proto_utils.parse_Bytes(
>>>>>>     payload, beam_runner_api_pb2.ParDoPayload)
>>>>>> pickled_fn = pardo_payload.do_fn.spec.payload
>>>>>>
>>>>>> # Presumably the race happens here when unpickling one of your transforms
>>>>>> # (pricingrealtime.aggregation.aggregation_transform).
>>>>>> pickler.loads(pickled_fn)
>>>>>>
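To try reproducing the race outside a pipeline, the loads call could be driven from several threads at once. A rough sketch using only the standard library: pickle.loads stands in for pickler.loads, the thread and attempt counts are arbitrary, and concurrent_loads is a hypothetical helper. Since the suspected bug concerns imports triggered by the unpickler, a run is presumably only meaningful in a fresh interpreter where the target module has not been imported yet.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor


def concurrent_loads(loads, data, threads=8, attempts=32):
    """Call loads(data) from several threads; return the exceptions raised."""
    def attempt(_):
        try:
            loads(data)
            return None
        except Exception as exc:  # e.g. the AttributeError from the trace
            return exc

    with ThreadPoolExecutor(max_workers=threads) as pool:
        results = list(pool.map(attempt, range(attempts)))
    return [exc for exc in results if exc is not None]


# Stand-in payload; swap in pickler.loads and the extracted pickled fn.
data = pickle.dumps({"key": [1, 2, 3]})
errors = concurrent_loads(pickle.loads, data)
```

An empty result does not prove the race is absent; it only means it did not trigger in that run.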
>>>>>>
>>>>>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <ra...@lyft.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Valentyn,
>>>>>>>
>>>>>>> aggregation_transform.py doesn't have any transform class that
>>>>>>> extends beam.DoFn. We are using plain Python functions, which we pass
>>>>>>> to beam.Map(). I am not sure how to get the dump of serialized_fn. Can
>>>>>>> you please let me know the process?
>>>>>>>
>>>>>>> I also heard that some people ran into this issue on Python 3.7.1
>>>>>>> but the same issue is not present on Python 3.7.3. Can you confirm this?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <
>>>>>>> valentyn@google.com> wrote:
>>>>>>>
>>>>>>>> +user@, bcc: dev@
>>>>>>>> https://issues.apache.org/jira/browse/BEAM-6158 may be
>>>>>>>> contributing to this issue, although we saw instances of this bug in
>>>>>>>> exactly opposite scenarios - when pipeline was defined *in one
>>>>>>>> file*, but not in multiple files.
>>>>>>>>
>>>>>>>> Could you try replacing instances of super() in
>>>>>>>> aggregation_transform.py  as done in
>>>>>>>> https://github.com/apache/beam/pull/9513 and see if this issue is
>>>>>>>> still reproducible?
>>>>>>>>
>>>>>>>> If that doesn't work, I would try to get the dump of serialized_fn,
>>>>>>>> and try to reproduce the issue in isolated environment, such as:
>>>>>>>>
>>>>>>>> from apache_beam.internal import pickler
>>>>>>>> serialized_fn = "..content.."
>>>>>>>> pickler.loads(serialized_fn)
>>>>>>>>
>>>>>>>> Then I would try to trim the DoFn in the example down to a
>>>>>>>> minimal reproducible example. It could be another issue with the dill
>>>>>>>> dependency.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> We have noticed a weird intermittent issue on Python 3 that we don't
>>>>>>>>> run into on Python 2. Sometimes when we try to submit the
>>>>>>>>> pipeline, we get an AttributeError (see the stack trace below). We have
>>>>>>>>> double-checked that the attributes/methods are present in the
>>>>>>>>> right module and in the right place, but the pipeline still complains
>>>>>>>>> about them. In some cases, we refer to methods before their definition.
>>>>>>>>> We tried reordering the method definitions, but that didn't help at all.
>>>>>>>>>
>>>>>>>>> We don't see the same issue when the entire pipeline is defined in
>>>>>>>>> one file. Also, note that this doesn't happen all the time when we submit
>>>>>>>>> the pipeline, so I feel it is some kind of race condition. When we enable
>>>>>>>>> the worker recycle logic it happens most of the time when sdk worker is
>>>>>>>>> recycled.
>>>>>>>>>
>>>>>>>>> Some more information about the environment:
>>>>>>>>> Python version: 3
>>>>>>>>> Beam version: 2.16
>>>>>>>>> Flink version: 1.8
>>>>>>>>>
>>>>>>>>> *Stack trace: *
>>>>>>>>>
>>>>>>>>>    - :
>>>>>>>>>
>>>>>>>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>>>>>>>> bundle}
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>> at
>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>> Caused by: java.lang.RuntimeException: Failed to finish remote
>>>>>>>>> bundle
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>>>>>>> at
>>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>>>>>>> ... 7 more
>>>>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>>>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>>>>>>>> 6: Traceback (most recent call last):
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 307, in get
>>>>>>>>>     processor =
>>>>>>>>> self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>>>>>>> IndexError: pop from empty list
>>>>>>>>>
>>>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>>>
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>>>>> line 261, in loads
>>>>>>>>>     return dill.loads(s)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 317, in loads
>>>>>>>>>     return load(file, ignore)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 305, in load
>>>>>>>>>     obj = pik.load()
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 474, in find_class
>>>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>>>>>>>
>>>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>>>
>>>>>>>>> Traceback (most recent call last):
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 165, in _execute
>>>>>>>>>     response = task()
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 198, in <lambda>
>>>>>>>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 351, in do_instruction
>>>>>>>>>     request.instruction_id)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 371, in process_bundle
>>>>>>>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>>> line 313, in get
>>>>>>>>>     self.data_channel_factory)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 576, in __init__
>>>>>>>>>     self.ops =
>>>>>>>>> self.create_execution_tree(self.process_bundle_descriptor)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 620, in create_execution_tree
>>>>>>>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 619, in <listcomp>
>>>>>>>>>     for transform_id in sorted(
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 544, in wrapper
>>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 603, in get_operation
>>>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 602, in <dictcomp>
>>>>>>>>>     for tag, pcoll_id
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 601, in <listcomp>
>>>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 544, in wrapper
>>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 603, in get_operation
>>>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 602, in <dictcomp>
>>>>>>>>>     for tag, pcoll_id
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 601, in <listcomp>
>>>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 544, in wrapper
>>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 606, in get_operation
>>>>>>>>>     transform_id, transform_consumers)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 865, in create_operation
>>>>>>>>>     return creator(self, transform_id, transform_proto, payload,
>>>>>>>>> consumers)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 1108, in create
>>>>>>>>>     serialized_fn, parameter)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>>> line 1146, in _create_pardo_operation
>>>>>>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>>>>> line 265, in loads
>>>>>>>>>     return dill.loads(s)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 317, in loads
>>>>>>>>>     return load(file, ignore)
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 305, in load
>>>>>>>>>     obj = pik.load()
>>>>>>>>>   File
>>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>>> line 474, in find_class
>>>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>>>>>>>
>>>>>>>>>

Re: Pipeline AttributeError on Python3

Posted by Thomas Weise <th...@apache.org>.
We are currently verifying the patch. Will report back tomorrow.

On Thu, Nov 21, 2019 at 8:40 AM Valentyn Tymofieiev <va...@google.com>
wrote:

> That would be helpful, thanks a lot! It should be a straightforward patch.
> Also, thanks Guenther, for sharing your investigation on
> https://bugs.python.org/issue34572, it was very helpful.
>
> On Thu, Nov 21, 2019 at 8:25 AM Thomas Weise <th...@apache.org> wrote:
>
>> Valentyn, thanks a lot for following up on this.
>>
>> If the change can be cherry picked in isolation, we should be able to
>> verify this soon (with 2.16).
>>
>>
>> On Thu, Nov 21, 2019 at 8:12 AM Valentyn Tymofieiev <va...@google.com>
>> wrote:
>>
>>> To close the loop here: To my knowledge this issue affects all Python 3
>>> users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
>>> including users on Python 3.7.3 and newer versions.
>>>
>>> The issue is addressed on Beam master, and we have a cherry-pick out for
>>> Beam 2.17.0.
>>>
>>> Workaround options for users on 2.16.0 and earlier SDKs:
>>>
>>> - Patch the SDK you are using with
>>> https://github.com/apache/beam/pull/10167.
>>> - Temporarily switch to Python 2 until 2.17.0. We have not seen the
>>> issue on Python 2, so it may be rare or non-existent on Python 2.
>>> - Pass --experiments worker_threads=1 . This option may work only for
>>> some, but not all pipelines.
>>>
>>> See BEAM-8651 <https://issues.apache.org/jira/browse/BEAM-8651> for
>>> details on the issue.
>>>
>>> On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev <
>>> valentyn@google.com> wrote:
>>>
>>>> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track
>>>> this issue and any recommendation for the users that will come out of it.
>>>>
>>>> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <va...@google.com>
>>>> wrote:
>>>>
>>>>>  I think we have heard of this issue from the same source:
>>>>>
>>>>> This looks exactly like a race condition that we've encountered on
>>>>>> Python 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>>>>>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>>>>>> module before it has been fully imported. See
>>>>>> https://bugs.python.org/issue34572 for more information.
>>>>>>
>>>>>> The traceback shows a Python 3.6 venv so this could be a different
>>>>>> issue (the unpickle bug was introduced in version 3.7). If it's the same
>>>>>> bug then upgrading to Python 3.7.3 or higher should fix that issue. One
>>>>>> potential workaround is to ensure that all of the modules get imported
>>>>>> during the initialization of the sdk_worker, as this bug only affects
>>>>>> imports done by the unpickler.
>>>>>
>>>>>
>>>>> The symptoms do sound similar, so I would try to reproduce your issue
>>>>> on 3.7.3 and see if it is gone, or try to reproduce
>>>>> https://bugs.python.org/issue34572 in the version of interpreter you
>>>>> use. If this doesn't help, you can try to reproduce the race using your
>>>>> input.
>>>>>
>>>>> To get the serialized DoFn, you could do the following:
>>>>> 1. Apply the patch from https://github.com/apache/beam/pull/10036.
>>>>> 2. Set the logging level to DEBUG, see:
>>>>> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
>>>>> 3. Check the log output for the payload of your transform; it may look
>>>>> like:
>>>>>
>>>>>     transforms {
>>>>>       key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>>>>       value {
>>>>>         spec {
>>>>>           urn: "beam:transform:pardo:v1"
>>>>>           payload: "\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>>>>> ....
>>>>>
>>>>> Then you can extract the output of pickled fn:
>>>>>
>>>>> from apache_beam.utils import proto_utils
>>>>> from apache_beam.portability.api import beam_runner_api_pb2
>>>>> from apache_beam.internal import pickler
>>>>>
>>>>> # Paste the payload from the log output as a bytes literal.
>>>>> payload = b'\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>>>>> pardo_payload = proto_utils.parse_Bytes(
>>>>>     payload, beam_runner_api_pb2.ParDoPayload)
>>>>> pickled_fn = pardo_payload.do_fn.spec.payload
>>>>>
>>>>> # Presumably the race happens here when unpickling one of your transforms
>>>>> # (pricingrealtime.aggregation.aggregation_transform).
>>>>> pickler.loads(pickled_fn)
>>>>>
>>>>>
>>>>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <ra...@lyft.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Valentyn,
>>>>>>
>>>>>> aggregation_transform.py doesn't have any transform class that
>>>>>> extends beam.DoFn. We are using plain Python functions, which we pass
>>>>>> to beam.Map(). I am not sure how to get the dump of serialized_fn. Can
>>>>>> you please let me know the process?
>>>>>>
>>>>>> I also heard that some people ran into this issue on Python 3.7.1 but
>>>>>> the same issue is not present on Python 3.7.3. Can you confirm this?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <
>>>>>> valentyn@google.com> wrote:
>>>>>>
>>>>>>> +user@, bcc: dev@
>>>>>>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing
>>>>>>> to this issue, although we saw instances of this bug in exactly opposite
>>>>>>> scenarios - when pipeline was defined *in one file*, but not in
>>>>>>> multiple files.
>>>>>>>
>>>>>>> Could you try replacing instances of super() in
>>>>>>> aggregation_transform.py  as done in
>>>>>>> https://github.com/apache/beam/pull/9513 and see if this issue is
>>>>>>> still reproducible?
>>>>>>>
>>>>>>> If that doesn't work, I would try to get the dump of serialized_fn,
>>>>>>> and try to reproduce the issue in isolated environment, such as:
>>>>>>>
>>>>>>> from apache_beam.internal import pickler
>>>>>>> serialized_fn = "..content.."
>>>>>>> pickler.loads(serialized_fn)
>>>>>>>
>>>>>>> Then I would try to trim the DoFn in the example down to a
>>>>>>> minimal reproducible example. It could be another issue with the dill
>>>>>>> dependency.
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> We have noticed a weird intermittent issue on Python 3 that we don't
>>>>>>>> run into on Python 2. Sometimes when we try to submit the
>>>>>>>> pipeline, we get an AttributeError (see the stack trace below). We have
>>>>>>>> double-checked that the attributes/methods are present in the
>>>>>>>> right module and in the right place, but the pipeline still complains
>>>>>>>> about them. In some cases, we refer to methods before their definition.
>>>>>>>> We tried reordering the method definitions, but that didn't help at all.
>>>>>>>>
>>>>>>>> We don't see the same issue when the entire pipeline is defined in
>>>>>>>> one file. Also, note that this doesn't happen all the time when we submit
>>>>>>>> the pipeline, so I feel it is some kind of race condition. When we enable
>>>>>>>> the worker recycle logic it happens most of the time when sdk worker is
>>>>>>>> recycled.
>>>>>>>>
>>>>>>>> Some more information about the environment:
>>>>>>>> Python version: 3
>>>>>>>> Beam version: 2.16
>>>>>>>> Flink version: 1.8
>>>>>>>>
>>>>>>>> *Stack trace: *
>>>>>>>>
>>>>>>>>    - :
>>>>>>>>
>>>>>>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>>>>>>> bundle}
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>>>>>> at
>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>> Caused by: java.lang.RuntimeException: Failed to finish remote
>>>>>>>> bundle
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>>>>>> ... 7 more
>>>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>>>>>>> 6: Traceback (most recent call last):
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>> line 307, in get
>>>>>>>>     processor =
>>>>>>>> self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>>>>>> IndexError: pop from empty list
>>>>>>>>
>>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>>
>>>>>>>> Traceback (most recent call last):
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>>>> line 261, in loads
>>>>>>>>     return dill.loads(s)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>> line 317, in loads
>>>>>>>>     return load(file, ignore)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>> line 305, in load
>>>>>>>>     obj = pik.load()
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>> line 474, in find_class
>>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>>>>>>
>>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>>
>>>>>>>> Traceback (most recent call last):
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>> line 165, in _execute
>>>>>>>>     response = task()
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>> line 198, in <lambda>
>>>>>>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>> line 351, in do_instruction
>>>>>>>>     request.instruction_id)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>> line 371, in process_bundle
>>>>>>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>> line 313, in get
>>>>>>>>     self.data_channel_factory)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 576, in __init__
>>>>>>>>     self.ops =
>>>>>>>> self.create_execution_tree(self.process_bundle_descriptor)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 620, in create_execution_tree
>>>>>>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 619, in <listcomp>
>>>>>>>>     for transform_id in sorted(
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 544, in wrapper
>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 603, in get_operation
>>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 602, in <dictcomp>
>>>>>>>>     for tag, pcoll_id
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 601, in <listcomp>
>>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 544, in wrapper
>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 603, in get_operation
>>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 602, in <dictcomp>
>>>>>>>>     for tag, pcoll_id
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 601, in <listcomp>
>>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 544, in wrapper
>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 606, in get_operation
>>>>>>>>     transform_id, transform_consumers)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 865, in create_operation
>>>>>>>>     return creator(self, transform_id, transform_proto, payload,
>>>>>>>> consumers)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 1108, in create
>>>>>>>>     serialized_fn, parameter)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 1146, in _create_pardo_operation
>>>>>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>>>> line 265, in loads
>>>>>>>>     return dill.loads(s)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>> line 317, in loads
>>>>>>>>     return load(file, ignore)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>> line 305, in load
>>>>>>>>     obj = pik.load()
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>> line 474, in find_class
>>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>>>>>>
>>>>>>>>

Re: Pipeline AttributeError on Python3

Posted by Thomas Weise <th...@apache.org>.
We are currently verifying the patch. Will report back tomorrow.
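
A rough sketch of how a patched SDK might be checked without running a full pipeline, under stated assumptions. This is not taken from the patch; `unpickle_concurrently` and its parameters are hypothetical names used only for illustration. It calls a `loads` function from several threads at once and collects any exceptions. The race discussed in the quoted messages below concerns the first import of a module by the unpickler, so a check like this would presumably only be meaningful in a fresh interpreter, with `apache_beam.internal.pickler.loads` passed as `loads` and a payload that references a module that has not been imported yet.

```python
import pickle
import threading


def unpickle_concurrently(payload, loads=pickle.loads, workers=8, rounds=50):
    """Call loads(payload) from several threads at once.

    Hypothetical helper: returns the list of exceptions raised, so an
    empty list means no thread failed to unpickle the payload.
    """
    errors = []
    start = threading.Barrier(workers)  # release all threads together

    def run():
        start.wait()
        for _ in range(rounds):
            try:
                loads(payload)
            except Exception as exc:  # collect failures, do not stop early
                errors.append(exc)

    threads = [threading.Thread(target=run) for _ in range(workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return errors
```

An empty result does not prove the race is gone, since the failure is intermittent; it only means this run did not hit it.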

On Thu, Nov 21, 2019 at 8:40 AM Valentyn Tymofieiev <va...@google.com>
wrote:

> That would be helpful, thanks a lot! It should be a straightforward patch.
> Also, thanks, Guenther, for sharing your investigation of
> https://bugs.python.org/issue34572; it was very helpful.
>
> On Thu, Nov 21, 2019 at 8:25 AM Thomas Weise <th...@apache.org> wrote:
>
>> Valentyn, thanks a lot for following up on this.
>>
>> If the change can be cherry picked in isolation, we should be able to
>> verify this soon (with 2.16).
>>
>>
>> On Thu, Nov 21, 2019 at 8:12 AM Valentyn Tymofieiev <va...@google.com>
>> wrote:
>>
>>> To close the loop here: To my knowledge this issue affects all Python 3
>>> users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
>>> including users on Python 3.7.3 and newer versions.
>>>
>>> The issue is addressed on Beam master, and we have a cherry-pick out for
>>> Beam 2.17.0.
>>>
>>> Workaround options for users on 2.16.0 and earlier SDKs:
>>>
>>> - Patch the SDK you are using with
>>> https://github.com/apache/beam/pull/10167.
>>> - Temporarily switch to Python 2 until 2.17.0. We have not seen the
>>> issue on Python 2, so it may be rare or non-existent there.
>>> - Pass --experiments worker_threads=1 . This option may work only for
>>> some, but not all pipelines.
>>>
>>> See BEAM-8651 <https://issues.apache.org/jira/browse/BEAM-8651> for
>>> details on the issue.
>>>
>>> On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev <
>>> valentyn@google.com> wrote:
>>>
>>>> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track
>>>> this issue and any recommendation for the users that will come out of it.
>>>>
>>>> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <va...@google.com>
>>>> wrote:
>>>>
>>>>>  I think we have heard of this issue from the same source:
>>>>>
>>>>>> This looks exactly like a race condition that we've encountered on
>>>>>> Python 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>>>>>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>>>>>> module before it has been fully imported. See
>>>>>> https://bugs.python.org/issue34572 for more information.
>>>>>>
>>>>>> The traceback shows a Python 3.6 venv so this could be a different
>>>>>> issue (the unpickle bug was introduced in version 3.7). If it's the same
>>>>>> bug then upgrading to Python 3.7.3 or higher should fix that issue. One
>>>>>> potential workaround is to ensure that all of the modules get imported
>>>>>> during the initialization of the sdk_worker, as this bug only affects
>>>>>> imports done by the unpickler.
>>>>>
>>>>>
>>>>> The symptoms do sound similar, so I would try to reproduce your issue
>>>>> on 3.7.3 and see if it is gone, or try to reproduce
>>>>> https://bugs.python.org/issue34572 in the version of the interpreter
>>>>> you use. If this doesn't help, you can try to reproduce the race using
>>>>> your input.
>>>>>
>>>>> To get the serialized DoFn, you could do the following:
>>>>> 1. Apply the patch from https://github.com/apache/beam/pull/10036.
>>>>> 2. Set the logging level to DEBUG; see
>>>>> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
>>>>> 3. Check the log output for the payload of your transform; it may look
>>>>> like:
>>>>>
>>>>>     transforms {
>>>>>       key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>>>>       value {
>>>>>         spec {
>>>>>           urn: "beam:transform:pardo:v1"
>>>>>           payload: "\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>>>>> ....
>>>>>
>>>>> Then you can extract the pickled fn and try to load it:
>>>>>
>>>>> from apache_beam.utils import proto_utils
>>>>> from apache_beam.portability.api import beam_runner_api_pb2
>>>>> from apache_beam.internal import pickler
>>>>>
>>>>> payload = b'\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>>>>> pardo_payload = proto_utils.parse_Bytes(
>>>>>     payload, beam_runner_api_pb2.ParDoPayload)
>>>>> pickled_fn = pardo_payload.do_fn.spec.payload
>>>>>
>>>>> # Presumably the race happens here, when unpickling one of your
>>>>> # transforms (pricingrealtime.aggregation.aggregation_transform).
>>>>> pickler.loads(pickled_fn)
>>>>>
>>>>>
>>>>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <ra...@lyft.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Valentyn,
>>>>>>
>>>>>> aggregation_transform.py doesn't have any transform that extends
>>>>>> beam.DoFn. We are using plain Python methods that we pass to
>>>>>> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
>>>>>> please let me know the process?
>>>>>>
>>>>>> I also heard that some people ran into this issue on Python 3.7.1 but
>>>>>> the same issue is not present on Python 3.7.3. Can you confirm this?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <
>>>>>> valentyn@google.com> wrote:
>>>>>>
>>>>>>> +user@, bcc: dev@
>>>>>>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing
>>>>>>> to this issue, although we saw instances of this bug in exactly opposite
>>>>>>> scenarios - when pipeline was defined *in one file*, but not in
>>>>>>> multiple files.
>>>>>>>
>>>>>>> Could you try replacing instances of super() in
>>>>>>> aggregation_transform.py  as done in
>>>>>>> https://github.com/apache/beam/pull/9513 and see if this issue is
>>>>>>> still reproducible?
>>>>>>>
>>>>>>> If that doesn't work, I would try to get a dump of serialized_fn
>>>>>>> and reproduce the issue in an isolated environment, such as:
>>>>>>>
>>>>>>> from apache_beam.internal import pickler
>>>>>>> serialized_fn = "..content.."
>>>>>>> pickler.loads(serialized_fn)
>>>>>>>
>>>>>>> Then I would try to trim the DoFn down to a minimal reproducible
>>>>>>> example. It could be another issue with the dill dependency.
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> We have noticed a weird intermittent issue on Python 3 that we don't
>>>>>>>> run into on Python 2. Sometimes when we try to submit the pipeline, we
>>>>>>>> get an AttributeError (see the stack trace below). We have
>>>>>>>> double-checked that the attributes/methods are present in the right
>>>>>>>> module and in the right place, but the pipeline still complains about
>>>>>>>> them. In some cases, we refer to methods before their definition. We
>>>>>>>> tried reordering the method definitions, but that didn't help at all.
>>>>>>>>
>>>>>>>> We don't see the same issue when the entire pipeline is defined in
>>>>>>>> one file. Also, note that this doesn't happen every time we submit the
>>>>>>>> pipeline, so I feel it is some kind of race condition. When we enable
>>>>>>>> the worker recycle logic, it happens most of the time when the SDK
>>>>>>>> worker is recycled.
>>>>>>>>
>>>>>>>> Some more information about the environment:
>>>>>>>> Python version: 3
>>>>>>>> Beam version: 2.16
>>>>>>>> Flink version: 1.8
>>>>>>>>
>>>>>>>> *Stack trace: *
>>>>>>>>
>>>>>>>>    - :
>>>>>>>>
>>>>>>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>>>>>>> bundle}
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>>>>>> at
>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>> at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>> Caused by: java.lang.RuntimeException: Failed to finish remote
>>>>>>>> bundle
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>>>>>> ... 7 more
>>>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>>>>>>> 6: Traceback (most recent call last):
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>> line 307, in get
>>>>>>>>     processor =
>>>>>>>> self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>>>>>> IndexError: pop from empty list
>>>>>>>>
>>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>>
>>>>>>>> Traceback (most recent call last):
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>>>> line 261, in loads
>>>>>>>>     return dill.loads(s)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>> line 317, in loads
>>>>>>>>     return load(file, ignore)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>> line 305, in load
>>>>>>>>     obj = pik.load()
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>> line 474, in find_class
>>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>>>>>>
>>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>>
>>>>>>>> Traceback (most recent call last):
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>> line 165, in _execute
>>>>>>>>     response = task()
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>> line 198, in <lambda>
>>>>>>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>> line 351, in do_instruction
>>>>>>>>     request.instruction_id)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>> line 371, in process_bundle
>>>>>>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>>> line 313, in get
>>>>>>>>     self.data_channel_factory)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 576, in __init__
>>>>>>>>     self.ops =
>>>>>>>> self.create_execution_tree(self.process_bundle_descriptor)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 620, in create_execution_tree
>>>>>>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 619, in <listcomp>
>>>>>>>>     for transform_id in sorted(
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 544, in wrapper
>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 603, in get_operation
>>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 602, in <dictcomp>
>>>>>>>>     for tag, pcoll_id
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 601, in <listcomp>
>>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 544, in wrapper
>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 603, in get_operation
>>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 602, in <dictcomp>
>>>>>>>>     for tag, pcoll_id
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 601, in <listcomp>
>>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 544, in wrapper
>>>>>>>>     result = cache[args] = func(*args)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 606, in get_operation
>>>>>>>>     transform_id, transform_consumers)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 865, in create_operation
>>>>>>>>     return creator(self, transform_id, transform_proto, payload,
>>>>>>>> consumers)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 1108, in create
>>>>>>>>     serialized_fn, parameter)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>>> line 1146, in _create_pardo_operation
>>>>>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>>>> line 265, in loads
>>>>>>>>     return dill.loads(s)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>> line 317, in loads
>>>>>>>>     return load(file, ignore)
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>> line 305, in load
>>>>>>>>     obj = pik.load()
>>>>>>>>   File
>>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>>> line 474, in find_class
>>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>>>>>>
>>>>>>>>

Re: Pipeline AttributeError on Python3

Posted by Valentyn Tymofieiev <va...@google.com>.
That would be helpful, thanks a lot! It should be a straightforward patch.
Also, thanks, Guenther, for sharing your investigation of
https://bugs.python.org/issue34572; it was very helpful.
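
The workaround quoted below (making sure all modules are imported during initialization of the sdk_worker, so that the unpickler never triggers the import itself) could look roughly like the following. This is only a sketch under assumptions: `preload_modules` and `MODULES_TO_PRELOAD` are hypothetical names, the module list is a stand-in for the pipeline's own modules (for example pricingrealtime.aggregation.aggregation_transform), and where to call it during worker startup is left open because that depends on the deployment.

```python
import importlib
import sys

# Hypothetical list: replace with the modules your pickled functions live in.
MODULES_TO_PRELOAD = ["json", "collections.abc"]


def preload_modules(names):
    """Import each module eagerly on the calling thread.

    Returns the names that are present in sys.modules afterwards, so the
    caller can log or assert that nothing was left for the unpickler.
    """
    for name in names:
        importlib.import_module(name)
    return [name for name in names if name in sys.modules]
```

Calling `preload_modules(MODULES_TO_PRELOAD)` once, before any worker thread starts unpickling, is the idea; it does not fix the underlying race, it only avoids the code path that exposes it.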

On Thu, Nov 21, 2019 at 8:25 AM Thomas Weise <th...@apache.org> wrote:

> Valentyn, thanks a lot for following up on this.
>
> If the change can be cherry picked in isolation, we should be able to
> verify this soon (with 2.16).
>
>
> On Thu, Nov 21, 2019 at 8:12 AM Valentyn Tymofieiev <va...@google.com>
> wrote:
>
>> To close the loop here: To my knowledge this issue affects all Python 3
>> users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
>> including users on Python 3.7.3 and newer versions.
>>
>> The issue is addressed on Beam master, and we have a cherry-pick out for
>> Beam 2.17.0.
>>
>> Workaround options for users on 2.16.0 and earlier SDKs:
>>
>> - Patch the SDK you are using with
>> https://github.com/apache/beam/pull/10167.
>> - Temporarily switch to Python 2 until 2.17.0. We have not seen the issue
>> on Python 2, so it may be rare or non-existent there.
>> - Pass --experiments worker_threads=1 . This option may work only for
>> some, but not all pipelines.
>>
>> See BEAM-8651 <https://issues.apache.org/jira/browse/BEAM-8651> for
>> details on the issue.
>>
>> On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev <va...@google.com>
>> wrote:
>>
>>> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track
>>> this issue and any recommendation for the users that will come out of it.
>>>
>>> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <va...@google.com>
>>> wrote:
>>>
>>>>  I think we have heard of this issue from the same source:
>>>>
>>>>> This looks exactly like a race condition that we've encountered on
>>>>> Python 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>>>>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>>>>> module before it has been fully imported. See
>>>>> https://bugs.python.org/issue34572 for more information.
>>>>>
>>>>> The traceback shows a Python 3.6 venv so this could be a different
>>>>> issue (the unpickle bug was introduced in version 3.7). If it's the same
>>>>> bug then upgrading to Python 3.7.3 or higher should fix that issue. One
>>>>> potential workaround is to ensure that all of the modules get imported
>>>>> during the initialization of the sdk_worker, as this bug only affects
>>>>> imports done by the unpickler.
>>>>
>>>>
>>>> The symptoms do sound similar, so I would try to reproduce your issue
>>>> on 3.7.3 and see if it is gone, or try to reproduce
>>>> https://bugs.python.org/issue34572 in the version of the interpreter
>>>> you use. If this doesn't help, you can try to reproduce the race using
>>>> your input.
>>>>
>>>> To get the serialized DoFn, you could do the following:
>>>> 1. Apply the patch from https://github.com/apache/beam/pull/10036.
>>>> 2. Set the logging level to DEBUG; see
>>>> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
>>>> 3. Check the log output for the payload of your transform; it may look like:
>>>>
>>>>     transforms {
>>>>       key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>>>       value {
>>>>         spec {
>>>>           urn: "beam:transform:pardo:v1"
>>>>           payload: "\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>>>> ....
>>>>
>>>> Then you can extract the pickled fn and try to load it:
>>>>
>>>> from apache_beam.utils import proto_utils
>>>> from apache_beam.portability.api import beam_runner_api_pb2
>>>> from apache_beam.internal import pickler
>>>>
>>>> payload = b'\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>>>> pardo_payload = proto_utils.parse_Bytes(
>>>>     payload, beam_runner_api_pb2.ParDoPayload)
>>>> pickled_fn = pardo_payload.do_fn.spec.payload
>>>>
>>>> # Presumably the race happens here, when unpickling one of your
>>>> # transforms (pricingrealtime.aggregation.aggregation_transform).
>>>> pickler.loads(pickled_fn)
>>>>
>>>>
>>>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <ra...@lyft.com>
>>>> wrote:
>>>>
>>>>> Thanks Valentyn,
>>>>>
>>>>> aggregation_transform.py doesn't have any transform that extends
>>>>> beam.DoFn. We are using plain Python methods that we pass to
>>>>> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
>>>>> please let me know the process?
>>>>>
>>>>> I also heard that some people ran into this issue on Python 3.7.1 but
>>>>> the same issue is not present on Python 3.7.3. Can you confirm this?
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <
>>>>> valentyn@google.com> wrote:
>>>>>
>>>>>> +user@, bcc: dev@
>>>>>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing
>>>>>> to this issue, although we saw instances of this bug in exactly opposite
>>>>>> scenarios - when pipeline was defined *in one file*, but not in
>>>>>> multiple files.
>>>>>>
>>>>>> Could you try replacing instances of super() in
>>>>>> aggregation_transform.py  as done in
>>>>>> https://github.com/apache/beam/pull/9513 and see if this issue is
>>>>>> still reproducible?
>>>>>>
>>>>>> If that doesn't work, I would try to get a dump of serialized_fn
>>>>>> and reproduce the issue in an isolated environment, such as:
>>>>>>
>>>>>> from apache_beam.internal import pickler
>>>>>> serialized_fn = "..content.."
>>>>>> pickler.loads(serialized_fn)
>>>>>>
>>>>>> Then I would try to trim the DoFn down to a minimal reproducible
>>>>>> example. It could be another issue with the dill dependency.
>>>>>>
>>>>>>
>>>>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> We have noticed a weird intermittent issue on Python 3 that we don't
>>>>>>> run into on Python 2. Sometimes when we try to submit the pipeline, we
>>>>>>> get an AttributeError (see the stack trace below). We have
>>>>>>> double-checked that the attributes/methods are present in the right
>>>>>>> module and in the right place, but the pipeline still complains about
>>>>>>> them. In some cases, we refer to methods before their definition. We
>>>>>>> tried reordering the method definitions, but that didn't help at all.
>>>>>>>
>>>>>>> We don't see the same issue when the entire pipeline is defined in
>>>>>>> one file. Also, note that this doesn't happen every time we submit the
>>>>>>> pipeline, so I feel it is some kind of race condition. When we enable
>>>>>>> the worker recycle logic, it happens most of the time when the SDK
>>>>>>> worker is recycled.
>>>>>>>
>>>>>>> Some more information about the environment:
>>>>>>> Python version: 3
>>>>>>> Beam version: 2.16
>>>>>>> Flink version: 1.8
>>>>>>>
>>>>>>> *Stack trace: *
>>>>>>>
>>>>>>>    - :
>>>>>>>
>>>>>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>>>>>> bundle}
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>>>>> at
>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>>>> at
>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>>> at
>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>>>>>>> at
>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>>>>> at
>>>>>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>>>>> at
>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>>>>> at
>>>>>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>>>>> at
>>>>>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>>>>> at
>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>>>>> at
>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>>>>> at
>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>>>>> ... 7 more
>>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>>>>>> 6: Traceback (most recent call last):
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>> line 307, in get
>>>>>>>     processor =
>>>>>>> self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>>>>> IndexError: pop from empty list
>>>>>>>
>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>
>>>>>>> Traceback (most recent call last):
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>>> line 261, in loads
>>>>>>>     return dill.loads(s)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>> line 317, in loads
>>>>>>>     return load(file, ignore)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>> line 305, in load
>>>>>>>     obj = pik.load()
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>> line 474, in find_class
>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>>>>>
>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>
>>>>>>> Traceback (most recent call last):
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>> line 165, in _execute
>>>>>>>     response = task()
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>> line 198, in <lambda>
>>>>>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>> line 351, in do_instruction
>>>>>>>     request.instruction_id)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>> line 371, in process_bundle
>>>>>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>> line 313, in get
>>>>>>>     self.data_channel_factory)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 576, in __init__
>>>>>>>     self.ops =
>>>>>>> self.create_execution_tree(self.process_bundle_descriptor)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 620, in create_execution_tree
>>>>>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 619, in <listcomp>
>>>>>>>     for transform_id in sorted(
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 544, in wrapper
>>>>>>>     result = cache[args] = func(*args)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 603, in get_operation
>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 602, in <dictcomp>
>>>>>>>     for tag, pcoll_id
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 601, in <listcomp>
>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 544, in wrapper
>>>>>>>     result = cache[args] = func(*args)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 603, in get_operation
>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 602, in <dictcomp>
>>>>>>>     for tag, pcoll_id
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 601, in <listcomp>
>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 544, in wrapper
>>>>>>>     result = cache[args] = func(*args)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 606, in get_operation
>>>>>>>     transform_id, transform_consumers)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 865, in create_operation
>>>>>>>     return creator(self, transform_id, transform_proto, payload,
>>>>>>> consumers)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 1108, in create
>>>>>>>     serialized_fn, parameter)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 1146, in _create_pardo_operation
>>>>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>>> line 265, in loads
>>>>>>>     return dill.loads(s)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>> line 317, in loads
>>>>>>>     return load(file, ignore)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>> line 305, in load
>>>>>>>     obj = pik.load()
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>> line 474, in find_class
>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>>>>>
>>>>>>>

Re: Pipeline AttributeError on Python3

Posted by Valentyn Tymofieiev <va...@google.com>.
That would be helpful, thanks a lot! It should be a straightforward patch.
Also, thanks Guenther, for sharing your investigation on
https://bugs.python.org/issue34572, it was very helpful.

On Thu, Nov 21, 2019 at 8:25 AM Thomas Weise <th...@apache.org> wrote:

> Valentyn, thanks a lot for following up on this.
>
> If the change can be cherry picked in isolation, we should be able to
> verify this soon (with 2.16).
>
>
> On Thu, Nov 21, 2019 at 8:12 AM Valentyn Tymofieiev <va...@google.com>
> wrote:
>
>> To close the loop here: To my knowledge this issue affects all Python 3
>> users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
>> including users on Python 3.7.3 and newer versions.
>>
>> The issue is addressed on Beam master, and we have a cherry-pick out for
>> Beam 2.17.0.
>>
>> Workaround options for users on 2.16.0 and earlier SDKs:
>>
>> - Patch the SDK you are using with
>> https://github.com/apache/beam/pull/10167.
>> - Temporarily switch to Python 2 until 2.17.0. We have not seen the issue
>> on Python 2, so it may be rare or non-existent on Python 2.
>> - Pass --experiments worker_threads=1. This option may work for some,
>> but not all, pipelines.
>>
>> See BEAM-8651 <https://issues.apache.org/jira/browse/BEAM-8651> for
>> details on the issue.
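
The worker_threads=1 workaround above keeps two threads from unpickling at the same time. A related, generic way to get a similar effect inside one process is to put every deserialization call behind a lock. The sketch below is only an illustration under stated assumptions: the standard-library pickle module stands in for Beam's dill-based pickler, `locked_loads` and `_unpickle_lock` are hypothetical names, and the thread does not say whether the linked patch works this way.

```python
import pickle
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical module-level lock; every unpickle call goes through it.
_unpickle_lock = threading.RLock()


def locked_loads(data):
    """Unpickle `data` while holding a process-wide lock.

    Only one thread at a time can be inside pickle.loads, so two
    unpickle calls made through this helper never run concurrently.
    """
    with _unpickle_lock:
        return pickle.loads(data)


if __name__ == "__main__":
    blob = pickle.dumps({"key": "value"})
    # Many threads call the helper, but the loads themselves are serialized.
    with ThreadPoolExecutor(max_workers=8) as pool:
        results = list(pool.map(locked_loads, [blob] * 100))
    print(len(results))
```

The lock only protects calls that go through the helper; imports triggered elsewhere in the process are not covered.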
>>
>> On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev <va...@google.com>
>> wrote:
>>
>>> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track
>>> this issue and any recommendation for the users that will come out of it.
>>>
>>> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <va...@google.com>
>>> wrote:
>>>
>>>>  I think we have heard of this issue from the same source:
>>>>
>>>>> This looks exactly like a race condition that we've encountered on
>>>>> Python 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>>>>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>>>>> module before it has been fully imported. See
>>>>> https://bugs.python.org/issue34572 for more information.
>>>>>
>>>>> The traceback shows a Python 3.6 venv so this could be a different
>>>>> issue (the unpickle bug was introduced in version 3.7). If it's the same
>>>>> bug then upgrading to Python 3.7.3 or higher should fix that issue. One
>>>>> potential workaround is to ensure that all of the modules get imported
>>>>> during the initialization of the sdk_worker, as this bug only affects
>>>>> imports done by the unpickler.
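
The pre-import workaround described above could look roughly like the following. This is a minimal sketch, not Beam code: `preimport` is a hypothetical helper, the module names in the usage section are stand-ins, and where to call it during worker start-up is left open because the thread does not specify a hook.

```python
import importlib


def preimport(module_names):
    """Import each named module up front and return the names that failed."""
    failed = []
    for name in module_names:
        try:
            importlib.import_module(name)
        except ImportError:
            # Keep going so one bad name does not hide the others.
            failed.append(name)
    return failed


if __name__ == "__main__":
    # Stand-in names; a real pipeline would list its own modules here, e.g.
    # "pricingrealtime.aggregation.aggregation_transform" from the traceback.
    print(preimport(["json", "collections.abc"]))
```

Once a module is fully imported before any worker thread starts, the unpickler only has to look it up rather than import it.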
>>>>
>>>>
>>>> The symptoms do sound similar, so I would try to reproduce your issue
>>>> on 3.7.3 and see if it is gone, or try to reproduce
>>>> https://bugs.python.org/issue34572 in the version of interpreter you
>>>> use. If this doesn't help, you can try to reproduce the race using your
>>>> input.
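
One way to try reproducing such a race outside the pipeline is to call the unpickler from several threads at once and collect whatever it raises. The harness below is a sketch under assumptions: `stress_unpickle` is a hypothetical helper, and `pickle.loads` is only the default; to test a real payload you would pass `apache_beam.internal.pickler.loads` as `loads`.

```python
import pickle
import threading


def stress_unpickle(payload, loads=pickle.loads, threads=8, rounds=50):
    """Call loads(payload) from several threads at once; return raised errors."""
    errors = []
    errors_lock = threading.Lock()
    start = threading.Barrier(threads)

    def worker():
        start.wait()  # release all threads together to maximise overlap
        for _ in range(rounds):
            try:
                loads(payload)
            except Exception as exc:  # record the failure and keep going
                with errors_lock:
                    errors.append(exc)

    pool = [threading.Thread(target=worker) for _ in range(threads)]
    for t in pool:
        t.start()
    for t in pool:
        t.join()
    return errors


if __name__ == "__main__":
    print(stress_unpickle(pickle.dumps({"key": "value"})))
```

Because the bug described above is said to affect imports done by the unpickler, it is probably best to run this in a fresh interpreter that has not yet imported the module under test.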
>>>>
>>>> To get the serialized DoFn, you could do the following:
>>>> 1. Patch https://github.com/apache/beam/pull/10036.
>>>> 2. Set logging level to DEBUG, see:
>>>> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
>>>> 3. Check the log output for the payload of your transform; it may look like:
>>>>
>>>>     transforms {
>>>>       key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>>>       value {
>>>>         spec {
>>>>           urn: "beam:transform:pardo:v1"
>>>>           payload: "\n\347\006\n\275\006\n
>>>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>>>> ....
>>>>
>>>> Then you can extract the pickled fn:
>>>>
>>>> from apache_beam.utils import proto_utils
>>>> from apache_beam.portability.api import beam_runner_api_pb2
>>>> from apache_beam.internal import pickler
>>>>
>>>> payload = b'\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>>>> pardo_payload = proto_utils.parse_Bytes(
>>>>     payload, beam_runner_api_pb2.ParDoPayload)
>>>> pickled_fn = pardo_payload.do_fn.spec.payload
>>>>
>>>> # Presumably the race happens here when unpickling one of your
>>>> # transforms (pricingrealtime.aggregation.aggregation_transform).
>>>> pickler.loads(pickled_fn)
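
If the payload is read from a log file rather than pasted into a Python bytes literal, its escape sequences have to be turned back into raw bytes first. A minimal sketch, assuming the log uses C-style escapes (such as \n and three-digit octal) as in the excerpt above; `unescape_logged_bytes` is a hypothetical helper, not part of Beam.

```python
def unescape_logged_bytes(text):
    r"""Turn C-style escaped text (e.g. \n\347\006) into raw bytes."""
    # "unicode_escape" resolves \n, \ooo octal and \xhh escapes to code
    # points 0-255; latin-1 then maps each code point to a single byte.
    # Assumes ASCII input with no \u escapes; other input raises an error.
    return text.encode("ascii").decode("unicode_escape").encode("latin-1")


if __name__ == "__main__":
    logged = r"\n\347\006\n\275\006\n beam:dofn"
    print(unescape_logged_bytes(logged))
```

The resulting bytes can then be passed to the parsing step in place of the hand-written literal.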
>>>>
>>>>
>>>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <ra...@lyft.com>
>>>> wrote:
>>>>
>>>>> Thanks Valentyn,
>>>>>
>>>>> aggregation_transform.py doesn't have any transform that extends
>>>>> beam.DoFn. We are using plain Python functions, which we pass to
>>>>> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
>>>>> please let me know the process?
>>>>>
>>>>> I also heard that some people ran into this issue on Python 3.7.1 but
>>>>> the same issue is not present on Python 3.7.3. Can you confirm this?
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <
>>>>> valentyn@google.com> wrote:
>>>>>
>>>>>> +user@, bcc: dev@
>>>>>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing
>>>>>> to this issue, although we saw instances of this bug in exactly opposite
>>>>>> scenarios - when pipeline was defined *in one file*, but not in
>>>>>> multiple files.
>>>>>>
>>>>>> Could you try replacing instances of super() in
>>>>>> aggregation_transform.py  as done in
>>>>>> https://github.com/apache/beam/pull/9513 and see if this issue is
>>>>>> still reproducible?
>>>>>>
>>>>>> If that doesn't work, I would try to get the dump of serialized_fn,
>>>>>> and try to reproduce the issue in isolated environment, such as:
>>>>>>
>>>>>> from apache_beam.internal import pickler
>>>>>> serialized_fn = "..content.."
>>>>>> pickler.loads(serialized_fn)
>>>>>>
>>>>>> Then I would try to trim the DoFn in the example down to a
>>>>>> minimal reproducible example. It could be another issue with the dill
>>>>>> dependency.
>>>>>>
>>>>>>
>>>>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> We have noticed a weird intermittent issue on Python 3 that we don't
>>>>>>> run into on Python 2. Sometimes when we try to submit the pipeline,
>>>>>>> we get an AttributeError (see the stack trace below). We have
>>>>>>> double-checked that the attributes/methods are present in the right
>>>>>>> module and in the right place, but the pipeline still complains
>>>>>>> about them. In some cases, we refer to methods before their definition.
>>>>>>> We tried reordering the method definitions, but that didn't help at all.
>>>>>>>
>>>>>>> We don't see the same issue when the entire pipeline is defined in
>>>>>>> one file. Also, note that this doesn't happen all the time when we submit
>>>>>>> the pipeline, so I feel it is some kind of race condition. When we enable
>>>>>>> the worker recycle logic it happens most of the time when sdk worker is
>>>>>>> recycled.
>>>>>>>
>>>>>>> Some more information about the environment:
>>>>>>> Python version: 3
>>>>>>> Beam version: 2.16
>>>>>>> Flink version: 1.8
>>>>>>>
>>>>>>> *Stack trace: *
>>>>>>>
>>>>>>>    - :
>>>>>>>
>>>>>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>>>>>> bundle}
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>>>>> at
>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>>>> at
>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>>> at
>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>>>>>>> at
>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>>>>> at
>>>>>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>>>>> at
>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>>>>> at
>>>>>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>>>>> at
>>>>>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>>>>> at
>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>>>>> at
>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>>>>> at
>>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>>>>> ... 7 more
>>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>>>>>> 6: Traceback (most recent call last):
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>> line 307, in get
>>>>>>>     processor =
>>>>>>> self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>>>>> IndexError: pop from empty list
>>>>>>>
>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>
>>>>>>> Traceback (most recent call last):
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>>> line 261, in loads
>>>>>>>     return dill.loads(s)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>> line 317, in loads
>>>>>>>     return load(file, ignore)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>> line 305, in load
>>>>>>>     obj = pik.load()
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>> line 474, in find_class
>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>>>>>
>>>>>>> During handling of the above exception, another exception occurred:
>>>>>>>
>>>>>>> Traceback (most recent call last):
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>> line 165, in _execute
>>>>>>>     response = task()
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>> line 198, in <lambda>
>>>>>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>> line 351, in do_instruction
>>>>>>>     request.instruction_id)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>> line 371, in process_bundle
>>>>>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>>> line 313, in get
>>>>>>>     self.data_channel_factory)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 576, in __init__
>>>>>>>     self.ops =
>>>>>>> self.create_execution_tree(self.process_bundle_descriptor)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 620, in create_execution_tree
>>>>>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 619, in <listcomp>
>>>>>>>     for transform_id in sorted(
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 544, in wrapper
>>>>>>>     result = cache[args] = func(*args)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 603, in get_operation
>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 602, in <dictcomp>
>>>>>>>     for tag, pcoll_id
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 601, in <listcomp>
>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 544, in wrapper
>>>>>>>     result = cache[args] = func(*args)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 603, in get_operation
>>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 602, in <dictcomp>
>>>>>>>     for tag, pcoll_id
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 601, in <listcomp>
>>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 544, in wrapper
>>>>>>>     result = cache[args] = func(*args)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 606, in get_operation
>>>>>>>     transform_id, transform_consumers)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 865, in create_operation
>>>>>>>     return creator(self, transform_id, transform_proto, payload,
>>>>>>> consumers)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 1108, in create
>>>>>>>     serialized_fn, parameter)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>>> line 1146, in _create_pardo_operation
>>>>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>>> line 265, in loads
>>>>>>>     return dill.loads(s)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>> line 317, in loads
>>>>>>>     return load(file, ignore)
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>> line 305, in load
>>>>>>>     obj = pik.load()
>>>>>>>   File
>>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>>> line 474, in find_class
>>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>>>>>
>>>>>>>

Re: Pipeline AttributeError on Python3

Posted by Thomas Weise <th...@apache.org>.
Valentyn, thanks a lot for following up on this.

If the change can be cherry-picked in isolation, we should be able to
verify this soon (with 2.16).


On Thu, Nov 21, 2019 at 8:12 AM Valentyn Tymofieiev <va...@google.com>
wrote:

> To close the loop here: To my knowledge this issue affects all Python 3
> users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
> including users on Python 3.7.3 and newer versions.
>
> The issue is addressed on Beam master, and we have a cherry-pick out for
> Beam 2.17.0.
>
> Workaround options for users on 2.16.0 and earlier SDKs:
>
> - Patch the SDK you are using with
> https://github.com/apache/beam/pull/10167.
> - Temporarily switch to Python 2 until 2.17.0. We have not seen the issue
> on Python 2, so it may be rare or non-existent on Python 2.
> - Pass --experiments worker_threads=1. This option may work for some,
> but not all, pipelines.
>
> See BEAM-8651 <https://issues.apache.org/jira/browse/BEAM-8651> for
> details on the issue.
>
> On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev <va...@google.com>
> wrote:
>
>> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track
>> this issue and any recommendation for the users that will come out of it.
>>
>> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <va...@google.com>
>> wrote:
>>
>>>  I think we have heard of this issue from the same source:
>>>
>>>> This looks exactly like a race condition that we've encountered on
>>>> Python 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>>>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>>>> module before it has been fully imported. See
>>>> https://bugs.python.org/issue34572 for more information.
>>>>
>>>> The traceback shows a Python 3.6 venv so this could be a different
>>>> issue (the unpickle bug was introduced in version 3.7). If it's the same
>>>> bug then upgrading to Python 3.7.3 or higher should fix that issue. One
>>>> potential workaround is to ensure that all of the modules get imported
>>>> during the initialization of the sdk_worker, as this bug only affects
>>>> imports done by the unpickler.
>>>
>>>
>>> The symptoms do sound similar, so I would try to reproduce your issue on
>>> 3.7.3 and see if it is gone, or try to reproduce
>>> https://bugs.python.org/issue34572 in the version of interpreter you
>>> use. If this doesn't help, you can try to reproduce the race using your
>>> input.
>>>
>>> To get the serialized DoFn, you could do the following:
>>> 1. Patch https://github.com/apache/beam/pull/10036.
>>> 2. Set logging level to DEBUG, see:
>>> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
>>> 3. Check the log output for the payload of your transform; it may look like:
>>>
>>>     transforms {
>>>       key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>>       value {
>>>         spec {
>>>           urn: "beam:transform:pardo:v1"
>>>           payload: "\n\347\006\n\275\006\n
>>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>>> ....
>>>
>>> Then you can extract the pickled fn:
>>>
>>> from apache_beam.utils import proto_utils
>>> from apache_beam.portability.api import beam_runner_api_pb2
>>> from apache_beam.internal import pickler
>>>
>>> payload = b'\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>>> pardo_payload = proto_utils.parse_Bytes(
>>>     payload, beam_runner_api_pb2.ParDoPayload)
>>> pickled_fn = pardo_payload.do_fn.spec.payload
>>>
>>> # Presumably the race happens here when unpickling one of your
>>> # transforms (pricingrealtime.aggregation.aggregation_transform).
>>> pickler.loads(pickled_fn)
>>>
>>>
>>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <ra...@lyft.com>
>>> wrote:
>>>
>>>> Thanks Valentyn,
>>>>
>>>> aggregation_transform.py doesn't have any transform that extends
>>>> beam.DoFn. We are using plain Python functions, which we pass to
>>>> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
>>>> please let me know the process?
>>>>
>>>> I also heard that some people ran into this issue on Python 3.7.1 but
>>>> the same issue is not present on Python 3.7.3. Can you confirm this?
>>>>
>>>>
>>>>
>>>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <
>>>> valentyn@google.com> wrote:
>>>>
>>>>> +user@, bcc: dev@
>>>>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing
>>>>> to this issue, although we saw instances of this bug in exactly opposite
>>>>> scenarios - when pipeline was defined *in one file*, but not in
>>>>> multiple files.
>>>>>
>>>>> Could you try replacing instances of super() in
>>>>> aggregation_transform.py  as done in
>>>>> https://github.com/apache/beam/pull/9513 and see if this issue is
>>>>> still reproducible?
>>>>>
>>>>> If that doesn't work, I would try to get the dump of serialized_fn,
>>>>> and try to reproduce the issue in isolated environment, such as:
>>>>>
>>>>> from apache_beam.internal import pickler
>>>>> serialized_fn = "..content.."
>>>>> pickler.loads(serialized_fn)
>>>>>
>>>>> Then I would try to trim the DoFn in the example down to a
>>>>> minimal reproducible example. It could be another issue with the dill
>>>>> dependency.
>>>>>
>>>>>
>>>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> We have noticed a weird intermittent issue on Python 3 that we don't
>>>>>> run into on Python 2. Sometimes when we try to submit the pipeline,
>>>>>> we get an AttributeError (see the stack trace below). We have
>>>>>> double-checked that the attributes/methods are present in the right
>>>>>> module and in the right place, but the pipeline still complains
>>>>>> about them. In some cases, we refer to methods before their definition.
>>>>>> We tried reordering the method definitions, but that didn't help at all.
>>>>>>
>>>>>> We don't see the same issue when the entire pipeline is defined in
>>>>>> one file. Also, note that this doesn't happen all the time when we submit
>>>>>> the pipeline, so I feel it is some kind of race condition. When we enable
>>>>>> the worker recycle logic it happens most of the time when sdk worker is
>>>>>> recycled.
>>>>>>
>>>>>> Some more information about the environment:
>>>>>> Python version: 3
>>>>>> Beam version: 2.16
>>>>>> Flink version: 1.8
>>>>>>
>>>>>> *Stack trace: *
>>>>>>
>>>>>>    - :
>>>>>>
>>>>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>>>>> bundle}
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>>>> at
>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>>> at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>> at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>>>>>> at
>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>>>> at
>>>>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>>>> at
>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>>>> at
>>>>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>>>> at
>>>>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>>>> at
>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>>>> at
>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>>>> at
>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>>>> ... 7 more
>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>>>>> 6: Traceback (most recent call last):
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>> line 307, in get
>>>>>>     processor =
>>>>>> self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>>>> IndexError: pop from empty list
>>>>>>
>>>>>> During handling of the above exception, another exception occurred:
>>>>>>
>>>>>> Traceback (most recent call last):
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>> line 261, in loads
>>>>>>     return dill.loads(s)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>> line 317, in loads
>>>>>>     return load(file, ignore)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>> line 305, in load
>>>>>>     obj = pik.load()
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>> line 474, in find_class
>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>>>>
>>>>>> During handling of the above exception, another exception occurred:
>>>>>>
>>>>>> Traceback (most recent call last):
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>> line 165, in _execute
>>>>>>     response = task()
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>> line 198, in <lambda>
>>>>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>> line 351, in do_instruction
>>>>>>     request.instruction_id)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>> line 371, in process_bundle
>>>>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>> line 313, in get
>>>>>>     self.data_channel_factory)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 576, in __init__
>>>>>>     self.ops =
>>>>>> self.create_execution_tree(self.process_bundle_descriptor)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 620, in create_execution_tree
>>>>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 619, in <listcomp>
>>>>>>     for transform_id in sorted(
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 544, in wrapper
>>>>>>     result = cache[args] = func(*args)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 603, in get_operation
>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 602, in <dictcomp>
>>>>>>     for tag, pcoll_id
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 601, in <listcomp>
>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 544, in wrapper
>>>>>>     result = cache[args] = func(*args)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 603, in get_operation
>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 602, in <dictcomp>
>>>>>>     for tag, pcoll_id
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 601, in <listcomp>
>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 544, in wrapper
>>>>>>     result = cache[args] = func(*args)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 606, in get_operation
>>>>>>     transform_id, transform_consumers)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 865, in create_operation
>>>>>>     return creator(self, transform_id, transform_proto, payload,
>>>>>> consumers)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 1108, in create
>>>>>>     serialized_fn, parameter)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 1146, in _create_pardo_operation
>>>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>> line 265, in loads
>>>>>>     return dill.loads(s)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>> line 317, in loads
>>>>>>     return load(file, ignore)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>> line 305, in load
>>>>>>     obj = pik.load()
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>> line 474, in find_class
>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>>>>
>>>>>>

Re: Pipeline AttributeError on Python3

Posted by Thomas Weise <th...@apache.org>.
Valentyn, thanks a lot for following up on this.

If the change can be cherry picked in isolation, we should be able to
verify this soon (with 2.16).


On Thu, Nov 21, 2019 at 8:12 AM Valentyn Tymofieiev <va...@google.com>
wrote:

> To close the loop here: To my knowledge this issue affects all Python 3
> users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
> including users on Python 3.7.3 and newer versions.
>
> The issue is addressed on Beam master, and we have a cherry-pick out for
> Beam 2.17.0.
>
> Workaround options for users on 2.16.0 and earlier SDKs:
>
> - Patch the SDK you are using with
> https://github.com/apache/beam/pull/10167.
> - Temporarily switch to Python 2 until 2.17.0. We have not seen the issue
> on Python 2, so it may be rare or non-existent there.
> - Pass --experiments worker_threads=1. This option may work for some
> pipelines, but not all.
>
> See BEAM-8651 <https://issues.apache.org/jira/browse/BEAM-8651> for
> details on the issue.
>
> On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev <va...@google.com>
> wrote:
>
>> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track
>> this issue and any recommendation for the users that will come out of it.
>>
>> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <va...@google.com>
>> wrote:
>>
>>>  I think we have heard of this issue from the same source:
>>>
>>> This looks exactly like a race condition that we've encountered on
>>>> Python 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>>>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>>>> module before it has been fully imported. See
>>>> https://bugs.python.org/issue34572 for more information.
>>>>
>>>> The traceback shows a Python 3.6 venv so this could be a different
>>>> issue (the unpickle bug was introduced in version 3.7). If it's the same
>>>> bug then upgrading to Python 3.7.3 or higher should fix that issue. One
>>>> potential workaround is to ensure that all of the modules get imported
>>>> during the initialization of the sdk_worker, as this bug only affects
>>>> imports done by the unpickler.
>>>
>>>
>>> The symptoms do sound similar, so I would try to reproduce your issue on
>>> 3.7.3 and see if it is gone, or try to reproduce
>>> https://bugs.python.org/issue34572 in the version of interpreter you
>>> use. If this doesn't help, you can try to reproduce the race using your
>>> input.
>>>
>>> To get the output of serialized do fn, you could do the following:
>>> 1. Patch https://github.com/apache/beam/pull/10036.
>>> 2. Set logging level to DEBUG, see:
>>> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
>>> .
>>> 3. Check the log output for the payload of your transform; it may look like:
>>>
>>>     transforms {
>>>       key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>>       value {
>>>         spec {
>>>           urn: "beam:transform:pardo:v1"
>>>           payload: "\n\347\006\n\275\006\n
>>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>>> ....
>>>
>>> Then you can extract the output of pickled fn:
>>>
>>> from apache_beam.utils import proto_utils
>>> from apache_beam.portability.api import beam_runner_api_pb2
>>> from apache_beam.internal import pickler
>>>
>>> payload = b'\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>>> pardo_payload = proto_utils.parse_Bytes(
>>>     payload, beam_runner_api_pb2.ParDoPayload)
>>> pickled_fn = pardo_payload.do_fn.spec.payload
>>>
>>> # Presumably the race happens here when unpickling one of your
>>> # transforms (pricingrealtime.aggregation.aggregation_transform).
>>> pickler.loads(pickled_fn)
>>>
>>>
>>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <ra...@lyft.com>
>>> wrote:
>>>
>>>> Thanks Valentyn,
>>>>
>>>> aggregation_transform.py doesn't have any transform that extends
>>>> beam.DoFn. We are using plain Python functions that we pass to
>>>> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
>>>> please let me know the process?
>>>>
>>>> I also heard that some people ran into this issue on Python 3.7.1 but
>>>> the same issue is not present on Python 3.7.3. Can you confirm this?
>>>>
>>>>
>>>>
>>>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <
>>>> valentyn@google.com> wrote:
>>>>
>>>>> +user@, bcc: dev@
>>>>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing
>>>>> to this issue, although we saw instances of this bug in exactly opposite
>>>>> scenarios - when pipeline was defined *in one file*, but not in
>>>>> multiple files.
>>>>>
>>>>> Could you try replacing instances of super() in
>>>>> aggregation_transform.py  as done in
>>>>> https://github.com/apache/beam/pull/9513 and see if this issue is
>>>>> still reproducible?
>>>>>
>>>>> If that doesn't work, I would try to get the dump of serialized_fn,
>>>>> and try to reproduce the issue in isolated environment, such as:
>>>>>
>>>>> from apache_beam.internal import pickler
>>>>> serialized_fn = "..content.."
>>>>> pickler.loads(serialized_fn)
>>>>>
>>>>> then I would try to trim the doFn in the example to a
>>>>> minimally-reproducible example. It could be another issue with dill
>>>>> dependency.
>>>>>
>>>>>
>>>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> We have noticed a weird intermittent issue on Python 3 that we don't
>>>>>> run into on Python 2. Sometimes when we try to submit the pipeline,
>>>>>> we get an AttributeError (see the stack trace below). We have
>>>>>> double-checked that the attributes/methods are present in the right
>>>>>> module and in the right place, but the pipeline still complains
>>>>>> about them. In some cases, we refer to methods before their definition.
>>>>>> We tried reordering the method definitions, but that didn't help at all.
>>>>>>
>>>>>> We don't see the same issue when the entire pipeline is defined in
>>>>>> one file. Also, note that this doesn't happen every time we submit
>>>>>> the pipeline, so I feel it is some kind of race condition. When we enable
>>>>>> the worker recycle logic, it happens most of the time when the SDK worker
>>>>>> is recycled.
>>>>>>
>>>>>> Some more information about the environment:
>>>>>> Python version: 3
>>>>>> Beam version: 2.16
>>>>>> Flink version: 1.8
>>>>>>
>>>>>> *Stack trace: *
>>>>>>
>>>>>>    - :
>>>>>>
>>>>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>>>>> bundle}
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>>>> at
>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>>> at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>> at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>>>>>> at
>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>>>> at
>>>>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>>>> at
>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>>>> at
>>>>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>>>> at
>>>>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>>>> at
>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>>>> at
>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>>>> at
>>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>>>> ... 7 more
>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>>>>> 6: Traceback (most recent call last):
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>> line 307, in get
>>>>>>     processor =
>>>>>> self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>>>> IndexError: pop from empty list
>>>>>>
>>>>>> During handling of the above exception, another exception occurred:
>>>>>>
>>>>>> Traceback (most recent call last):
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>> line 261, in loads
>>>>>>     return dill.loads(s)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>> line 317, in loads
>>>>>>     return load(file, ignore)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>> line 305, in load
>>>>>>     obj = pik.load()
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>> line 474, in find_class
>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>>>>
>>>>>> During handling of the above exception, another exception occurred:
>>>>>>
>>>>>> Traceback (most recent call last):
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>> line 165, in _execute
>>>>>>     response = task()
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>> line 198, in <lambda>
>>>>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>> line 351, in do_instruction
>>>>>>     request.instruction_id)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>> line 371, in process_bundle
>>>>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>>> line 313, in get
>>>>>>     self.data_channel_factory)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 576, in __init__
>>>>>>     self.ops =
>>>>>> self.create_execution_tree(self.process_bundle_descriptor)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 620, in create_execution_tree
>>>>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 619, in <listcomp>
>>>>>>     for transform_id in sorted(
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 544, in wrapper
>>>>>>     result = cache[args] = func(*args)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 603, in get_operation
>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 602, in <dictcomp>
>>>>>>     for tag, pcoll_id
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 601, in <listcomp>
>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 544, in wrapper
>>>>>>     result = cache[args] = func(*args)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 603, in get_operation
>>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 602, in <dictcomp>
>>>>>>     for tag, pcoll_id
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 601, in <listcomp>
>>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 544, in wrapper
>>>>>>     result = cache[args] = func(*args)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 606, in get_operation
>>>>>>     transform_id, transform_consumers)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 865, in create_operation
>>>>>>     return creator(self, transform_id, transform_proto, payload,
>>>>>> consumers)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 1108, in create
>>>>>>     serialized_fn, parameter)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>>> line 1146, in _create_pardo_operation
>>>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>>> line 265, in loads
>>>>>>     return dill.loads(s)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>> line 317, in loads
>>>>>>     return load(file, ignore)
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>> line 305, in load
>>>>>>     obj = pik.load()
>>>>>>   File
>>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>>> line 474, in find_class
>>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>>>>
>>>>>>

Re: Pipeline AttributeError on Python3

Posted by Valentyn Tymofieiev <va...@google.com>.
To close the loop here: To my knowledge this issue affects all Python 3
users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
including users on Python 3.7.3 and newer versions.

The issue is addressed on Beam master, and we have a cherry-pick out for
Beam 2.17.0.

Workaround options for users on 2.16.0 and earlier SDKs:

- Patch the SDK you are using with https://github.com/apache/beam/pull/10167.

- Temporarily switch to Python 2 until 2.17.0. We have not seen the issue
on Python 2, so it may be rare or non-existent there.
- Pass --experiments worker_threads=1. This option may work for some
pipelines, but not all.
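
As a rough illustration of why limiting the worker to one thread can help: the race discussed in this thread involves several threads unpickling at the same time, so serializing the unpickle calls removes the overlap. The sketch below uses the standard-library pickle module as a stand-in for Beam's pickler, and the names locked_loads and _unpickle_lock are hypothetical. It is not a description of what the Beam patch does.

```python
import pickle
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical module-level lock shared by every thread that unpickles.
_unpickle_lock = threading.RLock()


def locked_loads(data):
    """Unpickle data while holding the lock, so only one thread loads at a time."""
    with _unpickle_lock:
        return pickle.loads(data)


# Exercise the helper from several threads, as a multi-threaded worker would.
payload = pickle.dumps({"key": [1, 2, 3]})
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(locked_loads, [payload] * 32))
```

The lock trades some parallelism during deserialization for predictable behaviour. Whether that cost matters depends on how often the pipeline unpickles.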

See BEAM-8651 <https://issues.apache.org/jira/browse/BEAM-8651> for details
on the issue.
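
The quoted discussion below also mentions importing all modules up front, before any thread starts unpickling. A minimal sketch of that idea follows. The helper name preimport_modules is hypothetical, where to call it during worker start-up depends on your deployment and is not shown, and standard-library module names stand in for the pipeline's own modules (for the pipeline in this thread that would be pricingrealtime.aggregation.aggregation_transform).

```python
import importlib
import sys


def preimport_modules(module_names):
    """Import each named module eagerly and return the names that loaded."""
    loaded = []
    for name in module_names:
        # import_module fully initializes the module before returning,
        # so later lookups find a complete module in sys.modules.
        importlib.import_module(name)
        loaded.append(name)
    return loaded


# Stand-in module names so the sketch runs anywhere.
loaded = preimport_modules(["json", "collections"])
```

This only helps if the list covers every module the pickled functions refer to, so treat it as a mitigation to try rather than a guaranteed fix.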

On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev <va...@google.com>
wrote:

> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track
> this issue and any recommendation for the users that will come out of it.
>
> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <va...@google.com>
> wrote:
>
>>  I think we have heard of this issue from the same source:
>>
>> This looks exactly like a race condition that we've encountered on Python
>>> 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>>> module before it has been fully imported. See
>>> https://bugs.python.org/issue34572 for more information.
>>>
>>> The traceback shows a Python 3.6 venv so this could be a different issue
>>> (the unpickle bug was introduced in version 3.7). If it's the same bug then
>>> upgrading to Python 3.7.3 or higher should fix that issue. One potential
>>> workaround is to ensure that all of the modules get imported during the
>>> initialization of the sdk_worker, as this bug only affects imports done by
>>> the unpickler.
>>
>>
>> The symptoms do sound similar, so I would try to reproduce your issue on
>> 3.7.3 and see if it is gone, or try to reproduce
>> https://bugs.python.org/issue34572 in the version of interpreter you
>> use. If this doesn't help, you can try to reproduce the race using your
>> input.
>>
>> To get the output of serialized do fn, you could do the following:
>> 1. Patch https://github.com/apache/beam/pull/10036.
>> 2. Set logging level to DEBUG, see:
>> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
>> .
>> 3. Check the log output for the payload of your transform; it may look like:
>>
>>     transforms {
>>       key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>       value {
>>         spec {
>>           urn: "beam:transform:pardo:v1"
>>           payload: "\n\347\006\n\275\006\n
>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>> ....
>>
>> Then you can extract the output of pickled fn:
>>
>> from apache_beam.utils import proto_utils
>> from apache_beam.portability.api import beam_runner_api_pb2
>> from apache_beam.internal import pickler
>>
>> payload = b'\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>> pardo_payload = proto_utils.parse_Bytes(
>>     payload, beam_runner_api_pb2.ParDoPayload)
>> pickled_fn = pardo_payload.do_fn.spec.payload
>>
>> # Presumably the race happens here when unpickling one of your
>> # transforms (pricingrealtime.aggregation.aggregation_transform).
>> pickler.loads(pickled_fn)
>>
>>
>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <ra...@lyft.com>
>> wrote:
>>
>>> Thanks Valentyn,
>>>
>>> aggregation_transform.py doesn't have any transform that extends
>>> beam.DoFn. We are using plain Python functions that we pass to
>>> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
>>> please let me know the process?
>>>
>>> I also heard that some people ran into this issue on Python 3.7.1 but
>>> the same issue is not present on Python 3.7.3. Can you confirm this?
>>>
>>>
>>>
>>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <va...@google.com>
>>> wrote:
>>>
>>>> +user@, bcc: dev@
>>>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
>>>> this issue, although we saw instances of this bug in exactly opposite
>>>> scenarios - when pipeline was defined *in one file*, but not in
>>>> multiple files.
>>>>
>>>> Could you try replacing instances of super() in
>>>> aggregation_transform.py  as done in
>>>> https://github.com/apache/beam/pull/9513 and see if this issue is
>>>> still reproducible?
>>>>
>>>> If that doesn't work, I would try to get the dump of serialized_fn, and
>>>> try to reproduce the issue in isolated environment, such as:
>>>>
>>>> from apache_beam.internal import pickler
>>>> serialized_fn = "..content.."
>>>> pickler.loads(serialized_fn)
>>>>
>>>> then I would try to trim the doFn in the example to a
>>>> minimally-reproducible example. It could be another issue with dill
>>>> dependency.
>>>>
>>>>
>>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> We have noticed a weird intermittent issue on Python 3 that we don't run
>>>>> into on Python 2. Sometimes when we try to submit the pipeline, we get
>>>>> an AttributeError (see the stack trace below). We have double-checked
>>>>> that the attributes/methods are present in the right module and in the
>>>>> right place, but the pipeline still complains about them. In some cases,
>>>>> we refer to methods before their definition. We tried reordering the
>>>>> method definitions, but that didn't help at all.
>>>>>
>>>>> We don't see the same issue when the entire pipeline is defined in one
>>>>> file. Also, note that this doesn't happen every time we submit the
>>>>> pipeline, so I feel it is some kind of race condition. When we enable the
>>>>> worker recycle logic, it happens most of the time when the SDK worker is
>>>>> recycled.
>>>>>
>>>>> Some more information about the environment:
>>>>> Python version: 3
>>>>> Beam version: 2.16
>>>>> Flink version: 1.8
>>>>>
>>>>> *Stack trace: *
>>>>>
>>>>>    - :
>>>>>
>>>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>>>> bundle}
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>>> at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>> at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>> at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>>>>> at
>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>>> at
>>>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>>> at
>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>>> at
>>>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>>> at
>>>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>>> at
>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>>> at
>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>>> at
>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>>> ... 7 more
>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>>>> 6: Traceback (most recent call last):
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>> line 307, in get
>>>>>     processor =
>>>>> self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>>> IndexError: pop from empty list
>>>>>
>>>>> During handling of the above exception, another exception occurred:
>>>>>
>>>>> Traceback (most recent call last):
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>> line 261, in loads
>>>>>     return dill.loads(s)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>> line 317, in loads
>>>>>     return load(file, ignore)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>> line 305, in load
>>>>>     obj = pik.load()
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>> line 474, in find_class
>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>>>
>>>>> During handling of the above exception, another exception occurred:
>>>>>
>>>>> Traceback (most recent call last):
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>> line 165, in _execute
>>>>>     response = task()
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>> line 198, in <lambda>
>>>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>> line 351, in do_instruction
>>>>>     request.instruction_id)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>> line 371, in process_bundle
>>>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>> line 313, in get
>>>>>     self.data_channel_factory)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 576, in __init__
>>>>>     self.ops =
>>>>> self.create_execution_tree(self.process_bundle_descriptor)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 620, in create_execution_tree
>>>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 619, in <listcomp>
>>>>>     for transform_id in sorted(
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 544, in wrapper
>>>>>     result = cache[args] = func(*args)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 603, in get_operation
>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 602, in <dictcomp>
>>>>>     for tag, pcoll_id
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 601, in <listcomp>
>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 544, in wrapper
>>>>>     result = cache[args] = func(*args)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 603, in get_operation
>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 602, in <dictcomp>
>>>>>     for tag, pcoll_id
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 601, in <listcomp>
>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 544, in wrapper
>>>>>     result = cache[args] = func(*args)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 606, in get_operation
>>>>>     transform_id, transform_consumers)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 865, in create_operation
>>>>>     return creator(self, transform_id, transform_proto, payload,
>>>>> consumers)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 1108, in create
>>>>>     serialized_fn, parameter)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 1146, in _create_pardo_operation
>>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>> line 265, in loads
>>>>>     return dill.loads(s)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>> line 317, in loads
>>>>>     return load(file, ignore)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>> line 305, in load
>>>>>     obj = pik.load()
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>> line 474, in find_class
>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>>>
>>>>>

Re: Pipeline AttributeError on Python3

Posted by Valentyn Tymofieiev <va...@google.com>.
To close the loop here: To my knowledge this issue affects all Python 3
users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
including users on Python 3.7.3 and newer versions.

The issue is addressed on Beam master, and we have a cherry-pick out for
Beam 2.17.0.

Workaround options for users on 2.16.0 and earlier SDKs:

- Patch the SDK you are using with https://github.com/apache/beam/pull/10167.
- Temporarily switch to Python 2 until 2.17.0. We have not seen the issue
  on Python 2, so it may be rare or non-existent there.
- Pass --experiments worker_threads=1. This option may work for some, but
  not all, pipelines.

See BEAM-8651 <https://issues.apache.org/jira/browse/BEAM-8651> for details
on the issue.
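
For the last workaround, a minimal sketch of adding the flag to an existing
argument list before the pipeline is submitted. The helper name
with_single_worker_thread and the sample "--runner PortableRunner" arguments
are hypothetical and only illustrate the idea; the resulting list would then
be passed to whatever builds your pipeline options.

```python
def with_single_worker_thread(argv):
    """Return a copy of argv that requests the worker_threads=1 experiment."""
    args = list(argv)
    for i, arg in enumerate(args):
        # Form 1: "--experiments=worker_threads=N" as a single token.
        if arg.startswith("--experiments=worker_threads="):
            args[i] = "--experiments=worker_threads=1"
            return args
        # Form 2: "--experiments" followed by "worker_threads=N".
        if (arg == "--experiments" and i + 1 < len(args)
                and args[i + 1].startswith("worker_threads=")):
            args[i + 1] = "worker_threads=1"
            return args
    # No existing setting found: append the flag as written in the list above.
    return args + ["--experiments", "worker_threads=1"]


# Hypothetical submission arguments, used only for illustration.
flags = with_single_worker_thread(["--runner", "PortableRunner"])
print(flags)
```

The helper overwrites an existing worker_threads value rather than appending
a second one, so the flag is never passed twice with conflicting values.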

On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev <va...@google.com>
wrote:

> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track
> this issue and any recommendation for the users that will come out of it.
>
> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <va...@google.com>
> wrote:
>
>>  I think we have heard of this issue from the same source:
>>
>>> This looks exactly like a race condition that we've encountered on Python
>>> 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>>> module before it has been fully imported. See
>>> https://bugs.python.org/issue34572 for more information.
>>>
>>> The traceback shows a Python 3.6 venv so this could be a different issue
>>> (the unpickle bug was introduced in version 3.7). If it's the same bug then
>>> upgrading to Python 3.7.3 or higher should fix that issue. One potential
>>> workaround is to ensure that all of the modules get imported during the
>>> initialization of the sdk_worker, as this bug only affects imports done by
>>> the unpickler.
>>
>>
>> The symptoms do sound similar, so I would try to reproduce your issue on
>> 3.7.3 and see if it is gone, or try to reproduce
>> https://bugs.python.org/issue34572 in the version of interpreter you
>> use. If this doesn't help, you can try to reproduce the race using your
>> input.
>>
>> To get the output of the serialized DoFn, you could do the following:
>> 1. Patch in https://github.com/apache/beam/pull/10036.
>> 2. Set the logging level to DEBUG; see
>> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
>> 3. Check the log output for the payload of your transform. It may look like:
>>
>>     transforms {
>>       key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>       value {
>>         spec {
>>           urn: "beam:transform:pardo:v1"
>>           payload: "\n\347\006\n\275\006\n
>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>> ....
>>
>> Then you can extract the output of pickled fn:
>>
>> from apache_beam.utils import proto_utils
>> from apache_beam.portability.api import beam_runner_api_pb2
>> from apache_beam.internal import pickler
>>
>> payload = b'\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>> pardo_payload = proto_utils.parse_Bytes(
>>     payload, beam_runner_api_pb2.ParDoPayload)
>> pickled_fn = pardo_payload.do_fn.spec.payload
>>
>> # Presumably the race happens here when unpickling one of your transforms
>> # (pricingrealtime.aggregation.aggregation_transform).
>> pickler.loads(pickled_fn)
>>
>>
>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <ra...@lyft.com>
>> wrote:
>>
>>> Thanks Valentyn,
>>>
>>> aggregation_transform.py doesn't have any transform that extends
>>> beam.DoFn. We are using plain Python functions, which we pass to
>>> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
>>> please let me know the process?
>>>
>>> I also heard that some people ran into this issue on Python 3.7.1 but
>>> the same issue is not present on Python 3.7.3. Can you confirm this?
>>>
>>>
>>>
>>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <va...@google.com>
>>> wrote:
>>>
>>>> +user@, bcc: dev@
>>>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
>>>> this issue, although we saw instances of this bug in exactly opposite
>>>> scenarios - when pipeline was defined *in one file*, but not in
>>>> multiple files.
>>>>
>>>> Could you try replacing instances of super() in
>>>> aggregation_transform.py  as done in
>>>> https://github.com/apache/beam/pull/9513 and see if this issue is
>>>> still reproducible?
>>>>
>>>> If that doesn't work, I would try to get the dump of serialized_fn, and
>>>> try to reproduce the issue in isolated environment, such as:
>>>>
>>>> from apache_beam.internal import pickler
>>>> serialized_fn = "..content.."
>>>> pickler.loads(serialized_fn)
>>>>
>>>> then I would try to trim the doFn in the example to a
>>>> minimally-reproducible example. It could be another issue with dill
>>>> dependency.
>>>>
>>>>
>>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> We have noticed a weird intermittent issue on Python 3 that we don't run
>>>>> into on Python 2. Sometimes when we try to submit the pipeline, we get an
>>>>> AttributeError (see the stack trace below). We have double-checked that
>>>>> the attributes/methods are present in the right module and in the right
>>>>> place, but the pipeline still complains about them. In some cases, we
>>>>> refer to methods before their definition. We tried reordering the method
>>>>> definitions, but that didn't help at all.
>>>>>
>>>>> We don't see the same issue when the entire pipeline is defined in one
>>>>> file. Also, note that this doesn't happen all the time when we submit the
>>>>> pipeline, so I feel it is some kind of race condition. When we enable the
>>>>> worker recycle logic it happens most of the time when sdk worker is
>>>>> recycled.
>>>>>
>>>>> Some more information about the environment:
>>>>> Python version: 3
>>>>> Beam version: 2.16
>>>>> Flink version: 1.8
>>>>>
>>>>> *Stack trace: *
>>>>>
>>>>>    - :
>>>>>
>>>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>>>> bundle}
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>>> at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>> at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>> at
>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>>>>> at
>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>>> at
>>>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>>> at
>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>>> at
>>>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>>> at
>>>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>>> at
>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>>> at
>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>>> at
>>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>>> ... 7 more
>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>>>> 6: Traceback (most recent call last):
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>> line 307, in get
>>>>>     processor =
>>>>> self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>>> IndexError: pop from empty list
>>>>>
>>>>> During handling of the above exception, another exception occurred:
>>>>>
>>>>> Traceback (most recent call last):
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>> line 261, in loads
>>>>>     return dill.loads(s)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>> line 317, in loads
>>>>>     return load(file, ignore)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>> line 305, in load
>>>>>     obj = pik.load()
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>> line 474, in find_class
>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>>>
>>>>> During handling of the above exception, another exception occurred:
>>>>>
>>>>> Traceback (most recent call last):
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>> line 165, in _execute
>>>>>     response = task()
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>> line 198, in <lambda>
>>>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>> line 351, in do_instruction
>>>>>     request.instruction_id)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>> line 371, in process_bundle
>>>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>>> line 313, in get
>>>>>     self.data_channel_factory)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 576, in __init__
>>>>>     self.ops =
>>>>> self.create_execution_tree(self.process_bundle_descriptor)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 620, in create_execution_tree
>>>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 619, in <listcomp>
>>>>>     for transform_id in sorted(
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 544, in wrapper
>>>>>     result = cache[args] = func(*args)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 603, in get_operation
>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 602, in <dictcomp>
>>>>>     for tag, pcoll_id
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 601, in <listcomp>
>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 544, in wrapper
>>>>>     result = cache[args] = func(*args)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 603, in get_operation
>>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 602, in <dictcomp>
>>>>>     for tag, pcoll_id
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 601, in <listcomp>
>>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 544, in wrapper
>>>>>     result = cache[args] = func(*args)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 606, in get_operation
>>>>>     transform_id, transform_consumers)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 865, in create_operation
>>>>>     return creator(self, transform_id, transform_proto, payload,
>>>>> consumers)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 1108, in create
>>>>>     serialized_fn, parameter)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>>> line 1146, in _create_pardo_operation
>>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>>> line 265, in loads
>>>>>     return dill.loads(s)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>> line 317, in loads
>>>>>     return load(file, ignore)
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>> line 305, in load
>>>>>     obj = pik.load()
>>>>>   File
>>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>>> line 474, in find_class
>>>>>     return StockUnpickler.find_class(self, module, name)
>>>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>>>
>>>>>

Re: Pipeline AttributeError on Python3

Posted by Valentyn Tymofieiev <va...@google.com>.
I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track this
issue and any recommendation for the users that will come out of it.
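
One recommendation already raised in this thread is to make sure modules are
imported before any worker thread starts unpickling. A minimal, stdlib-only
sketch of that idea follows. The helper names preimport and loads_serialized,
and the extra lock around unpickling, are assumptions of this sketch and are
not taken from the Beam SDK; plain pickle stands in for Beam's pickler.

```python
import importlib
import pickle
import threading

# Process-wide lock (an assumption of this sketch) so that only one thread
# unpickles, and therefore triggers imports, at a time.
_UNPICKLE_LOCK = threading.Lock()


def preimport(module_names):
    """Import each module eagerly; return the names that failed to import."""
    failed = []
    for name in module_names:
        try:
            importlib.import_module(name)
        except ImportError:
            failed.append(name)
    return failed


def loads_serialized(data):
    """Unpickle data while holding the process-wide lock."""
    with _UNPICKLE_LOCK:
        return pickle.loads(data)


# Import the modules the pickled functions live in before starting threads.
# "json" is only a stand-in for your own pipeline modules.
missing = preimport(["json"])
print(missing)
```

Calling preimport during worker start-up means the unpickler finds the
modules already fully imported, and the lock covers any module that was
missed.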

On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <va...@google.com>
wrote:

>  I think we have heard of this issue from the same source:
>
>> This looks exactly like a race condition that we've encountered on Python
>> 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>> module before it has been fully imported. See
>> https://bugs.python.org/issue34572 for more information.
>>
>> The traceback shows a Python 3.6 venv so this could be a different issue
>> (the unpickle bug was introduced in version 3.7). If it's the same bug then
>> upgrading to Python 3.7.3 or higher should fix that issue. One potential
>> workaround is to ensure that all of the modules get imported during the
>> initialization of the sdk_worker, as this bug only affects imports done by
>> the unpickler.
>
>
> The symptoms do sound similar, so I would try to reproduce your issue on
> 3.7.3 and see if it is gone, or try to reproduce
> https://bugs.python.org/issue34572 in the version of interpreter you use.
> If this doesn't help, you can try to reproduce the race using your input.
>
> To get the output of the serialized DoFn, you could do the following:
> 1. Patch in https://github.com/apache/beam/pull/10036.
> 2. Set the logging level to DEBUG; see
> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
> 3. Check the log output for the payload of your transform. It may look like:
>
>     transforms {
>       key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>       value {
>         spec {
>           urn: "beam:transform:pardo:v1"
>           payload: "\n\347\006\n\275\006\n
> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
> ....
>
> Then you can extract the output of pickled fn:
>
> from apache_beam.utils import proto_utils
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.internal import pickler
>
> payload = b'\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
> pardo_payload = proto_utils.parse_Bytes(
>     payload, beam_runner_api_pb2.ParDoPayload)
> pickled_fn = pardo_payload.do_fn.spec.payload
>
> # Presumably the race happens here when unpickling one of your transforms
> # (pricingrealtime.aggregation.aggregation_transform).
> pickler.loads(pickled_fn)
>
>
> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <ra...@lyft.com> wrote:
>
>> Thanks Valentyn,
>>
>> aggregation_transform.py doesn't have any transform that extends
>> beam.DoFn. We are using plain Python functions, which we pass to
>> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
>> please let me know the process?
>>
>> I also heard that some people ran into this issue on Python 3.7.1 but the
>> same issue is not present on Python 3.7.3. Can you confirm this?
>>
>>
>>
>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <va...@google.com>
>> wrote:
>>
>>> +user@, bcc: dev@
>>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
>>> this issue, although we saw instances of this bug in exactly opposite
>>> scenarios - when pipeline was defined *in one file*, but not in
>>> multiple files.
>>>
>>> Could you try replacing instances of super() in
>>> aggregation_transform.py  as done in
>>> https://github.com/apache/beam/pull/9513 and see if this issue is still
>>> reproducible?
>>>
>>> If that doesn't work, I would try to get the dump of serialized_fn, and
>>> try to reproduce the issue in isolated environment, such as:
>>>
>>> from apache_beam.internal import pickler
>>> serialized_fn = "..content.."
>>> pickler.loads(serialized_fn)
>>>
>>> then I would try to trim the doFn in the example to a
>>> minimally-reproducible example. It could be another issue with dill
>>> dependency.
>>>
>>>
>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> We have noticed a weird intermittent issue on Python 3 that we don't run
>>>> into on Python 2. Sometimes when we try to submit the pipeline, we get an
>>>> AttributeError (see the stack trace below). We have double-checked that
>>>> the attributes/methods are present in the right module and in the right
>>>> place, but the pipeline still complains about them. In some cases, we
>>>> refer to methods before their definition. We tried reordering the method
>>>> definitions, but that didn't help at all.
>>>>
>>>> We don't see the same issue when the entire pipeline is defined in one
>>>> file. Also, note that this doesn't happen all the time when we submit the
>>>> pipeline, so I feel it is some kind of race condition. When we enable the
>>>> worker recycle logic it happens most of the time when sdk worker is
>>>> recycled.
>>>>
>>>> Some more information about the environment:
>>>> Python version: 3
>>>> Beam version: 2.16
>>>> Flink version: 1.8
>>>>
>>>> *Stack trace: *
>>>>
>>>>    - :
>>>>
>>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>>> bundle}
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>>> at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>> at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>> at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>>>> at
>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>>> at
>>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>>> at
>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>>> at
>>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>>> at
>>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>>> at
>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>>> at
>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>>> at
>>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>>> ... 7 more
>>>> Caused by: java.util.concurrent.ExecutionException:
>>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>>> 6: Traceback (most recent call last):
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>> line 307, in get
>>>>     processor =
>>>> self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>> IndexError: pop from empty list
>>>>
>>>> During handling of the above exception, another exception occurred:
>>>>
>>>> Traceback (most recent call last):
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>> line 261, in loads
>>>>     return dill.loads(s)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>> line 317, in loads
>>>>     return load(file, ignore)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>> line 305, in load
>>>>     obj = pik.load()
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>> line 474, in find_class
>>>>     return StockUnpickler.find_class(self, module, name)
>>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>>
>>>> During handling of the above exception, another exception occurred:
>>>>
>>>> Traceback (most recent call last):
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>> line 165, in _execute
>>>>     response = task()
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>> line 198, in <lambda>
>>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>> line 351, in do_instruction
>>>>     request.instruction_id)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>> line 371, in process_bundle
>>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>>> line 313, in get
>>>>     self.data_channel_factory)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 576, in __init__
>>>>     self.ops =
>>>> self.create_execution_tree(self.process_bundle_descriptor)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 620, in create_execution_tree
>>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 619, in <listcomp>
>>>>     for transform_id in sorted(
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 544, in wrapper
>>>>     result = cache[args] = func(*args)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 603, in get_operation
>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 602, in <dictcomp>
>>>>     for tag, pcoll_id
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 601, in <listcomp>
>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 544, in wrapper
>>>>     result = cache[args] = func(*args)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 603, in get_operation
>>>>     in descriptor.transforms[transform_id].outputs.items()
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 602, in <dictcomp>
>>>>     for tag, pcoll_id
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 601, in <listcomp>
>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 544, in wrapper
>>>>     result = cache[args] = func(*args)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 606, in get_operation
>>>>     transform_id, transform_consumers)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 865, in create_operation
>>>>     return creator(self, transform_id, transform_proto, payload,
>>>> consumers)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 1108, in create
>>>>     serialized_fn, parameter)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>>> line 1146, in _create_pardo_operation
>>>>     dofn_data = pickler.loads(serialized_fn)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>>> line 265, in loads
>>>>     return dill.loads(s)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>> line 317, in loads
>>>>     return load(file, ignore)
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>> line 305, in load
>>>>     obj = pik.load()
>>>>   File
>>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>>> line 474, in find_class
>>>>     return StockUnpickler.find_class(self, module, name)
>>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>>
>>>>

Re: Pipeline AttributeError on Python3

Posted by Valentyn Tymofieiev <va...@google.com>.
I think we have heard of this issue from the same source:

> This looks exactly like a race condition that we've encountered on Python
> 3.7.1: There's a bug in some older 3.7.x releases that breaks the
> thread-safety of the unpickler, as concurrent unpickle threads can access a
> module before it has been fully imported. See
> https://bugs.python.org/issue34572 for more information.
>
> The traceback shows a Python 3.6 venv so this could be a different issue
> (the unpickle bug was introduced in version 3.7). If it's the same bug then
> upgrading to Python 3.7.3 or higher should fix that issue. One potential
> workaround is to ensure that all of the modules get imported during the
> initialization of the sdk_worker, as this bug only affects imports done by
> the unpickler.


The symptoms do sound similar, so I would try to reproduce your issue on
3.7.3 and see if it is gone, or try to reproduce
https://bugs.python.org/issue34572 in the version of the interpreter you use.
If this doesn't help, you can try to reproduce the race using your input.
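
The workaround mentioned in the quoted note above (importing all modules up
front, so that the unpickler never triggers an import itself) could be
sketched roughly as follows. This is only an illustration: the helper name
preload_modules and the module list are hypothetical, and how to hook it into
sdk_worker startup depends on your deployment and is not shown here.

```python
import importlib

# Hypothetical list: replace with the modules that define the functions your
# pipeline pickles (e.g. the module named in the AttributeError).
MODULES_TO_PRELOAD = ["json", "collections"]


def preload_modules(names):
    """Import each module by name; return the names that failed to import."""
    failed = []
    for name in names:
        try:
            importlib.import_module(name)
        except ImportError:
            failed.append(name)
    return failed


# Run this once, before any unpickling starts.
failed = preload_modules(MODULES_TO_PRELOAD)
if failed:
    print("Could not preload:", failed)
```

Returning the failed names rather than raising keeps startup going and lets
you log which modules were missing.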

To get the serialized DoFn, you could do the following:
1. Patch in https://github.com/apache/beam/pull/10036.
2. Set the logging level to DEBUG, see:
https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
3. Check the log output for the payload of your transform; it may look like:

    transforms {
      key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
      value {
        spec {
          urn: "beam:transform:pardo:v1"
          payload: "\n\347\006\n\275\006\n
beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
....

Then you can extract the pickled fn from the payload:

from apache_beam.utils import proto_utils
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.internal import pickler

# Paste the payload string from the log output as a bytes literal.
payload = b'\n\347\006\n\275\006\n beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
pardo_payload = proto_utils.parse_Bytes(payload, beam_runner_api_pb2.ParDoPayload)
pickled_fn = pardo_payload.do_fn.spec.payload

# Presumably the race happens here when unpickling one of your transforms
# (pricingrealtime.aggregation.aggregation_transform).
pickler.loads(pickled_fn)
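
If a single unpickling call does not fail, one way to probe for a race is to
unpickle the same payload from several threads at once. The sketch below is a
generic harness using only the standard library. The name stress_unpickle and
its parameters are made up for illustration, and the demo uses pickle.loads
with a dummy payload as a stand-in. In your case you would pass pickler.loads
and the extracted pickled fn instead.

```python
import pickle
import threading


def stress_unpickle(loads, payload, num_threads=8, iterations=100):
    """Call loads(payload) concurrently; return the exceptions that were raised."""
    errors = []
    lock = threading.Lock()
    start = threading.Barrier(num_threads)

    def worker():
        start.wait()  # release all threads together to maximize contention
        for _ in range(iterations):
            try:
                loads(payload)
            except Exception as exc:  # collect every failure, e.g. AttributeError
                with lock:
                    errors.append(exc)

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return errors


# Stand-in demo; replace pickle.loads and the dummy payload with your own.
errors = stress_unpickle(pickle.loads, pickle.dumps({"key": [1, 2, 3]}))
print("failures:", len(errors))
```

Since the suspected bug concerns imports done by the unpickler, this is only
meaningful in a fresh interpreter where the module under suspicion has not
been imported yet. A clean run does not prove the race is absent.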


On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar <ra...@lyft.com> wrote:

> Thanks Valentyn,
>
> aggregation_transform.py doesn't have any transform that extends
> beam.DoFn. We are using plain Python functions, which we pass to
> beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
> please let me know the process?
>
> I also heard that some people ran into this issue on Python 3.7.1 but the
> same issue is not present on Python 3.7.3. Can you confirm this?
>
>
>
> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <va...@google.com>
> wrote:
>
>> +user@, bcc: dev@
>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
>> this issue, although we saw instances of this bug in exactly opposite
>> scenarios - when pipeline was defined *in one file*, but not in multiple
>> files.
>>
>> Could you try replacing instances of super() in aggregation_transform.py
>> as done in https://github.com/apache/beam/pull/9513 and see if this
>> issue is still reproducible?
>>
>> If that doesn't work, I would try to get the dump of serialized_fn, and
>> try to reproduce the issue in isolated environment, such as:
>>
>> from apache_beam.internal import pickler
>> serialized_fn = "..content.."
>> pickler.loads(serialized_fn)
>>
>> then I would try to trim the doFn in the example to a
>> minimally-reproducible example. It could be another issue with dill
>> dependency.
>>
>>
>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> We have noticed a weird intermittent issue on Python3 but we don't run
>>> into this issue on python2. Sometimes when we are trying to submit the
>>> pipeline, we get AttributeError (Check the stack trace below).  we have
>>> double-checked and we do find the attribute/methods are present in the
>>> right module and in right place but somehow the pipeline still complains
>>> about it. In some cases, we refer methods before their definition. We tried
>>> to reorder the method definition but that didn't help at all.
>>>
>>> We don't see the same issue when the entire pipeline is defined in one
>>> file. Also, note that this doesn't happen all the time when we submit the
>>> pipeline, so I feel it is some kind of race condition. When we enable the
>>> worker recycle logic it happens most of the time when sdk worker is
>>> recycled.
>>>
>>> Some more information about the environment:
>>> Python version: 3
>>> Beam version: 2.16
>>> Flink version: 1.8
>>>
>>> *Stack trace: *
>>>
>>>    - :
>>>
>>> TimerException{java.lang.RuntimeException: Failed to finish remote
>>> bundle}
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>>> at
>>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>>> at
>>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>>> at
>>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>>> at
>>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>>> ... 7 more
>>> Caused by: java.util.concurrent.ExecutionException:
>>> java.lang.RuntimeException: Error received from SDK harness for instruction
>>> 6: Traceback (most recent call last):
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 307, in get
>>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>>> IndexError: pop from empty list
>>>
>>> During handling of the above exception, another exception occurred:
>>>
>>> Traceback (most recent call last):
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>> line 261, in loads
>>>     return dill.loads(s)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 317, in loads
>>>     return load(file, ignore)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 305, in load
>>>     obj = pik.load()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 474, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on
>>> <module 'pricingrealtime.aggregation.aggregation_transform' from
>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>>
>>> During handling of the above exception, another exception occurred:
>>>
>>> Traceback (most recent call last):
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 165, in _execute
>>>     response = task()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 198, in <lambda>
>>>     self._execute(lambda: worker.do_instruction(work), work)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 351, in do_instruction
>>>     request.instruction_id)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 371, in process_bundle
>>>     instruction_id, request.process_bundle_descriptor_reference)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 313, in get
>>>     self.data_channel_factory)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 576, in __init__
>>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 620, in create_execution_tree
>>>     descriptor.transforms, key=topological_height, reverse=True)])
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 619, in <listcomp>
>>>     for transform_id in sorted(
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 544, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 603, in get_operation
>>>     in descriptor.transforms[transform_id].outputs.items()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 602, in <dictcomp>
>>>     for tag, pcoll_id
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 601, in <listcomp>
>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 544, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 603, in get_operation
>>>     in descriptor.transforms[transform_id].outputs.items()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 602, in <dictcomp>
>>>     for tag, pcoll_id
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 601, in <listcomp>
>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 544, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 606, in get_operation
>>>     transform_id, transform_consumers)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 865, in create_operation
>>>     return creator(self, transform_id, transform_proto, payload,
>>> consumers)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 1108, in create
>>>     serialized_fn, parameter)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 1146, in _create_pardo_operation
>>>     dofn_data = pickler.loads(serialized_fn)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>>> line 265, in loads
>>>     return dill.loads(s)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 317, in loads
>>>     return load(file, ignore)
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 305, in load
>>>     obj = pik.load()
>>>   File
>>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>>> line 474, in find_class
>>>     return StockUnpickler.find_class(self, module, name)
>>> AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
>>> 'pricingrealtime.aggregation.aggregation_transform' from
>>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>>
>>>

Re: Pipeline AttributeError on Python3

Posted by Rakesh Kumar <ra...@lyft.com>.
Thanks Valentyn,

aggregation_transform.py doesn't have any transform that extends
beam.DoFn. We are using plain Python functions, which we pass to
beam.Map(). I am not sure how to get the dump of serialized_fn. Can you
please let me know the process?

I also heard that some people ran into this issue on Python 3.7.1 but the
same issue is not present on Python 3.7.3. Can you confirm this?



On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev <va...@google.com>
wrote:

> +user@, bcc: dev@
> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
> this issue, although we saw instances of this bug in exactly opposite
> scenarios - when pipeline was defined *in one file*, but not in multiple
> files.
>
> Could you try replacing instances of super() in aggregation_transform.py
> as done in https://github.com/apache/beam/pull/9513 and see if this issue
> is still reproducible?
>
> If that doesn't work, I would try to get the dump of serialized_fn, and
> try to reproduce the issue in isolated environment, such as:
>
> from apache_beam.internal import pickler
> serialized_fn = "..content.."
> pickler.loads(serialized_fn)
>
> then I would try to trim the doFn in the example to a
> minimally-reproducible example. It could be another issue with dill
> dependency.
>
>
> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar <ra...@lyft.com> wrote:
>
>> Hi All,
>>
>> We have noticed a weird intermittent issue on Python3 but we don't run
>> into this issue on python2. Sometimes when we are trying to submit the
>> pipeline, we get AttributeError (Check the stack trace below).  we have
>> double-checked and we do find the attribute/methods are present in the
>> right module and in right place but somehow the pipeline still complains
>> about it. In some cases, we refer methods before their definition. We tried
>> to reorder the method definition but that didn't help at all.
>>
>> We don't see the same issue when the entire pipeline is defined in one
>> file. Also, note that this doesn't happen all the time when we submit the
>> pipeline, so I feel it is some kind of race condition. When we enable the
>> worker recycle logic it happens most of the time when sdk worker is
>> recycled.
>>
>> Some more information about the environment:
>> Python version: 3
>> Beam version: 2.16
>> Flink version: 1.8
>>
>> *Stack trace: *
>>
>>    - :
>>
>> TimerException{java.lang.RuntimeException: Failed to finish remote bundle}
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:335)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.RuntimeException: Failed to finish remote bundle
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.finishBundle(ExecutableStageDoFnOperator.java:667)
>> at
>> org.apache.beam.runners.core.StatefulDoFnRunner.finishBundle(StatefulDoFnRunner.java:144)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$2.finishBundle(ExecutableStageDoFnOperator.java:754)
>> at
>> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.finishBundle(DoFnRunnerWithMetricsUpdate.java:86)
>> at
>> org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.finishBundle(SimplePushbackSideInputDoFnRunner.java:118)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.invokeFinishBundle(DoFnOperator.java:750)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.checkInvokeFinishBundleByTime(DoFnOperator.java:744)
>> at
>> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.lambda$open$1(DoFnOperator.java:460)
>> at
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
>> ... 7 more
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.RuntimeException: Error received from SDK harness for instruction
>> 6: Traceback (most recent call last):
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 307, in get
>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>> IndexError: pop from empty list
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>> line 261, in loads
>>     return dill.loads(s)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 317, in loads
>>     return load(file, ignore)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 305, in load
>>     obj = pik.load()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 474, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> *AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
>> 'pricingrealtime.aggregation.aggregation_transform' from
>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>*
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 165, in _execute
>>     response = task()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 198, in <lambda>
>>     self._execute(lambda: worker.do_instruction(work), work)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 351, in do_instruction
>>     request.instruction_id)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 371, in process_bundle
>>     instruction_id, request.process_bundle_descriptor_reference)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 313, in get
>>     self.data_channel_factory)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 576, in __init__
>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 620, in create_execution_tree
>>     descriptor.transforms, key=topological_height, reverse=True)])
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 619, in <listcomp>
>>     for transform_id in sorted(
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 603, in get_operation
>>     in descriptor.transforms[transform_id].outputs.items()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 602, in <dictcomp>
>>     for tag, pcoll_id
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 601, in <listcomp>
>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 603, in get_operation
>>     in descriptor.transforms[transform_id].outputs.items()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 602, in <dictcomp>
>>     for tag, pcoll_id
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 601, in <listcomp>
>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 544, in wrapper
>>     result = cache[args] = func(*args)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 606, in get_operation
>>     transform_id, transform_consumers)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 865, in create_operation
>>     return creator(self, transform_id, transform_proto, payload, consumers)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 1108, in create
>>     serialized_fn, parameter)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 1146, in _create_pardo_operation
>>     dofn_data = pickler.loads(serialized_fn)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/apache_beam/internal/pickler.py",
>> line 265, in loads
>>     return dill.loads(s)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 317, in loads
>>     return load(file, ignore)
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 305, in load
>>     obj = pik.load()
>>   File
>> "/srv/venvs/service/trusty/service_venv_python3.6/lib/python3.6/site-packages/dill/_dill.py",
>> line 474, in find_class
>>     return StockUnpickler.find_class(self, module, name)
>> AttributeError: Can't get attribute '_timestamp_keyed_result' on <module
>> 'pricingrealtime.aggregation.aggregation_transform' from
>> '/srv/testpricingrt/7e5425fc4400bf14e80a98fb13461530c7867891/pricingrealtime/aggregation/aggregation_transform.py'>
>>
>>
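For what it's worth, the last frames of the trace above (dill's find_class delegating to the stock unpickler) are where pickle resolves a function by reference, that is, by module name plus attribute name. The sketch below only illustrates that failure mode with the standard library's pickle; it is not a reproduction of the actual pipeline. The module name "demo_transform" is made up for the example, and deleting the attribute by hand is an assumption standing in for whatever leaves the real module without the name at load time.

```python
import pickle
import sys
import types

# Hypothetical stand-in module; "demo_transform" is an illustrative name only.
mod = types.ModuleType("demo_transform")
exec("def _timestamp_keyed_result(x):\n    return x", mod.__dict__)
sys.modules["demo_transform"] = mod

# Plain pickle stores the function by reference: module name + attribute name.
payload = pickle.dumps(mod._timestamp_keyed_result)

# Assumption for the demo: the attribute is absent from the module at load time.
del mod._timestamp_keyed_result

error = None
try:
    pickle.loads(payload)
except AttributeError as exc:
    error = exc

print(type(error).__name__, error)
```

If the name is present on the module again when loads() runs, the same payload unpickles fine, which would be consistent with the failure being intermittent rather than a problem with the payload itself.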