Posted to issues@beam.apache.org by "Pablo Estrada (Jira)" <ji...@apache.org> on 2019/11/26 21:10:00 UTC

[jira] [Created] (BEAM-8833) Verify that SDF-related functionality is compatible with multiworker environment for FnApiRunner

Pablo Estrada created BEAM-8833:
-----------------------------------

             Summary: Verify that SDF-related functionality is compatible with multiworker environment for FnApiRunner
                 Key: BEAM-8833
                 URL: https://issues.apache.org/jira/browse/BEAM-8833
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
            Reporter: Pablo Estrada


 

The FnApiRunner currently avoids splitting deferred inputs for multiple workers: [https://github.com/apache/beam/blob/release-2.16.0/sdks/python/apache_beam/runners/portability/fn_api_runner.py#L771-L793]

 

These issues surfaced as I convert the FnApiRunner to schedule work based on ready elements instead of executing stage by stage: [https://github.com/apache/beam/pull/10067]

We should verify that the work items coming back from parallel SDFs are being merged properly.

The symptom I'm seeing is duplicated element processing in SDF tests:
{code:java}
TEST: apache_beam.runners.portability.fn_api_runner_test.FnApiRunnerSplitTestWithMultiWorkers.test_split_crazy_sdf
EXPECTED:
[
 (5, 0), (5, 1), (5, 2), (5, 3), (5, 4), 
 (9, 0), (9, 1), (9, 2), (9, 3), (9, 4), (9, 5), (9, 6), (9, 7), (9, 8), 
 (8, 0), (8, 1), (8, 2), (8, 3), (8, 4), (8, 5), (8, 6), (8, 7), 
 (8, 0), (8, 1), (8, 2), (8, 3), (8, 4), (8, 5), (8, 6), (8, 7), 
 (9, 0), (9, 1), (9, 2), (9, 3), (9, 4), (9, 5), (9, 6), (9, 7), (9, 8)] == 

ACTUAL OUTPUT:
[
 (8, 0), (8, 1), (8, 2), (8, 3), (8, 4), (8, 5), (8, 6), (8, 7), 
 (9, 0), (9, 1), (9, 2), (9, 3), (9, 4), (9, 5), (9, 6), (9, 7), (9, 8), 
 (8, 0), (8, 1), (8, 2), (8, 3), (8, 4), (8, 5), (8, 6), (8, 7), 
 (9, 0), (9, 1), (9, 2), (9, 3), (9, 4), (9, 7), (9, 8), (9, 5), (9, 6), 
 (5, 0), (5, 1), (5, 2), (5, 3), (5, 4),  
 (8, 0), (8, 1), (8, 2), (8, 3), (8, 4), (8, 5), (8, 6), (8, 7), 
 (9, 0), (9, 1), (9, 2), (9, 3), (9, 4), (9, 5), (9, 6), (9, 7), (9, 8)]
{code}
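To make the duplication concrete, here is a minimal sketch (not part of the Beam test suite) that compares the two listings above as multisets, so that ordering differences are ignored and only surplus or missing elements show up. The helper name `expand` and the group sizes passed to it are transcribed by hand from the EXPECTED and ACTUAL output and are assumptions of this sketch.

```python
from collections import Counter


def expand(counts):
    """Build the (n, i) pairs for each n in counts, with i in range(n)."""
    return [(n, i) for n in counts for i in range(n)]


# Group sizes transcribed from the EXPECTED and ACTUAL listings above
# (hypothetical reconstruction; ordering within a group is ignored).
expected = expand([5, 9, 8, 8, 9])
actual = expand([8, 9, 8, 9, 5, 8, 9])

# Counter subtraction keeps only positive counts, so `extra` holds the
# elements that were processed more often than expected.
extra = Counter(actual) - Counter(expected)
missing = Counter(expected) - Counter(actual)

print("expected elements:", len(expected))
print("actual elements:  ", len(actual))
print("surplus elements: ", sum(extra.values()))
print("surplus keys:     ", sorted({n for n, _ in extra}))
print("missing elements: ", sum(missing.values()))
```

On these listings nothing is missing; the surplus is exactly one additional copy of each (8, i) and (9, i) pair, which is consistent with some split work being processed an extra time rather than results being dropped.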
Some comments from Robert:

 
{code:java}
Or the _add_residuals_and_channel_splits_to_deferred_inputs method. Looks like it has side effects?

Hmm.... looks like this code was added for the multi-worker case. (And the comments and the TODO are unrelated.)

I think this is in reference to the fact that https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/fn_api_runner.py#L1826 does not do the right thing yet, but I wonder how that's OK the first time around.... there might be a bug lurking here.
{code}
cc: [~boyuanz], [~robertwb]

 


