Posted to issues@flink.apache.org by "Yun Gao (Jira)" <ji...@apache.org> on 2022/06/23 03:39:00 UTC

[jira] [Assigned] (FLINK-28126) Iteration gets stuck when replayable datastream and its downstream operator have different parallelism

     [ https://issues.apache.org/jira/browse/FLINK-28126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Gao reassigned FLINK-28126:
-------------------------------

    Assignee: Yun Gao

> Iteration gets stuck when replayable datastream and its downstream operator have different parallelism
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28126
>                 URL: https://issues.apache.org/jira/browse/FLINK-28126
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / Machine Learning
>    Affects Versions: ml-2.0.0
>            Reporter: Xuannan Su
>            Assignee: Yun Gao
>            Priority: Major
>
> The iteration gets stuck when a replayable data stream and its downstream operator have different parallelism. It can be reproduced with the following code snippet.
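> {code:java}
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.iteration.DataStreamList;
> import org.apache.flink.iteration.IterationBody;
> import org.apache.flink.iteration.IterationBodyResult;
> import org.apache.flink.iteration.IterationConfig;
> import org.apache.flink.iteration.IterationListener;
> import org.apache.flink.iteration.Iterations;
> import org.apache.flink.iteration.ReplayableDataStreamList;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.util.Collector;
> import org.junit.Test;
>
> // The enclosing class name is illustrative; the test and operator below are the original snippet.
> public class ReplayParallelismTest {
>     @Test
>     public void testIteration() throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         final SingleOutputStreamOperator<Integer> variable = env.fromElements(0).name("i");
>         final SingleOutputStreamOperator<Integer> data = env.fromElements(1, 2).name("inc")
>                 .map(x -> x).setParallelism(1); // test can pass if parallelism is 2.
>         final IterationConfig config = IterationConfig.newBuilder().build();
>         Iterations.iterateBoundedStreamsUntilTermination(
>                 DataStreamList.of(variable),
>                 ReplayableDataStreamList.replay(data),
>                 config,
>                 (IterationBody) (variableStreams, dataStreams) -> {
>                     final DataStream<Integer> sample = dataStreams.get(0);
>                     final SingleOutputStreamOperator<Integer> trainOutput =
>                             sample
>                                     .transform(
>                                             "iter",
>                                             TypeInformation.of(Integer.class),
>                                             new IterTransform())
>                                     .setParallelism(2)
>                                     .map((MapFunction<Integer, Integer>) integer -> integer)
>                                     .setParallelism(1);
>                     return new IterationBodyResult(
>                             DataStreamList.of(trainOutput), DataStreamList.of(trainOutput));
>                 });
>         env.execute();
>     }
>     public static class IterTransform extends AbstractStreamOperator<Integer>
>             implements OneInputStreamOperator<Integer, Integer>, IterationListener<Integer> {
>         @Override
>         public void processElement(StreamRecord<Integer> element) throws Exception {
>             LOG.info("Processing element: {}", element);
>         }
>         @Override
>         public void onEpochWatermarkIncremented(
>                 int epochWatermark, Context context, Collector<Integer> collector)
>                 throws Exception {
>             LOG.info("onEpochWatermarkIncremented: {}", epochWatermark);
>             if (epochWatermark >= 10) {
>                 return;
>             }
>             collector.collect(0);
>         }
>         @Override
>         public void onIterationTerminated(Context context, Collector<Integer> collector)
>                 throws Exception {
>             LOG.info("onIterationTerminated");
>         }
>     }
> }
> {code}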
> After digging into the code, I found that `ReplayOperator` does not emit the epoch watermark through a broadcast output. [~gaoyunhaii], could you take a look and confirm whether this is the case?
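A rough way to see why a non-broadcast epoch watermark could hang the iteration, under two assumptions: with upstream parallelism 1 and downstream parallelism 2, Flink's default rebalance partitioning sends each non-broadcast element to a single channel, and an epoch only counts as finished once every downstream subtask has observed it. The standalone Java model below compares a record-like emit with a broadcast emit. The class and method names (`EpochWatermarkRoutingSketch`, `emitEpochWatermark`, `alignedEpoch`) are hypothetical and are not Flink ML APIs.

```java
import java.util.Arrays;

/**
 * Hypothetical model, not Flink ML code: one upstream subtask emits an epoch watermark to
 * N downstream subtasks. The routing rules here are simplifying assumptions.
 */
public class EpochWatermarkRoutingSketch {

    /**
     * Returns the epoch watermark seen by each downstream subtask (-1 means "none yet").
     *
     * @param downstreamParallelism number of downstream subtasks (output channels)
     * @param epoch the epoch watermark being emitted
     * @param broadcast true to send the event to every channel; false to send it like an
     *     ordinary record, i.e. only to the channel picked by the partitioner
     * @param targetChannel channel picked for a non-broadcast emit (e.g. by round-robin)
     */
    public static int[] emitEpochWatermark(
            int downstreamParallelism, int epoch, boolean broadcast, int targetChannel) {
        int[] seen = new int[downstreamParallelism];
        Arrays.fill(seen, -1);
        if (broadcast) {
            Arrays.fill(seen, epoch); // every downstream subtask observes the new epoch
        } else {
            seen[targetChannel] = epoch; // only one downstream subtask observes it
        }
        return seen;
    }

    /** An epoch is finished only once every subtask has seen it, so take the minimum. */
    public static int alignedEpoch(int[] seen) {
        return Arrays.stream(seen).min().orElse(-1);
    }

    public static void main(String[] args) {
        // Parallelism 1 -> 2, as between the replayed input and the "iter" operator above.
        int[] recordLike = emitEpochWatermark(2, 0, false, 0);
        int[] broadcasted = emitEpochWatermark(2, 0, true, 0);
        // prints "[0, -1] aligned=-1": the second subtask never advances, so alignment blocks
        System.out.println(Arrays.toString(recordLike) + " aligned=" + alignedEpoch(recordLike));
        // prints "[0, 0] aligned=0": both subtasks advance
        System.out.println(Arrays.toString(broadcasted) + " aligned=" + alignedEpoch(broadcasted));
    }
}
```

In this model, equal parallelism on both sides would give each downstream subtask its own upstream channel, so even a record-like emit would reach every subtask. That would be consistent with the note in the snippet that the test passes when the `map` before the iteration has parallelism 2.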



--
This message was sent by Atlassian Jira
(v8.20.7#820007)