Posted to github@beam.apache.org by "tvalentyn (via GitHub)" <gi...@apache.org> on 2023/04/10 21:30:30 UTC
[GitHub] [beam] tvalentyn opened a new issue, #26209: [Bug]: cloudpickle appears to incorrectly unpickle cloned combiners
tvalentyn opened a new issue, #26209:
URL: https://github.com/apache/beam/issues/26209
### What happened?
Combiner lifting and the combiner `with_fanout` utility copy portions of Beam's subgraph related to combiners. It appears that unpickling the cloudpickle-pickled bytes encoding those subgraphs results in multiple CombineFns sharing the same state, which causes side effects during combiner setup and teardown.
TODO: add details, failure examples and code pointers.
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Re: [I] [Bug]: cloudpickle appears to incorrectly unpickle cloned combiners [beam]
Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on issue #26209:
URL: https://github.com/apache/beam/issues/26209#issuecomment-1881661699
```
INFO:root:<CombineOperation Do/CombinePerKey/CombinePerKey(PreCombineFn)/ExtractOutputs phase=extract>
INFO:root:<CombineOperation Do/CombinePerKey/CombinePerKey(PreCombineFn)/Merge phase=merge>
```
The setup calls for these two operations in [bundle_processor](https://github.com/apache/beam/blob/75cfbee1591b99ff02d0a6a19631199e719b44fa/sdks/python/apache_beam/runners/worker/bundle_processor.py#L974) receive the same `CombineFn` object, so the two operations end up sharing state.
I will look into [translations](https://github.com/apache/beam/blob/75cfbee1591b99ff02d0a6a19631199e719b44fa/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L1378) and see how these are getting pickled.
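When debugging this kind of sharing, one option is to group operations by the identity of the CombineFn they hold. The sketch below is hypothetical: `FakeCombineOperation` and its `combine_fn` attribute stand in for whatever objects bundle_processor actually builds and are not Beam APIs.

```python
from collections import defaultdict


class FakeCombineOperation:
    """Hypothetical stand-in for a runner operation that holds a CombineFn."""

    def __init__(self, name, combine_fn):
        self.name = name
        self.combine_fn = combine_fn


def find_shared_combine_fns(operations):
    """Return {id(fn): [op names]} for every CombineFn used by more than one operation."""
    by_identity = defaultdict(list)
    for op in operations:
        # id() distinguishes objects, not values: two equal but separate
        # CombineFn instances land in different buckets.
        by_identity[id(op.combine_fn)].append(op.name)
    return {key: names for key, names in by_identity.items() if len(names) > 1}


class DummyCombineFn:
    pass


shared = DummyCombineFn()
ops = [
    FakeCombineOperation("Merge", shared),
    FakeCombineOperation("ExtractOutputs", shared),
    FakeCombineOperation("Other", DummyCombineFn()),
]
print(list(find_shared_combine_fns(ops).values()))  # [['Merge', 'ExtractOutputs']]
```

An empty result would mean every operation owns its own instance, which is what the lifecycle test expects.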
Re: [I] [Bug]: cloudpickle appears to incorrectly unpickle cloned combiners [beam]
Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on issue #26209:
URL: https://github.com/apache/beam/issues/26209#issuecomment-1882208113
From my experiments, I found that if you unpickle the same payload into two different objects:
- With dill, the two objects have different memory addresses.
- With cloudpickle, they have the same memory address.
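The experiment boils down to deserializing one payload twice and comparing object identity. This sketch uses the standard library `pickle` so it runs without extra dependencies, with a `SimpleNamespace` standing in for a pickled CombineFn. Repeating it with dill or cloudpickle would mean pairing each library's own `dumps` and `loads`; that substitution is an assumption about how the experiment was run, not a record of it.

```python
import pickle
from types import SimpleNamespace


def loads_twice_shares_identity(loads, payload):
    """Deserialize the same bytes twice; return True if both calls return one object."""
    first = loads(payload)
    second = loads(payload)
    return first is second


# Toy mutable state standing in for a pickled CombineFn.
payload = pickle.dumps(SimpleNamespace(setup_called=False))

# The standard pickle module builds a fresh object on every loads() call.
print(loads_twice_shares_identity(pickle.loads, payload))  # False

a = pickle.loads(payload)
b = pickle.loads(payload)
a.setup_called = True
print(b.setup_called)  # False: mutating one copy leaves the other untouched
```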
With `CombineFn` `with_fanout`, `lift_combiners` runs during pipeline stage creation. While inspecting the pipeline proto, I found that the lifted Combine stages carry the same pickled `CombineFn` payload.
For example, these are the proto definitions for Merge and ExtractOutputs:
```
INFO:root:unique_name: "Do/CombinePerKey/CombinePerKey(PreCombineFn)/Merge"
spec {
urn: "beam:transform:combine_per_key_merge_accumulators:v1"
payload: "\n\260\013\n beam:combinefn:pickled_python:v1\032\213\013\n\210\013QlpoOTFBWSZTWSejpnAAAvl/9P//1blC8Zv3n3/f5r////Zj5x4QFAAAAQQAUAO+uzXW27ruM7WdYSSQp4JCeg1NMR4hP1AjAJtTIwBBgjCDATEwGiGmGplGI0TChkaaAANPUNGgAAAAAAAAAABKaJE01TZo1NCbUeoPUPUAaeoNAAAAAAAAAeoHlDgABoAGgDQAAGjEAA0AAAAADQASmqaAFGaap+inqeUyaB6gGmJ6Ro2iDTRo0PUZAPU9IGmQACJNfGbYf9uAQYQQGEMLHYYh+sgH7Hmi/00ptIf8Auac02UitRt1Y5CWzcaWjZfGQ2NWnED4py8y6HLCYKLNWgmENmTDOIZDSk40REEQiHcfLIy4xkZwy1GEzH819JmdYQR4vgGJYwxYj03YDcEHuERBK1SGGOpoBHaC6F5GXXOR08euPzsLNQZC5WgHFK98logwMMPuc4ffc6oJpEQ/D933+//szND08iaOCGGOOLfCNgTMj1z61A+DBTQ5oMzK5exvY7nDyiThEqZ7J21YCdY6B6dZPgiJBGYRJIuPV1p1ttE7PLXceZcrgfcMoqdKZFBTEhviKyhoG31vUcHOtE4FLZQiRjb7vkUBCfPP6rgqvNIw6xosiMYwVUQXLrdUIDJRYyOFO7PrdVsQ6ae+9NAqx15YOagHQjOErBQaeZaEwSwcYdWggYmSlgVzT5YglzAygT6tgIkCLuGzVcIyeW3YefuQ4ERQiZ7cIigzX4Te1/mw1dL2ksFzS1Z9Yxfsas01JNesykI08fMTJDeZIHFwD2qyD1stverBUM3NzqWf7kX8qOFk2k7gYRK05qnq6Uhoy7PB9tQPf8rTu+CIi/clKTu+ExFGPDiTl+3osE2fqTjJU7GPCg1JImEAC+DA2OuARIDjDhbE1pJwFJXo
JphrbAnYEQw6TpXEEVTEzDjBowlMhNE8SDHyQbKHMIII5HXZkFbgWcvRHqBwWBnElA6dFp5hwWUTa1WQViqkavqtkpmigH9UvzpJ41CC1ANMTC9VIPQD5ZgasomaEkJigzmU7Ic/BNLdWjn/cu/Gth6BaNnORhTKaeT1VvO8GGWexT0YNIlfo0LxIakyabaBUXLIiEnWzV8TFBiX7GIyGE6RulEtrTMTOZ6q/g0kTyVF1ZdqL3Ha4TvGYWC7AW5pZQQmjDk07u7teB8vrPBNQxdZNb0rVehEECPoweDHZVCiz0+wclFbSvGZvuShaZgsGfJOSvTRdduwrCJ61gxPIpTZFSi22tBDjGTMfw/yb+9OgJmny1PPv6Gaop22ZWYSdO7MQKIEhQOcWw6g7Vb6cFQBFRwycWH0EgPPBaEBMowZwfwhJEVElLOK3wVdnKQt3KR6KZxCTqlPD8Z3CxNVcqIkCStLBaKkWgOX5PEXckU4UJAno6Zw\022\037ref_Coder_FastPrimitivesCoder_7"
}
inputs {
key: "in"
value: "pcollection_1"
}
outputs {
key: "out"
value: "pcollection_2"
}
environment_id: "ref_Environment_default_environment_1"
annotations {
key: "python_type"
value: "apache_beam.transforms.core.CombinePerKey"
}
INFO:root:unique_name: "Do/CombinePerKey/CombinePerKey(PreCombineFn)/ExtractOutputs"
spec {
urn: "beam:transform:combine_per_key_extract_outputs:v1"
payload: "\n\260\013\n beam:combinefn:pickled_python:v1\032\213\013\n\210\013QlpoOTFBWSZTWSejpnAAAvl/9P//1blC8Zv3n3/f5r////Zj5x4QFAAAAQQAUAO+uzXW27ruM7WdYSSQp4JCeg1NMR4hP1AjAJtTIwBBgjCDATEwGiGmGplGI0TChkaaAANPUNGgAAAAAAAAAABKaJE01TZo1NCbUeoPUPUAaeoNAAAAAAAAAeoHlDgABoAGgDQAAGjEAA0AAAAADQASmqaAFGaap+inqeUyaB6gGmJ6Ro2iDTRo0PUZAPU9IGmQACJNfGbYf9uAQYQQGEMLHYYh+sgH7Hmi/00ptIf8Auac02UitRt1Y5CWzcaWjZfGQ2NWnED4py8y6HLCYKLNWgmENmTDOIZDSk40REEQiHcfLIy4xkZwy1GEzH819JmdYQR4vgGJYwxYj03YDcEHuERBK1SGGOpoBHaC6F5GXXOR08euPzsLNQZC5WgHFK98logwMMPuc4ffc6oJpEQ/D933+//szND08iaOCGGOOLfCNgTMj1z61A+DBTQ5oMzK5exvY7nDyiThEqZ7J21YCdY6B6dZPgiJBGYRJIuPV1p1ttE7PLXceZcrgfcMoqdKZFBTEhviKyhoG31vUcHOtE4FLZQiRjb7vkUBCfPP6rgqvNIw6xosiMYwVUQXLrdUIDJRYyOFO7PrdVsQ6ae+9NAqx15YOagHQjOErBQaeZaEwSwcYdWggYmSlgVzT5YglzAygT6tgIkCLuGzVcIyeW3YefuQ4ERQiZ7cIigzX4Te1/mw1dL2ksFzS1Z9Yxfsas01JNesykI08fMTJDeZIHFwD2qyD1stverBUM3NzqWf7kX8qOFk2k7gYRK05qnq6Uhoy7PB9tQPf8rTu+CIi/clKTu+ExFGPDiTl+3osE2fqTjJU7GPCg1JImEAC+DA2OuARIDjDhbE1pJwFJXo
JphrbAnYEQw6TpXEEVTEzDjBowlMhNE8SDHyQbKHMIII5HXZkFbgWcvRHqBwWBnElA6dFp5hwWUTa1WQViqkavqtkpmigH9UvzpJ41CC1ANMTC9VIPQD5ZgasomaEkJigzmU7Ic/BNLdWjn/cu/Gth6BaNnORhTKaeT1VvO8GGWexT0YNIlfo0LxIakyabaBUXLIiEnWzV8TFBiX7GIyGE6RulEtrTMTOZ6q/g0kTyVF1ZdqL3Ha4TvGYWC7AW5pZQQmjDk07u7teB8vrPBNQxdZNb0rVehEECPoweDHZVCiz0+wclFbSvGZvuShaZgsGfJOSvTRdduwrCJ61gxPIpTZFSi22tBDjGTMfw/yb+9OgJmny1PPv6Gaop22ZWYSdO7MQKIEhQOcWw6g7Vb6cFQBFRwycWH0EgPPBaEBMowZwfwhJEVElLOK3wVdnKQt3KR6KZxCTqlPD8Z3CxNVcqIkCStLBaKkWgOX5PEXckU4UJAno6Zw\022\037ref_Coder_FastPrimitivesCoder_7"
}
```
IIUC, with cloudpickle the same instance is shared across these stages, which causes this error. I will double-check once more.
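The two payloads in the proto dump are byte-for-byte identical. The embedded pickled CombineFn appears to be base64 text whose decoded bytes start with the bzip2 magic `BZh91AY&S` (that is what the leading `QlpoOTFBWSZT` decodes to), which suggests a pickle compressed with bz2 and then base64-encoded. The sketch below assumes exactly that encoding; `encode_payload` and `decode_payload` are hypothetical helpers, not Beam's pickler API. It shows that identical payload bytes, when decoded separately, should still produce two independent objects.

```python
import base64
import bz2
import pickle


def encode_payload(obj):
    """Hypothetical encoder: pickle -> bz2 -> base64 (the assumed format)."""
    return base64.b64encode(bz2.compress(pickle.dumps(obj)))


def decode_payload(encoded):
    """Inverse of encode_payload: base64 -> bz2 -> unpickle."""
    return pickle.loads(bz2.decompress(base64.b64decode(encoded)))


state = {"setup_called": False}
merge_payload = encode_payload(state)
extract_payload = merge_payload  # both stages carry identical payload bytes

print(merge_payload[:12])  # b'QlpoOTFBWSZT'

merge_fn = decode_payload(merge_payload)
extract_fn = decode_payload(extract_payload)
merge_fn["setup_called"] = True
print(extract_fn["setup_called"])  # False: each decode produced its own object
```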
[GitHub] [beam] tvalentyn commented on issue #26209: [Bug]: cloudpickle appears to incorrectly unpickle cloned combiners
Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on issue #26209:
URL: https://github.com/apache/beam/issues/26209#issuecomment-1507275148
This test runs on the direct runners (both the bundle-based and the portable/FnAPI direct runner), but the failure can also be reproduced with TestDataflowRunner from a sister ValidatesRunner suite.
[GitHub] [beam] tvalentyn commented on issue #26209: [Bug]: cloudpickle appears to incorrectly unpickle cloned combiners
Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on issue #26209:
URL: https://github.com/apache/beam/issues/26209#issuecomment-1507271346
The test failure looks like the following:
```
__________________ LocalCombineFnLifecycleTest_0.test_combine __________________
...
self = <apache_beam.transforms.combinefn_lifecycle_test.LocalCombineFnLifecycleTest_0 testMethod=test_combine>
def test_combine(self):
> run_combine(TestPipeline(runner=self.runner()))
apache_beam/transforms/combinefn_lifecycle_test.py:88:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/transforms/combinefn_lifecycle_pipeline.py:108: in run_combine
with pipeline as p:
apache_beam/pipeline.py:600: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:116: in run
state = result.wait_until_finish()
apache_beam/runners/direct/direct_runner.py:588: in wait_until_finish
self._executor.await_completion()
apache_beam/runners/direct/executor.py:432: in await_completion
self._executor.await_completion()
apache_beam/runners/direct/executor.py:480: in await_completion
raise update.exception
apache_beam/runners/direct/executor.py:370: in call
self.attempt_call(
apache_beam/runners/direct/executor.py:404: in attempt_call
evaluator.start_bundle()
apache_beam/runners/direct/transform_evaluator.py:869: in start_bundle
self.runner.setup()
apache_beam/runners/common.py:1472: in setup
self._invoke_lifecycle_method(self.do_fn_invoker.invoke_setup)
apache_beam/runners/common.py:1468: in _invoke_lifecycle_method
self._reraise_augmented(exn)
apache_beam/runners/common.py:1508: in _reraise_augmented
raise new_exn.with_traceback(tb)
apache_beam/runners/common.py:1466: in _invoke_lifecycle_method
lifecycle_method()
apache_beam/runners/common.py:552: in invoke_setup
self.signature.setup_lifecycle_method.method_value()
apache_beam/runners/direct/helper_transforms.py:101: in setup
self._combine_fn.setup()
apache_beam/typehints/typecheck.py:205: in setup
self._combinefn.setup(*args, **kwargs)
apache_beam/transforms/combiners.py:843: in setup
self.fn.setup(*self.args, **self.kwargs)
apache_beam/transforms/combiners.py:665: in setup
c.setup(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.transforms.combinefn_lifecycle_pipeline.CallSequenceEnforcingCombineFn object at 0x7f3918796750>
args = (None,), kwargs = {}
def setup(self, *args, **kwargs):
> assert not self._setup_called, 'setup should not be called twice'
E AssertionError: setup should not be called twice [while running 'Do/CombinePerKey/CombinePerKey(PreCombineFn)/ParDo(FinishCombine)']
```
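The assertion comes from `CallSequenceEnforcingCombineFn`, which refuses a second `setup` call on the same instance. The following is a simplified, hypothetical re-creation of that guard (`SetupOnceFn` and `set_up_stages` are illustrative names, not the test's actual code). It shows why two stages holding one shared instance trip the check, while stages holding independent copies do not.

```python
import copy


class SetupOnceFn:
    """Simplified stand-in for a CombineFn that enforces its lifecycle."""

    def __init__(self):
        self._setup_called = False

    def setup(self):
        # Mirrors the assert in the lifecycle test; an explicit raise keeps the
        # check active even when Python runs with -O.
        if self._setup_called:
            raise AssertionError('setup should not be called twice')
        self._setup_called = True


def set_up_stages(fns):
    """Call setup() once per stage, as a runner would; return the error message, if any."""
    try:
        for fn in fns:
            fn.setup()
    except AssertionError as e:
        return str(e)
    return None


shared = SetupOnceFn()
# Two stages (e.g. Merge and ExtractOutputs) holding the SAME instance.
print(set_up_stages([shared, shared]))  # setup should not be called twice

original = SetupOnceFn()
# Two stages, each holding an independent deep copy.
print(set_up_stages([copy.deepcopy(original), copy.deepcopy(original)]))  # None
```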
[GitHub] [beam] tvalentyn commented on issue #26209: [Bug]: cloudpickle appears to incorrectly unpickle cloned combiners
Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on issue #26209:
URL: https://github.com/apache/beam/issues/26209#issuecomment-1507291617
The failure does not occur if I manually modify the graph-rewriting code responsible for combiner lifting so that combiner lifting is disabled; see: https://github.com/apache/beam/blob/7919c3fdf07c7802f62061e5e222027df5e02ce5/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L373
@AnandInguva also mentioned that the error is not reproducible when `with_fanout` is disabled. `with_fanout` performs its copying here: https://github.com/apache/beam/blob/837733e862007cbeba0a57f682109d9debec9340/sdks/python/apache_beam/transforms/core.py#L2467
Re: [I] [Bug]: cloudpickle appears to incorrectly unpickle cloned combiners [beam]
Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on issue #26209:
URL: https://github.com/apache/beam/issues/26209#issuecomment-1874478963
@tvalentyn I can pick up the investigation from here if you are not working on it.