Posted to github@beam.apache.org by "amontoli (via GitHub)" <gi...@apache.org> on 2023/03/03 11:39:32 UTC

[GitHub] [beam] amontoli opened a new issue, #25709: [Bug]: Data not being forwarded correctly to Stateful DoFn in Python SDK + PortableRunner

amontoli opened a new issue, #25709:
URL: https://github.com/apache/beam/issues/25709

   ### What happened?
   
   I tried to run a pipeline on a Flink cluster, using the PortableRunner. The pipeline contains a dummy DoFn, which simply prints the element it has to process, and a dummy Stateful DoFn, similar to the non-stateful one, but with a ReadModifyWriteStateSpec variable defined. I am using Beam 2.45.0. Here is the sample code:
   ```
   import logging
   from typing import Tuple
   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions
   
   logging.basicConfig(level=logging.INFO)
   
   class DummyDoFn(beam.DoFn):
       def process(self, element):
           logging.info("Input to DummyDoFn: %s", element)
           yield element
   
   class DummyReadModifyWriteStateDoFn(beam.DoFn):
       VALUES = beam.transforms.userstate.ReadModifyWriteStateSpec(name="values",
                                                                   coder=beam.coders.PickleCoder())
   
       def process(self, element: Tuple[int, int], values = beam.DoFn.StateParam(VALUES)):
           logging.info("Input to DummyReadModifyWriteStateDoFn: %s", element)
           yield element
   
   def run():
       common_options = ['--job_name=Test stateful DoFn', '--streaming']
   
       flink = True
       if flink:
           runner_options =  [
                   '--runner=PortableRunner',
                   '--job_endpoint=localhost:8099',
                   '--artifact_endpoint=localhost:8098',
                   '--environment_type=EXTERNAL',
                   '--environment_config=localhost:50000',
                   ]
       else:
           runner_options = ["--runner=DirectRunner"]
       options = common_options + runner_options
       beam_options = PipelineOptions(flags = options)
   
       with beam.Pipeline(options = beam_options) as pipeline:
           (pipeline | beam.Create([i for i in range(10)])
                     | beam.Map(lambda x: (1,x))
                     | "DummyDoFn1" >> beam.ParDo(DummyDoFn())
                     | "DummyDoFn2" >> beam.ParDo(DummyReadModifyWriteStateDoFn())
                     | beam.Map(lambda x: logging.info("Final output" + str(x))))
   
   if __name__ == "__main__":
       run()
   ```
   
   If I run this pipeline on the Flink cluster (`flink = True`), the output is:
   ```
   2023-03-03 11:12:12,314 INFO  pipeline.py:18                                               [] - Input to DummyDoFn: (1, 0)
   2023-03-03 11:12:12,314 INFO  pipeline.py:18                                               [] - Input to DummyDoFn: (1, 1)
   2023-03-03 11:12:12,314 INFO  pipeline.py:18                                               [] - Input to DummyDoFn: (1, 2)
   2023-03-03 11:12:12,314 INFO  pipeline.py:18                                               [] - Input to DummyDoFn: (1, 3)
   2023-03-03 11:12:12,314 INFO  pipeline.py:18                                               [] - Input to DummyDoFn: (1, 4)
   2023-03-03 11:12:12,314 INFO  pipeline.py:18                                               [] - Input to DummyDoFn: (1, 5)
   2023-03-03 11:12:12,314 INFO  pipeline.py:18                                               [] - Input to DummyDoFn: (1, 6)
   2023-03-03 11:12:12,315 INFO  pipeline.py:18                                               [] - Input to DummyDoFn: (1, 7)
   2023-03-03 11:12:12,315 INFO  pipeline.py:18                                               [] - Input to DummyDoFn: (1, 8)
   2023-03-03 11:12:12,315 INFO  pipeline.py:18                                               [] - Input to DummyDoFn: (1, 9)
   2023-03-03 11:12:12,320 INFO  org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - getProcessBundleDescriptor request with id 1-2
   2023-03-03 11:12:12,321 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - [3]{Create, Map(<lambda at pipeline.py:51>), DummyDoFn1} (1/1)#0 (7b7e232cdc48cf8b67549d57b22af77d) switched from RUNNING to FINISHED.
   2023-03-03 11:12:12,322 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for [3]{Create, Map(<lambda at pipeline.py:51>), DummyDoFn1} (1/1)#0 (7b7e232cdc48cf8b67549d57b22af77d).
   2023-03-03 11:12:12,323 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FINISHED to JobManager for task [3]{Create, Map(<lambda at pipeline.py:51>), DummyDoFn1} (1/1)#0 7b7e232cdc48cf8b67549d57b22af77d.
   2023-03-03 11:12:12,338 INFO  pipeline.py:26                                               [] - Input to DummyReadModifyWriteStateDoFn: (1, 0)
   2023-03-03 11:12:12,338 INFO  pipeline.py:54                                               [] - Final output(1, 0)
   2023-03-03 11:12:12,338 INFO  pipeline.py:26                                               [] - Input to DummyReadModifyWriteStateDoFn: (1, 1)
   2023-03-03 11:12:12,338 INFO  pipeline.py:54                                               [] - Final output(1, 1)
   2023-03-03 11:12:12,338 INFO  pipeline.py:26                                               [] - Input to DummyReadModifyWriteStateDoFn: (1, 2)
   2023-03-03 11:12:12,338 INFO  pipeline.py:54                                               [] - Final output(1, 2)
   2023-03-03 11:12:12,339 INFO  pipeline.py:26                                               [] - Input to DummyReadModifyWriteStateDoFn: (1, 3)
   2023-03-03 11:12:12,339 INFO  pipeline.py:54                                               [] - Final output(1, 3)
   2023-03-03 11:12:12,339 INFO  pipeline.py:26                                               [] - Input to DummyReadModifyWriteStateDoFn: (1, 4)
   2023-03-03 11:12:12,339 INFO  pipeline.py:54                                               [] - Final output(1, 4)
   2023-03-03 11:12:12,339 INFO  pipeline.py:26                                               [] - Input to DummyReadModifyWriteStateDoFn: (1, 5)
   2023-03-03 11:12:12,339 INFO  pipeline.py:54                                               [] - Final output(1, 5)
   2023-03-03 11:12:12,339 INFO  pipeline.py:26                                               [] - Input to DummyReadModifyWriteStateDoFn: (1, 6)
   2023-03-03 11:12:12,339 INFO  pipeline.py:54                                               [] - Final output(1, 6)
   2023-03-03 11:12:12,339 INFO  pipeline.py:26                                               [] - Input to DummyReadModifyWriteStateDoFn: (1, 7)
   2023-03-03 11:12:12,339 INFO  pipeline.py:54                                               [] - Final output(1, 7)
   2023-03-03 11:12:12,339 INFO  pipeline.py:26                                               [] - Input to DummyReadModifyWriteStateDoFn: (1, 8)
   2023-03-03 11:12:12,340 INFO  pipeline.py:54                                               [] - Final output(1, 8)
   2023-03-03 11:12:12,340 INFO  pipeline.py:26                                               [] - Input to DummyReadModifyWriteStateDoFn: (1, 9)
   2023-03-03 11:12:12,340 INFO  pipeline.py:54                                               [] - Final output(1, 9)
   ```
   
   On the other hand, if I run it on DirectRunner (`flink = False`), I get:
   ```
   INFO:root:Input to DummyDoFn: (1, 0)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 0)
   INFO:root:Final output(1, 0)
   INFO:root:Input to DummyDoFn: (1, 1)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 1)
   INFO:root:Final output(1, 1)
   INFO:root:Input to DummyDoFn: (1, 2)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 2)
   INFO:root:Final output(1, 2)
   INFO:root:Input to DummyDoFn: (1, 3)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 3)
   INFO:root:Final output(1, 3)
   INFO:root:Input to DummyDoFn: (1, 4)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 4)
   INFO:root:Final output(1, 4)
   INFO:root:Input to DummyDoFn: (1, 5)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 5)
   INFO:root:Final output(1, 5)
   INFO:root:Input to DummyDoFn: (1, 6)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 6)
   INFO:root:Final output(1, 6)
   INFO:root:Input to DummyDoFn: (1, 7)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 7)
   INFO:root:Final output(1, 7)
   INFO:root:Input to DummyDoFn: (1, 8)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 8)
   INFO:root:Final output(1, 8)
   INFO:root:Input to DummyDoFn: (1, 9)
   INFO:root:Input to DummyReadModifyWriteStateDoFn: (1, 9)
   INFO:root:Final output(1, 9)
   ```
   The output of the two runners is different: in the first case all data is processed through the first step, and then forwarded to the second step, while in the second case each datapoint is processed through both steps before moving to the next one. I think the expected output should be the second one. This issue can be blocking in streaming cases, where data is unbounded and for this reason the first step "never ends" and data is not processed by the second step.
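
A likely explanation for the two orderings, offered here as an assumption rather than a confirmed diagnosis: the Flink log above shows `Create`, `Map` and `DummyDoFn1` fused into a single task that finishes before the stateful DoFn runs, which is consistent with the stateful step sitting in its own stage. The plain-Python sketch below does not use Beam at all; `run_fused` and `run_staged` are hypothetical helpers that only mimic the two log orderings.

```python
from typing import Callable, Iterable, List, Sequence, Tuple

Element = Tuple[int, int]
Step = Callable[[Element, List[str]], Element]


def dummy_dofn(element: Element, log: List[str]) -> Element:
    # Stand-in for DummyDoFn.process: log the element and pass it on.
    log.append(f"Input to DummyDoFn: {element}")
    return element


def dummy_stateful_dofn(element: Element, log: List[str]) -> Element:
    # Stand-in for DummyReadModifyWriteStateDoFn.process.
    log.append(f"Input to DummyReadModifyWriteStateDoFn: {element}")
    return element


def run_fused(elements: Iterable[Element], steps: Sequence[Step]) -> List[str]:
    # Each element goes through every step before the next element starts
    # (the ordering seen in the DirectRunner log).
    log: List[str] = []
    for element in elements:
        for step in steps:
            element = step(element, log)
    return log


def run_staged(
    elements: Iterable[Element], stages: Sequence[Sequence[Step]]
) -> List[str]:
    # A whole bundle finishes one stage before the next stage sees it
    # (the ordering seen in the Flink log).
    log: List[str] = []
    bundle = list(elements)
    for stage in stages:
        next_bundle = []
        for element in bundle:
            for step in stage:
                element = step(element, log)
            next_bundle.append(element)
        bundle = next_bundle
    return log


elements = [(1, i) for i in range(3)]
fused_log = run_fused(elements, [dummy_dofn, dummy_stateful_dofn])
staged_log = run_staged(elements, [[dummy_dofn], [dummy_stateful_dofn]])
```

Both helpers produce the same set of log lines; only the interleaving differs, which is the difference visible between the two runner outputs.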
   
   I also noticed that this issue affects both ReadModifyWriteStateSpec and BagStateSpec, but not CombiningValueStateSpec. I also noticed that the issue disappears when I remove the `values` variable in the `process` function of the Stateful DoFn class.
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [X] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] jrmccluskey commented on issue #25709: [Bug]: Data not being forwarded correctly to Stateful DoFn in Python SDK + PortableRunner

Posted by "jrmccluskey (via GitHub)" <gi...@apache.org>.
jrmccluskey commented on issue #25709:
URL: https://github.com/apache/beam/issues/25709#issuecomment-1458865446

   FWIW, I slightly modified this pipeline to run on Dataflow to check the behavior there, and it behaved like Flink:
   
   ```
   INFO 2023-03-07T20:44:08.027822017Z Input to DummyDoFn: (1, 4)
   INFO 2023-03-07T20:44:08.042022228Z Input to DummyDoFn: (1, 5)
   INFO 2023-03-07T20:44:08.042737245Z Input to DummyDoFn: (1, 3)
   INFO 2023-03-07T20:44:08.051712751Z Input to DummyDoFn: (1, 8)
   INFO 2023-03-07T20:44:08.056941747Z Input to DummyDoFn: (1, 7)
   INFO 2023-03-07T20:44:08.057651281Z Input to DummyDoFn: (1, 1)
   INFO 2023-03-07T20:44:08.072991371Z Input to DummyDoFn: (1, 0)
   INFO 2023-03-07T20:44:08.084101915Z Input to DummyDoFn: (1, 2)
   INFO 2023-03-07T20:44:08.086365222Z Input to DummyDoFn: (1, 9)
   INFO 2023-03-07T20:44:08.088879585Z Input to DummyDoFn: (1, 6)
   INFO 2023-03-07T20:44:08.299842119Z Input to DummyReadModifyWriteStateDoFn: (1, 4)
   INFO 2023-03-07T20:44:08.300017595Z Final output(1, 4)
   INFO 2023-03-07T20:44:08.355724573Z Input to DummyReadModifyWriteStateDoFn: (1, 3)
   INFO 2023-03-07T20:44:08.355904102Z Final output(1, 3)
   INFO 2023-03-07T20:44:08.356028318Z Input to DummyReadModifyWriteStateDoFn: (1, 6)
   INFO 2023-03-07T20:44:08.356083869Z Final output(1, 6)
   INFO 2023-03-07T20:44:08.356168270Z Input to DummyReadModifyWriteStateDoFn: (1, 8)
   INFO 2023-03-07T20:44:08.356222152Z Final output(1, 8)
   INFO 2023-03-07T20:44:15.685304403Z Input to DummyReadModifyWriteStateDoFn: (1, 7)
   INFO 2023-03-07T20:44:15.685476541Z Final output(1, 7)
   INFO 2023-03-07T20:44:15.733944654Z Input to DummyReadModifyWriteStateDoFn: (1, 5)
   INFO 2023-03-07T20:44:15.734126329Z Final output(1, 5)
   INFO 2023-03-07T20:44:15.734240531Z Input to DummyReadModifyWriteStateDoFn: (1, 2)
   INFO 2023-03-07T20:44:15.734311342Z Final output(1, 2)
   INFO 2023-03-07T20:44:15.750033617Z Input to DummyReadModifyWriteStateDoFn: (1, 0)
   INFO 2023-03-07T20:44:15.750293016Z Final output(1, 0)
   INFO 2023-03-07T20:44:15.750429391Z Input to DummyReadModifyWriteStateDoFn: (1, 9)
   INFO 2023-03-07T20:44:15.750485658Z Final output(1, 9)
   INFO 2023-03-07T20:44:15.750657320Z Input to DummyReadModifyWriteStateDoFn: (1, 1)
   INFO 2023-03-07T20:44:15.750710273Z Final output(1, 1)
   ```
   
   My guess is that the `Create` transform is outputting the elements as a single bundle on Flink and Dataflow. This also shouldn't be blocking in a streaming context: since you're doing aggregations based on windows and/or triggers (or just processing input as you get it), the unbounded nature of the input is accounted for.
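
To illustrate the bundle argument above, here is a minimal sketch in plain Python, not Beam code. It assumes that a runner cuts an unbounded input into finite bundles and hands each finished bundle to the next step; `bundles`, `run`, `bundle_size` and `max_bundles` are hypothetical names, and real runners choose bundle boundaries themselves.

```python
import itertools
from typing import Iterable, Iterator, List, Tuple


def bundles(stream: Iterable[int], bundle_size: int) -> Iterator[List[int]]:
    # Lazily cut a (possibly unbounded) stream into finite bundles.
    iterator = iter(stream)
    while True:
        bundle = list(itertools.islice(iterator, bundle_size))
        if not bundle:
            return
        yield bundle


def run(
    stream: Iterable[int], bundle_size: int, max_bundles: int
) -> List[Tuple[str, int]]:
    # Process each bundle fully in the first step, then in the second step.
    # max_bundles only exists so that this demonstration terminates.
    events: List[Tuple[str, int]] = []
    for bundle in itertools.islice(bundles(stream, bundle_size), max_bundles):
        for x in bundle:
            events.append(("first", x))
        for x in bundle:
            events.append(("second", x))
    return events


# itertools.count() never ends, yet the second step still sees data
# as soon as the first bundle is complete.
events = run(itertools.count(), bundle_size=3, max_bundles=2)
```

Under these assumptions the second step lags the first by at most one bundle, so an unbounded source alone does not stop data from reaching it. The sketch does not cover a first step that never completes its bundle.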




[GitHub] [beam] amontoli commented on issue #25709: [Bug]: Data not being forwarded correctly to Stateful DoFn in Python SDK + PortableRunner

Posted by "amontoli (via GitHub)" <gi...@apache.org>.
amontoli commented on issue #25709:
URL: https://github.com/apache/beam/issues/25709#issuecomment-1458091676

   Thank you for your answer. Yes, the issue affects the PortableRunner, not the direct one.
   
   I can also confirm that the same "ordering" issue affects batch pipelines as well, even though it probably has less impact there (you still get to the end of the pipeline, while in the streaming case this issue is blocking).




[GitHub] [beam] tvalentyn commented on issue #25709: [Bug]: Data not being forwarded correctly to Stateful DoFn in Python SDK + PortableRunner

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on issue #25709:
URL: https://github.com/apache/beam/issues/25709#issuecomment-1457470619

   Thanks for reporting. I see that this issue was filed for a streaming pipeline. The streaming direct runner has limited support, and we are tracking improvements under #24528.
   




[GitHub] [beam] tvalentyn commented on issue #25709: [Bug]: Data not being forwarded correctly to Stateful DoFn in Python SDK + PortableRunner

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on issue #25709:
URL: https://github.com/apache/beam/issues/25709#issuecomment-1457474625

   I may have misread this, actually. Is the direct runner working correctly, but the portable runner + Flink not working correctly?




[GitHub] [beam] amontoli commented on issue #25709: [Bug]: Data not being forwarded correctly to Stateful DoFn in Python SDK + PortableRunner

Posted by "amontoli (via GitHub)" <gi...@apache.org>.
amontoli commented on issue #25709:
URL: https://github.com/apache/beam/issues/25709#issuecomment-1459706270

   I wasn't able to make the Kafka X-lang connector work, so I wrote a simple (non-splittable) PTransform based on the [confluent_kafka](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#) Python library. When I replace the Create step with this connector, data is processed through the first step, but it never gets to the Stateful DoFn (without any custom triggers or windows). This, of course, could also be related to the fact that I used a custom connector rather than the built-in one.




[GitHub] [beam] tvalentyn commented on issue #25709: [Bug]: Data not being forwarded correctly to Stateful DoFn in Python SDK + PortableRunner

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on issue #25709:
URL: https://github.com/apache/beam/issues/25709#issuecomment-1457472339

   If you think this issue is reproducible in Batch pipelines as well, that would be good to know. 

