Posted to issues@flink.apache.org by "Yun Gao (Jira)" <ji...@apache.org> on 2022/06/23 03:39:00 UTC
[jira] [Assigned] (FLINK-28126) Iteration gets stuck when replayable datastream and its downstream operator have different parallelism
[ https://issues.apache.org/jira/browse/FLINK-28126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yun Gao reassigned FLINK-28126:
-------------------------------
Assignee: Yun Gao
> Iteration gets stuck when replayable datastream and its downstream operator have different parallelism
> ------------------------------------------------------------------------------------------------------
>
> Key: FLINK-28126
> URL: https://issues.apache.org/jira/browse/FLINK-28126
> Project: Flink
> Issue Type: Bug
> Components: Library / Machine Learning
> Affects Versions: ml-2.0.0
> Reporter: Xuannan Su
> Assignee: Yun Gao
> Priority: Major
>
> The iteration gets stuck when a replayable data stream and its downstream operator have different parallelism. It can be reproduced with the following code snippet.
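> {code:java}
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.iteration.DataStreamList;
> import org.apache.flink.iteration.IterationBody;
> import org.apache.flink.iteration.IterationBodyResult;
> import org.apache.flink.iteration.IterationConfig;
> import org.apache.flink.iteration.IterationListener;
> import org.apache.flink.iteration.Iterations;
> import org.apache.flink.iteration.ReplayableDataStreamList;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.util.Collector;
> import org.junit.Test;
>
> // Enclosing test class; the class name is illustrative.
> public class IterationParallelismTest {
>
>     @Test
>     public void testIteration() throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         final SingleOutputStreamOperator<Integer> variable = env.fromElements(0).name("i");
>         final SingleOutputStreamOperator<Integer> data =
>                 env.fromElements(1, 2).name("inc")
>                         .map(x -> x)
>                         .setParallelism(1); // The test passes if this parallelism is set to 2.
>         final IterationConfig config = IterationConfig.newBuilder().build();
>         Iterations.iterateBoundedStreamsUntilTermination(
>                 DataStreamList.of(variable),
>                 ReplayableDataStreamList.replay(data),
>                 config,
>                 (IterationBody)
>                         (variableStreams, dataStreams) -> {
>                             final DataStream<Integer> sample = dataStreams.get(0);
>                             // "iter" runs at parallelism 2, unlike the replayed stream above.
>                             final SingleOutputStreamOperator<Integer> trainOutput =
>                                     sample.transform(
>                                                     "iter",
>                                                     TypeInformation.of(Integer.class),
>                                                     new IterTransform())
>                                             .setParallelism(2)
>                                             .map((MapFunction<Integer, Integer>) integer -> integer)
>                                             .setParallelism(1);
>                             return new IterationBodyResult(
>                                     DataStreamList.of(trainOutput),
>                                     DataStreamList.of(trainOutput));
>                         });
>         env.execute();
>     }
>
>     /** Emits one record on each epoch watermark below 10 and nothing afterwards. */
>     public static class IterTransform extends AbstractStreamOperator<Integer>
>             implements OneInputStreamOperator<Integer, Integer>, IterationListener<Integer> {
>
>         @Override
>         public void processElement(StreamRecord<Integer> element) throws Exception {
>             LOG.info("Processing element: {}", element);
>         }
>
>         @Override
>         public void onEpochWatermarkIncremented(
>                 int epochWatermark, Context context, Collector<Integer> collector)
>                 throws Exception {
>             LOG.info("onEpochWatermarkIncremented: {}", epochWatermark);
>             if (epochWatermark >= 10) {
>                 return;
>             }
>             collector.collect(0);
>         }
>
>         @Override
>         public void onIterationTerminated(Context context, Collector<Integer> collector)
>                 throws Exception {
>             LOG.info("onIterationTerminated");
>         }
>     }
> }
> {code}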
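> After digging into the code, I found that the `ReplayOperator` does not emit the epoch watermark through a broadcast output, so when its downstream operator runs at a different parallelism, the epoch watermark may reach only some of the downstream subtasks. [~gaoyunhaii], could you take a look and confirm whether this is the case?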
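The broadcast hypothesis can be illustrated with a small, self-contained model. This is a sketch, not Flink code: `EpochWatermarkModel` and `alignedEpochs` are hypothetical names. The model assumes that (a) a downstream subtask advances to an epoch only after every one of its input channels has delivered that epoch watermark, (b) equal parallelism gives a one-to-one forward edge and unequal parallelism an all-to-all rebalance edge (Flink's default when no partitioner is set), and (c) a non-broadcast emit reaches exactly one downstream subtask, picked deterministically here instead of by Flink's actual round-robin.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model (not Flink code) of epoch watermark alignment.
 * The aligned epoch of a downstream subtask is the minimum over its input
 * channels; -1 means some channel never delivered the watermark (stuck).
 */
class EpochWatermarkModel {

    /**
     * Each upstream subtask emits one epoch watermark. Assumption: equal
     * parallelism means a forward (one-to-one) edge, otherwise an all-to-all
     * edge where a non-broadcast emit goes to subtask (u % downstream) only.
     */
    static int[] alignedEpochs(int upstream, int downstream, int epoch, boolean broadcast) {
        boolean forward = upstream == downstream;
        int channels = forward ? 1 : upstream;
        int[][] lastEpoch = new int[downstream][channels];
        for (int[] perChannel : lastEpoch) {
            Arrays.fill(perChannel, -1); // nothing received yet
        }

        for (int u = 0; u < upstream; u++) {
            if (forward) {
                lastEpoch[u][0] = epoch; // subtask u only feeds subtask u
            } else if (broadcast) {
                for (int d = 0; d < downstream; d++) {
                    lastEpoch[d][u] = epoch; // every subtask sees it on channel u
                }
            } else {
                lastEpoch[u % downstream][u] = epoch; // routed like a regular record
            }
        }

        int[] aligned = new int[downstream];
        for (int d = 0; d < downstream; d++) {
            aligned[d] = Arrays.stream(lastEpoch[d]).min().getAsInt();
        }
        return aligned;
    }

    public static void main(String[] args) {
        // Fan-out from parallelism 1 to 2, similar to the failing case above.
        System.out.println("1->2 non-broadcast: " + Arrays.toString(alignedEpochs(1, 2, 0, false)));
        System.out.println("1->2 broadcast:     " + Arrays.toString(alignedEpochs(1, 2, 0, true)));
        // Equal parallelism (forward edge), the case reported to pass.
        System.out.println("2->2 non-broadcast: " + Arrays.toString(alignedEpochs(2, 2, 0, false)));
    }
}
```

In this model the 1 -> 2 non-broadcast case yields `[0, -1]`: downstream subtask 1 never receives the watermark and would wait forever, while broadcasting yields `[0, 0]`. The 2 -> 2 case also yields `[0, 0]`, because a forward edge gives each downstream subtask a single input channel, which is consistent with the note that the test passes with parallelism 2.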