Posted to github@beam.apache.org by "tvalentyn (via GitHub)" <gi...@apache.org> on 2023/04/10 21:30:30 UTC

[GitHub] [beam] tvalentyn opened a new issue, #26209: [Bug]: cloudpickle appears to incorrectly unpickle cloned combiners

tvalentyn opened a new issue, #26209:
URL: https://github.com/apache/beam/issues/26209

   ### What happened?
   
   Combiner lifting and the combiner `with_fanout` utility copy portions of Beam's subgraph related to combiners. It appears that unpickling the cloudpickle-pickled bytes that encode those subgraphs results in multiple CombineFns sharing the same state, which causes side effects in combiner setup and teardown.
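To make the suspected failure mode concrete, here is a minimal pure-Python sketch that does not import Beam. `SetupOnceCombineFn` and `setup_all_succeeds` are hypothetical names. The sketch only assumes, as described above, that the lifted combiner stages can end up holding one shared CombineFn instance instead of independent copies. Its setup check mirrors the `setup should not be called twice` assertion in the test's `CallSequenceEnforcingCombineFn`.

```python
import copy


class SetupOnceCombineFn:
    """Hypothetical stand-in for a CombineFn whose setup() must run once."""

    def __init__(self):
        self._setup_called = False

    def setup(self):
        # Mirrors the test's check; raises explicitly so it also fires under -O.
        if self._setup_called:
            raise AssertionError('setup should not be called twice')
        self._setup_called = True


def setup_all_succeeds(stage_fns):
    """Calls setup() on each lifted stage's fn; returns False if any call fails."""
    try:
        for fn in stage_fns:
            fn.setup()
    except AssertionError:
        return False
    return True


# Independent copies per stage: every setup() call succeeds.
original = SetupOnceCombineFn()
print(setup_all_succeeds([copy.deepcopy(original), copy.deepcopy(original)]))  # True

# Two stages holding one shared instance: the second setup() fails.
shared = SetupOnceCombineFn()
print(setup_all_succeeds([shared, shared]))  # False
```

With independent copies, each stage has its own `_setup_called` flag, which is presumably what the lifecycle test expects.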
   
   TODO: add details, failure examples and code pointers.
   
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [I] [Bug]: cloudpickle appears to incorrectly unpickle cloned combiners [beam]

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on issue #26209:
URL: https://github.com/apache/beam/issues/26209#issuecomment-1881661699

   ```
   INFO:root:<CombineOperation Do/CombinePerKey/CombinePerKey(PreCombineFn)/ExtractOutputs phase=extract>
   
   INFO:root:<CombineOperation Do/CombinePerKey/CombinePerKey(PreCombineFn)/Merge phase=merge>
   
   ```
   The setup calls for these two operations in [bundle_processor](https://github.com/apache/beam/blob/75cfbee1591b99ff02d0a6a19631199e719b44fa/sdks/python/apache_beam/runners/worker/bundle_processor.py#L974) receive the same `CombineFn` object, so the two operations share state.
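A quick way to check this on a worker is to group operations by the identity of the fn object each one holds. This is a hedged sketch: `shared_fn_groups` is a hypothetical helper, and how to pull the `(name, fn)` pairs out of bundle_processor's operations is left open, since the relevant attribute names are not shown in this thread.

```python
def shared_fn_groups(operations):
    """Returns groups of operation names that hold the very same fn object.

    `operations` is an iterable of (name, fn) pairs.
    """
    groups = {}
    for name, fn in operations:
        # Keep a reference to fn so its id() cannot be reused by another
        # object during iteration, even if `operations` is a generator.
        groups.setdefault(id(fn), (fn, []))[1].append(name)
    return [names for _, names in groups.values() if len(names) > 1]


# Hypothetical example: two operations wired to one shared object.
shared_fn = object()
ops = [('Merge', shared_fn), ('ExtractOutputs', shared_fn), ('Precombine', object())]
print(shared_fn_groups(ops))  # [['Merge', 'ExtractOutputs']]
```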
   
   
   I will look into [translations](https://github.com/apache/beam/blob/75cfbee1591b99ff02d0a6a19631199e719b44fa/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L1378) and see how these are getting pickled.
   




Re: [I] [Bug]: cloudpickle appears to incorrectly unpickle cloned combiners [beam]

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on issue #26209:
URL: https://github.com/apache/beam/issues/26209#issuecomment-1882208113

   From my experiments, I found that if you unpickle the same payload into two different objects:
   
   - With dill, the two objects have different memory addresses.
   - With cloudpickle, they have the same memory address.
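A minimal harness for repeating this experiment might look like the following. It is a sketch, not the code used in the thread: `loads_twice_is_same_object` is a hypothetical helper, and the plain dict is only a stand-in that may not reproduce whatever behavior the real pickled CombineFn shows. To reproduce the report, substitute the actual CombineFn payload. Only the standard-library `pickle` result is stated below; dill and cloudpickle are checked only if installed, and their results are not asserted here.

```python
import importlib
import pickle


def loads_twice_is_same_object(loads, payload):
    """Deserializes one payload twice; True if both calls return one object."""
    first = loads(payload)
    second = loads(payload)
    return first is second


# Stand-in for CombineFn state (assumption: not the real CombineFn payload).
state = {'setup_called': False}
print('pickle:', loads_twice_is_same_object(pickle.loads, pickle.dumps(state)))  # False

# Repeat the check with the libraries discussed above, if they are installed.
for module_name in ('dill', 'cloudpickle'):
    try:
        lib = importlib.import_module(module_name)
    except ImportError:
        continue
    print(module_name + ':', loads_twice_is_same_object(lib.loads, lib.dumps(state)))
```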
   
   For a `CombineFn` using `with_fanout`, `lift_combiners` runs during pipeline stage creation. Inspecting the pipeline proto at that point, I found that the Combine stages share the same `CombineFn` payload.
   
   For example, these are the transform protos for the Merge and ExtractOutputs stages:
   ```
   INFO:root:unique_name: "Do/CombinePerKey/CombinePerKey(PreCombineFn)/Merge"
   spec {
     urn: "beam:transform:combine_per_key_merge_accumulators:v1"
     payload: "\n\260\013\n beam:combinefn:pickled_python:v1\032\213\013\n\210\013QlpoOTFBWSZTWSejpnAAAvl/9P//1blC8Zv3n3/f5r////Zj5x4QFAAAAQQAUAO+uzXW27ruM7WdYSSQp4JCeg1NMR4hP1AjAJtTIwBBgjCDATEwGiGmGplGI0TChkaaAANPUNGgAAAAAAAAAABKaJE01TZo1NCbUeoPUPUAaeoNAAAAAAAAAeoHlDgABoAGgDQAAGjEAA0AAAAADQASmqaAFGaap+inqeUyaB6gGmJ6Ro2iDTRo0PUZAPU9IGmQACJNfGbYf9uAQYQQGEMLHYYh+sgH7Hmi/00ptIf8Auac02UitRt1Y5CWzcaWjZfGQ2NWnED4py8y6HLCYKLNWgmENmTDOIZDSk40REEQiHcfLIy4xkZwy1GEzH819JmdYQR4vgGJYwxYj03YDcEHuERBK1SGGOpoBHaC6F5GXXOR08euPzsLNQZC5WgHFK98logwMMPuc4ffc6oJpEQ/D933+//szND08iaOCGGOOLfCNgTMj1z61A+DBTQ5oMzK5exvY7nDyiThEqZ7J21YCdY6B6dZPgiJBGYRJIuPV1p1ttE7PLXceZcrgfcMoqdKZFBTEhviKyhoG31vUcHOtE4FLZQiRjb7vkUBCfPP6rgqvNIw6xosiMYwVUQXLrdUIDJRYyOFO7PrdVsQ6ae+9NAqx15YOagHQjOErBQaeZaEwSwcYdWggYmSlgVzT5YglzAygT6tgIkCLuGzVcIyeW3YefuQ4ERQiZ7cIigzX4Te1/mw1dL2ksFzS1Z9Yxfsas01JNesykI08fMTJDeZIHFwD2qyD1stverBUM3NzqWf7kX8qOFk2k7gYRK05qnq6Uhoy7PB9tQPf8rTu+CIi/clKTu+ExFGPDiTl+3osE2fqTjJU7GPCg1JImEAC+DA2OuARIDjDhbE1pJwFJXo
 JphrbAnYEQw6TpXEEVTEzDjBowlMhNE8SDHyQbKHMIII5HXZkFbgWcvRHqBwWBnElA6dFp5hwWUTa1WQViqkavqtkpmigH9UvzpJ41CC1ANMTC9VIPQD5ZgasomaEkJigzmU7Ic/BNLdWjn/cu/Gth6BaNnORhTKaeT1VvO8GGWexT0YNIlfo0LxIakyabaBUXLIiEnWzV8TFBiX7GIyGE6RulEtrTMTOZ6q/g0kTyVF1ZdqL3Ha4TvGYWC7AW5pZQQmjDk07u7teB8vrPBNQxdZNb0rVehEECPoweDHZVCiz0+wclFbSvGZvuShaZgsGfJOSvTRdduwrCJ61gxPIpTZFSi22tBDjGTMfw/yb+9OgJmny1PPv6Gaop22ZWYSdO7MQKIEhQOcWw6g7Vb6cFQBFRwycWH0EgPPBaEBMowZwfwhJEVElLOK3wVdnKQt3KR6KZxCTqlPD8Z3CxNVcqIkCStLBaKkWgOX5PEXckU4UJAno6Zw\022\037ref_Coder_FastPrimitivesCoder_7"
   }
   inputs {
     key: "in"
     value: "pcollection_1"
   }
   outputs {
     key: "out"
     value: "pcollection_2"
   }
   environment_id: "ref_Environment_default_environment_1"
   annotations {
     key: "python_type"
     value: "apache_beam.transforms.core.CombinePerKey"
   }
   
   INFO:root:unique_name: "Do/CombinePerKey/CombinePerKey(PreCombineFn)/ExtractOutputs"
   spec {
     urn: "beam:transform:combine_per_key_extract_outputs:v1"
     payload: "\n\260\013\n beam:combinefn:pickled_python:v1\032\213\013\n\210\013QlpoOTFBWSZTWSejpnAAAvl/9P//1blC8Zv3n3/f5r////Zj5x4QFAAAAQQAUAO+uzXW27ruM7WdYSSQp4JCeg1NMR4hP1AjAJtTIwBBgjCDATEwGiGmGplGI0TChkaaAANPUNGgAAAAAAAAAABKaJE01TZo1NCbUeoPUPUAaeoNAAAAAAAAAeoHlDgABoAGgDQAAGjEAA0AAAAADQASmqaAFGaap+inqeUyaB6gGmJ6Ro2iDTRo0PUZAPU9IGmQACJNfGbYf9uAQYQQGEMLHYYh+sgH7Hmi/00ptIf8Auac02UitRt1Y5CWzcaWjZfGQ2NWnED4py8y6HLCYKLNWgmENmTDOIZDSk40REEQiHcfLIy4xkZwy1GEzH819JmdYQR4vgGJYwxYj03YDcEHuERBK1SGGOpoBHaC6F5GXXOR08euPzsLNQZC5WgHFK98logwMMPuc4ffc6oJpEQ/D933+//szND08iaOCGGOOLfCNgTMj1z61A+DBTQ5oMzK5exvY7nDyiThEqZ7J21YCdY6B6dZPgiJBGYRJIuPV1p1ttE7PLXceZcrgfcMoqdKZFBTEhviKyhoG31vUcHOtE4FLZQiRjb7vkUBCfPP6rgqvNIw6xosiMYwVUQXLrdUIDJRYyOFO7PrdVsQ6ae+9NAqx15YOagHQjOErBQaeZaEwSwcYdWggYmSlgVzT5YglzAygT6tgIkCLuGzVcIyeW3YefuQ4ERQiZ7cIigzX4Te1/mw1dL2ksFzS1Z9Yxfsas01JNesykI08fMTJDeZIHFwD2qyD1stverBUM3NzqWf7kX8qOFk2k7gYRK05qnq6Uhoy7PB9tQPf8rTu+CIi/clKTu+ExFGPDiTl+3osE2fqTjJU7GPCg1JImEAC+DA2OuARIDjDhbE1pJwFJXo
 JphrbAnYEQw6TpXEEVTEzDjBowlMhNE8SDHyQbKHMIII5HXZkFbgWcvRHqBwWBnElA6dFp5hwWUTa1WQViqkavqtkpmigH9UvzpJ41CC1ANMTC9VIPQD5ZgasomaEkJigzmU7Ic/BNLdWjn/cu/Gth6BaNnORhTKaeT1VvO8GGWexT0YNIlfo0LxIakyabaBUXLIiEnWzV8TFBiX7GIyGE6RulEtrTMTOZ6q/g0kTyVF1ZdqL3Ha4TvGYWC7AW5pZQQmjDk07u7teB8vrPBNQxdZNb0rVehEECPoweDHZVCiz0+wclFbSvGZvuShaZgsGfJOSvTRdduwrCJ61gxPIpTZFSi22tBDjGTMfw/yb+9OgJmny1PPv6Gaop22ZWYSdO7MQKIEhQOcWw6g7Vb6cFQBFRwycWH0EgPPBaEBMowZwfwhJEVElLOK3wVdnKQt3KR6KZxCTqlPD8Z3CxNVcqIkCStLBaKkWgOX5PEXckU4UJAno6Zw\022\037ref_Coder_FastPrimitivesCoder_7"
   }
   ```
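To check whether lifted stages carry byte-identical `CombineFn` payloads without comparing log output by eye, one could group transforms by their `spec.payload`. This is a minimal sketch: `transforms_sharing_payload` is hypothetical, and the example payloads are made-up placeholders rather than the real bytes above. With a Beam `Pipeline` proto, the input pairs could be built as `(t.unique_name, t.spec.payload) for t in pipeline_proto.components.transforms.values()`.

```python
from collections import defaultdict


def transforms_sharing_payload(transforms):
    """Groups transform names whose spec payload bytes are identical.

    `transforms` is an iterable of (unique_name, payload_bytes) pairs.
    """
    by_payload = defaultdict(list)
    for unique_name, payload in transforms:
        by_payload[payload].append(unique_name)
    return [names for names in by_payload.values() if len(names) > 1]


# Made-up placeholder payloads, not the real bytes from the log above.
example = [
    ('Do/CombinePerKey/CombinePerKey(PreCombineFn)/Merge', b'pickled-fn-A'),
    ('Do/CombinePerKey/CombinePerKey(PreCombineFn)/ExtractOutputs', b'pickled-fn-A'),
    ('Do/Other', b'pickled-fn-B'),
]
print(transforms_sharing_payload(example))
```

Identical bytes are not necessarily a problem on their own, as long as each stage deserializes its own independent copy; the question in this thread is whether that actually happens.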
   
   IIUC, with cloudpickle the same `CombineFn` instance is shared between these stages, which causes this error. I will double-check.




[GitHub] [beam] tvalentyn commented on issue #26209: [Bug]: cloudpickle appears to incorrectly unpickle cloned combiners

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on issue #26209:
URL: https://github.com/apache/beam/issues/26209#issuecomment-1507275148

   This test runs on the direct runner (both the bundle-based and the Portable/FnAPI direct runners), but the failure can also be reproduced with TestDataflowRunner from a sister ValidatesRunner suite.




[GitHub] [beam] tvalentyn commented on issue #26209: [Bug]: cloudpickle appears to incorrectly unpickle cloned combiners

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on issue #26209:
URL: https://github.com/apache/beam/issues/26209#issuecomment-1507271346

   The test failure looks like the following:
   
   
   ```
   __________________ LocalCombineFnLifecycleTest_0.test_combine __________________
   ...
   self = <apache_beam.transforms.combinefn_lifecycle_test.LocalCombineFnLifecycleTest_0 testMethod=test_combine>
   
       def test_combine(self):
   >     run_combine(TestPipeline(runner=self.runner()))
   
   apache_beam/transforms/combinefn_lifecycle_test.py:88: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   apache_beam/transforms/combinefn_lifecycle_pipeline.py:108: in run_combine
       with pipeline as p:
   apache_beam/pipeline.py:600: in __exit__
       self.result = self.run()
   apache_beam/testing/test_pipeline.py:116: in run
       state = result.wait_until_finish()
   apache_beam/runners/direct/direct_runner.py:588: in wait_until_finish
       self._executor.await_completion()
   apache_beam/runners/direct/executor.py:432: in await_completion
       self._executor.await_completion()
   apache_beam/runners/direct/executor.py:480: in await_completion
       raise update.exception
   apache_beam/runners/direct/executor.py:370: in call
       self.attempt_call(
   apache_beam/runners/direct/executor.py:404: in attempt_call
       evaluator.start_bundle()
   apache_beam/runners/direct/transform_evaluator.py:869: in start_bundle
       self.runner.setup()
   apache_beam/runners/common.py:1472: in setup
       self._invoke_lifecycle_method(self.do_fn_invoker.invoke_setup)
   apache_beam/runners/common.py:1468: in _invoke_lifecycle_method
       self._reraise_augmented(exn)
   apache_beam/runners/common.py:1508: in _reraise_augmented
       raise new_exn.with_traceback(tb)
   apache_beam/runners/common.py:1466: in _invoke_lifecycle_method
       lifecycle_method()
   apache_beam/runners/common.py:552: in invoke_setup
       self.signature.setup_lifecycle_method.method_value()
   apache_beam/runners/direct/helper_transforms.py:101: in setup
       self._combine_fn.setup()
   apache_beam/typehints/typecheck.py:205: in setup
       self._combinefn.setup(*args, **kwargs)
   apache_beam/transforms/combiners.py:843: in setup
       self.fn.setup(*self.args, **self.kwargs)
   apache_beam/transforms/combiners.py:665: in setup
       c.setup(*args, **kwargs)
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   
   self = <apache_beam.transforms.combinefn_lifecycle_pipeline.CallSequenceEnforcingCombineFn object at 0x7f3918796750>
   args = (None,), kwargs = {}
   
       def setup(self, *args, **kwargs):
   >     assert not self._setup_called, 'setup should not be called twice'
   E     AssertionError: setup should not be called twice [while running 'Do/CombinePerKey/CombinePerKey(PreCombineFn)/ParDo(FinishCombine)']
   
   ```




[GitHub] [beam] tvalentyn commented on issue #26209: [Bug]: cloudpickle appears to incorrectly unpickle cloned combiners

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on issue #26209:
URL: https://github.com/apache/beam/issues/26209#issuecomment-1507291617

   The failure does not happen if I manually modify the graph-rewriting code responsible for combiner lifting so that combiner lifting is disabled; see: https://github.com/apache/beam/blob/7919c3fdf07c7802f62061e5e222027df5e02ce5/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L373 
   
   @AnandInguva also mentioned that the error is not reproducible if `with_fanout` is disabled. `with_fanout` involves copying here: https://github.com/apache/beam/blob/837733e862007cbeba0a57f682109d9debec9340/sdks/python/apache_beam/transforms/core.py#L2467
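The thread does not show the copying code at the linked line, so the following only illustrates a general Python point that may be relevant: if a transform is cloned with a shallow `copy.copy`, the clone and the original keep referencing the same wrapped fn object, whereas `copy.deepcopy` creates an independent one. `TransformLike` and `FnState` are hypothetical classes for illustration, not Beam types.

```python
import copy


class FnState:
    """Hypothetical mutable state carried by a combine fn."""

    def __init__(self):
        self.setup_called = False


class TransformLike:
    """Hypothetical transform that wraps a combine fn."""

    def __init__(self, fn):
        self.fn = fn


original = TransformLike(FnState())

shallow = copy.copy(original)    # new wrapper, same fn object
deep = copy.deepcopy(original)   # new wrapper and a new fn object

print(shallow.fn is original.fn)  # True
print(deep.fn is original.fn)     # False
```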




Re: [I] [Bug]: cloudpickle appears to incorrectly unpickle cloned combiners [beam]

Posted by "AnandInguva (via GitHub)" <gi...@apache.org>.
AnandInguva commented on issue #26209:
URL: https://github.com/apache/beam/issues/26209#issuecomment-1874478963

   @tvalentyn I can pick up the investigation from here if you are not working on it. 

