Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/05 00:26:46 UTC

[GitHub] [beam] damccorm opened a new issue, #21591: Python Reshuffle holds elements

damccorm opened a new issue, #21591:
URL: https://github.com/apache/beam/issues/21591

   Python Reshuffle holds elements while the pipeline is running and likely releases them in a batch. In contrast, Java Reshuffle triggers on every element, as noted in its documentation:
   "the trigger used with {@link Reshuffle} which triggers on every element and never buffers state."
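To make the contrast concrete, here is a toy model in plain Python (not Beam code; the class and function names are hypothetical) of two trigger policies: one that fires on every element, as the Java documentation describes, and one that holds elements until the input is exhausted, which matches the behavior reported in this issue.

```python
from typing import Iterable, List

# Toy model (not Beam code): after each element arrives, a trigger policy
# decides whether the buffered elements should be emitted as a pane.


class FireOnEveryElement:
    """Mimics the documented Java Reshuffle trigger: fire on every element."""

    def should_fire(self, buffered: List[int], input_done: bool) -> bool:
        return len(buffered) > 0


class FireWhenInputDone:
    """Mimics the reported behavior: hold everything until the input ends."""

    def should_fire(self, buffered: List[int], input_done: bool) -> bool:
        return input_done and len(buffered) > 0


def run_group(elements: Iterable[int], policy) -> List[List[int]]:
    """Feed elements one at a time and collect the emitted panes."""
    panes: List[List[int]] = []
    buffered: List[int] = []
    for element in elements:
        buffered.append(element)
        if policy.should_fire(buffered, input_done=False):
            panes.append(buffered)
            buffered = []  # discarding mode: a fired pane is not re-emitted
    # End of input: give the policy one last chance to fire.
    if policy.should_fire(buffered, input_done=True):
        panes.append(buffered)
    return panes


print(run_group(range(4), FireOnEveryElement()))  # [[0], [1], [2], [3]]
print(run_group(range(4), FireWhenInputDone()))   # [[0, 1, 2, 3]]
```

With the first policy nothing is ever buffered across elements; with the second, downstream stages see nothing until the input is finished.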
   
   Here is a working example:
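   ```
   import time

   import apache_beam as beam
   from apache_beam import Pipeline


   def test(p: Pipeline):
     class SlowProcessFn(beam.DoFn):
       def process(self, element):
         time.sleep(0.5)  # simulate slow per-element work
         yield element

     result = (p
       | beam.Create(range(100))
       | beam.ParDo(SlowProcessFn())
       | beam.Reshuffle()  # HERE
       | beam.Map(lambda x: print(x, time.time())))
     return result


   with beam.Pipeline() as p:
     test(p)
   ```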
   
   Tested on the local runner and the Flink runner (1.14): the elements are printed after 50 secs. If Reshuffle is commented out, an element is printed every half second.
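The 50-second figure follows from the example: 100 elements times 0.5 s of sleep each. A toy timing model of the two outcomes, assuming elements are processed serially in one stage (the function names are hypothetical and the times are computed, not measured):

```python
from typing import List

# Toy timing model (an assumption, not a measurement): each element spends
# `per_element_secs` in SlowProcessFn and elements are processed serially.


def emit_times_streaming(n: int, per_element_secs: float) -> List[float]:
    """Without a buffering Reshuffle, element i is printed once it is processed."""
    return [per_element_secs * (i + 1) for i in range(n)]


def emit_times_buffered(n: int, per_element_secs: float) -> List[float]:
    """With a Reshuffle that holds elements, all are printed when the stage ends."""
    stage_end = per_element_secs * n
    return [stage_end] * n


unbuffered = emit_times_streaming(100, 0.5)
buffered = emit_times_buffered(100, 0.5)
print(unbuffered[0], unbuffered[-1])  # 0.5 50.0
print(buffered[0], buffered[-1])      # 50.0 50.0
```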
   
   This behavior causes problems when a downstream PTransform involves some kind of time-sensitive operation, such as receiving a list of updated files from the input and reading them with the filebasedsource.ReadAllFiles transform. Because ReadAll contains a Reshuffle, the actual read is blocked.
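The blocking effect can be illustrated with plain Python generators. This is a toy model, not Beam's ReadAllFiles; `passthrough`, `hold_all`, and `run` are hypothetical names. `hold_all` stands in for a stage that buffers until its input ends, so every "read" waits for every "discovery".

```python
from typing import Callable, Iterable, Iterator, List

# Toy model of "discover new files, then read them" (not Beam code).


def passthrough(items: Iterable[str]) -> Iterator[str]:
    """Forward each file name as soon as it arrives."""
    for item in items:
        yield item


def hold_all(items: Iterable[str]) -> Iterator[str]:
    """Buffer everything first; on an unbounded input this never yields."""
    buffered = list(items)
    yield from buffered


def run(file_names: List[str],
        stage: Callable[[Iterable[str]], Iterator[str]]) -> List[str]:
    """Return the order in which discoveries and reads happen."""
    log: List[str] = []

    def discover() -> Iterator[str]:
        for name in file_names:
            log.append(f"discovered {name}")
            yield name

    for name in stage(discover()):
        log.append(f"read {name}")
    return log


print(run(["a", "b"], passthrough))
# ['discovered a', 'read a', 'discovered b', 'read b']
print(run(["a", "b"], hold_all))
# ['discovered a', 'discovered b', 'read a', 'read b']
```

With a finite list the buffered version merely delays the reads; with an endless stream of discovered files, `list(items)` never returns and no read ever happens.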
   
   Imported from Jira [BEAM-14497](https://issues.apache.org/jira/browse/BEAM-14497). Original Jira may contain additional context.
   Reported by: yihu.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on issue #21591: Python Reshuffle holds elements

Posted by GitBox <gi...@apache.org>.
Abacn commented on issue #21591:
URL: https://github.com/apache/beam/issues/21591#issuecomment-1157691531

   > I see that you also fixed #21606. That could be a source of this problem too.
   
   Thanks for following up. With that fix in, testing on a local Flink cluster shows that Reshuffle no longer holds elements, but on the direct runner (without specifying --runner) Reshuffle still holds elements. This is likely because streaming is currently not fully supported when running locally.




[GitHub] [beam] Abacn commented on issue #21591: Python Reshuffle holds elements

Posted by GitBox <gi...@apache.org>.
Abacn commented on issue #21591:
URL: https://github.com/apache/beam/issues/21591#issuecomment-1152381549

   Sorry, there was a typo; it should read "the elements are not printed after 50 secs", i.e., it seems Reshuffle always holds elements until the stage is finished. I tested, and this behavior seems to exist in all runners based on the portable runner: the Python direct runner (which now also calls fn_runner) and Flink.




[GitHub] [beam] Abacn commented on issue #21591: Python Reshuffle holds elements

Posted by GitBox <gi...@apache.org>.
Abacn commented on issue #21591:
URL: https://github.com/apache/beam/issues/21591#issuecomment-1152551444

   .take-issue




[GitHub] [beam] Abacn commented on issue #21591: Python Reshuffle holds elements

Posted by GitBox <gi...@apache.org>.
Abacn commented on issue #21591:
URL: https://github.com/apache/beam/issues/21591#issuecomment-1152551114

   > Just to clarify: because you say "until the stage is finished" I think of batch processing, since stages do not finish in streaming. If it is batch, then this is expected behavior. In streaming, it is expected that a reshuffle would immediately emit elements as each bundle is committed.
   
   In my case it is streaming. I've written a pretty simple test:
   
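   ```
   import datetime
   import logging
   import time

   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.transforms.periodicsequence import PeriodicImpulse

   _LOGGER = logging.getLogger(__name__)

   def print_time(x):
     # float() accepts both a plain float and a Beam Timestamp.
     _LOGGER.warning(datetime.datetime.fromtimestamp(float(x)))

   with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
     result = (p
       | PeriodicImpulse(start_timestamp=time.time(), fire_interval=1.0)
       | beam.Reshuffle()  # HERE
       | beam.Map(print_time))
   ```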
   
   No output (warning log) is seen on the direct runner, the portable runner, or the Flink runner running locally. I also tested on the Dataflow runner, where the output can be seen.
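For comparison, on a runner that emits per element (as reported for Dataflow) one would expect roughly one log line per second. The sketch below only computes those expected fire times; it assumes PeriodicImpulse fires at `start_timestamp + k * fire_interval` for k = 0, 1, 2, ..., the helper name is hypothetical, and UTC is used for reproducibility, whereas `print_time` above logs local time.

```python
import datetime
from typing import List

# Assumption: PeriodicImpulse fires at start_timestamp + k * fire_interval.
# This helper only computes the expected times; it does not run Beam.


def expected_fire_times(start_timestamp: float, fire_interval: float,
                        count: int) -> List[datetime.datetime]:
    times = []
    for k in range(count):
        fire_at = start_timestamp + k * fire_interval
        # UTC keeps the result independent of the machine's time zone.
        times.append(datetime.datetime.fromtimestamp(
            fire_at, tz=datetime.timezone.utc))
    return times


for t in expected_fire_times(1_654_000_000.0, 1.0, 3):
    print(t)
```

If a runner instead holds elements behind the Reshuffle, none of these lines appear, because an unbounded stage never finishes.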
   




[GitHub] [beam] kennknowles commented on issue #21591: Python Reshuffle holds elements

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #21591:
URL: https://github.com/apache/beam/issues/21591#issuecomment-1152523403

   Just to clarify: because you say "until the stage is finished" I think of batch processing, since stages do not finish in streaming. If it is batch, then this is expected behavior. In streaming, it is expected that a reshuffle would immediately emit elements as each bundle is committed.
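The batch/streaming distinction drawn here can be sketched as a toy model (not runner code; `release_times` is a hypothetical name). In streaming, an element becomes visible downstream when the bundle containing it is committed; in batch, the stage boundary holds everything until the last bundle is done.

```python
from typing import Dict, List, Tuple

# Toy model: each bundle is (commit_time, elements).


def release_times(bundles: List[Tuple[float, List[str]]],
                  streaming: bool) -> Dict[str, float]:
    """Return the time at which each element becomes visible downstream."""
    # In batch, nothing is released before the whole stage has finished.
    # (A real streaming stage has no "end"; it is only used for batch here.)
    stage_end = max((commit for commit, _ in bundles), default=0.0)
    out: Dict[str, float] = {}
    for commit, elements in bundles:
        for element in elements:
            out[element] = commit if streaming else stage_end
    return out


bundles = [(1.0, ["a", "b"]), (2.0, ["c"])]
print(release_times(bundles, streaming=True))   # {'a': 1.0, 'b': 1.0, 'c': 2.0}
print(release_times(bundles, streaming=False))  # {'a': 2.0, 'b': 2.0, 'c': 2.0}
```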




[GitHub] [beam] kennknowles commented on issue #21591: Python Reshuffle holds elements

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #21591:
URL: https://github.com/apache/beam/issues/21591#issuecomment-1157737099

   Yeah, that was going to be my next guess. Since you said it affected Flink, I looked for other causes. But if it is just the Python direct runner, streaming support is most likely the issue. I would expect @robertwb and @pabloem to know what is expected to work and what is not implemented yet.




[GitHub] [beam] tvalentyn closed issue #21591: Python Reshuffle holds elements

Posted by GitBox <gi...@apache.org>.
tvalentyn closed issue #21591: Python Reshuffle holds elements
URL: https://github.com/apache/beam/issues/21591




[GitHub] [beam] tvalentyn commented on issue #21591: Python Reshuffle holds elements

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on issue #21591:
URL: https://github.com/apache/beam/issues/21591#issuecomment-1163219612

   Overall, I think Reshuffle itself can semantically hold elements, as @kennknowles mentions in https://github.com/apache/beam/issues/21591#issuecomment-1152523403. I suggest we close this as WAI (working as intended) and track runner improvements as necessary.




[GitHub] [beam] kennknowles commented on issue #21591: Python Reshuffle holds elements

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #21591:
URL: https://github.com/apache/beam/issues/21591#issuecomment-1157023933

   I see that you also fixed #21606. That could be a source of this problem too.




[GitHub] [beam] kennknowles commented on issue #21591: Python Reshuffle holds elements

Posted by GitBox <gi...@apache.org>.
kennknowles commented on issue #21591:
URL: https://github.com/apache/beam/issues/21591#issuecomment-1151443967

   @Abacn will you be taking this one? Which runner are you discussing? I believe that the runner controls this, so it is not specific to the SDK. Even if the Python SDK is putting in a trigger that does not cause the reshuffle, this could still be a runner issue, since the runner should perhaps be looking at the composite transform. I don't have current context on this one.
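The idea that "the runner should maybe be looking at the composite transform" can be sketched with a toy pipeline graph. This is hypothetical code, not Beam's runner API, and the sub-step names are illustrative only: a runner either replaces a recognized composite with its own native implementation, or expands it and executes whatever primitives (including any SDK-chosen trigger) it contains.

```python
from dataclasses import dataclass, field
from typing import Dict, List

# Toy pipeline graph (hypothetical): a transform has a name and, if it is a
# composite, a list of sub-transforms.


@dataclass
class Transform:
    urn: str
    parts: List["Transform"] = field(default_factory=list)


def plan(t: Transform, overrides: Dict[str, str]) -> List[str]:
    """Return the steps a runner would execute for `t`."""
    if t.urn in overrides:
        return [overrides[t.urn]]  # runner-native replacement for the composite
    if not t.parts:
        return [t.urn]  # primitive: run as-is
    steps: List[str] = []
    for part in t.parts:
        steps.extend(plan(part, overrides))
    return steps


# Illustrative step names; not the exact internals of the Python SDK.
reshuffle = Transform("reshuffle", [
    Transform("add_random_key"),
    Transform("window_into_with_trigger"),
    Transform("group_by_key"),
    Transform("restore_windowing"),
])

print(plan(reshuffle, overrides={}))
# ['add_random_key', 'window_into_with_trigger', 'group_by_key', 'restore_windowing']
print(plan(reshuffle, overrides={"reshuffle": "native_reshuffle"}))
# ['native_reshuffle']
```

In the first case the runner's behavior depends entirely on the trigger the SDK put inside the composite; in the second, the runner decides how the reshuffle emits elements.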




[GitHub] [beam] tvalentyn commented on issue #21591: Python Reshuffle holds elements

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on issue #21591:
URL: https://github.com/apache/beam/issues/21591#issuecomment-1163167770

   The default (FnApi) direct runner doesn't fully support streaming: https://issues.apache.org/jira/browse/BEAM-7514.
   Using `--runner BundleBasedDirectRunner --streaming` also didn't work, due to something similar to https://github.com/apache/beam/issues/21103


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org