Posted to issues@flink.apache.org by "Dong Lin (Jira)" <ji...@apache.org> on 2023/04/18 05:49:00 UTC

[jira] [Assigned] (FLINK-31255) OperatorUtils#createWrappedOperatorConfig should update input and sideOutput serializers

     [ https://issues.apache.org/jira/browse/FLINK-31255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin reassigned FLINK-31255:
--------------------------------

    Assignee: Jiang Xin

> OperatorUtils#createWrappedOperatorConfig should update input and sideOutput serializers
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-31255
>                 URL: https://issues.apache.org/jira/browse/FLINK-31255
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / Machine Learning
>    Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0
>            Reporter: Zhipeng Zhang
>            Assignee: Jiang Xin
>            Priority: Major
>              Labels: pull-request-available
>
> Currently we use an operator wrapper so that normal operators can be used in iterations. However, the operatorConfig is not correctly unwrapped: the input and side-output serializers still refer to the wrapped record type. For example, the following code fails because the wrong type serializer is used.
>  
> {code:java}
> @Test
> public void testIterationWithMapPartition() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStream<Long> input =
>         env.fromParallelCollection(new NumberSequenceIterator(0L, 5L), Types.LONG);
>     DataStreamList result =
>         Iterations.iterateBoundedStreamsUntilTermination(
>             DataStreamList.of(input),
>             ReplayableDataStreamList.notReplay(input),
>             IterationConfig.newBuilder()
>                 .setOperatorLifeCycle(OperatorLifeCycle.PER_ROUND)
>                 .build(),
>             new IterationBodyWithMapPartition());
>     List<Integer> counts = IteratorUtils.toList(result.get(0).executeAndCollect());
>     System.out.println(counts.size());
> }
>
> private static class IterationBodyWithMapPartition implements IterationBody {
>     @Override
>     public IterationBodyResult process(
>         DataStreamList variableStreams, DataStreamList dataStreams) {
>         DataStream<Long> input = variableStreams.get(0);
>         DataStream<Long> mapPartitionResult =
>             DataStreamUtils.mapPartition(
>                 input,
>                 new MapPartitionFunction<Long, Long>() {
>                     @Override
>                     public void mapPartition(Iterable<Long> iterable, Collector<Long> collector)
>                         throws Exception {
>                         for (Long iter : iterable) {
>                             collector.collect(iter);
>                         }
>                     }
>                 });
>         DataStream<Integer> terminationCriteria =
>             mapPartitionResult.<Long>flatMap(new TerminateOnMaxIter(2)).returns(Types.INT);
>         return new IterationBodyResult(
>             DataStreamList.of(mapPartitionResult), variableStreams, terminationCriteria);
>     }
> }
> {code}
> The error stack is:
> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.flink.iteration.IterationRecord
>     at org.apache.flink.iteration.typeinfo.IterationRecordSerializer.serialize(IterationRecordSerializer.java:34)
>     at org.apache.flink.iteration.datacache.nonkeyed.FileSegmentWriter.addRecord(FileSegmentWriter.java:79)
>     at org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter.addRecord(DataCacheWriter.java:107)
>     at org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache.add(ListStateWithCache.java:148)
>     at org.apache.flink.ml.common.datastream.DataStreamUtils$MapPartitionOperator.processElement(DataStreamUtils.java:445)
>     at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:69)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
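
The failure mode in the stack trace above can be reproduced without Flink. The following is a minimal sketch, not the actual fix: `Wrapped`, `Serializer`, `WrappedSerializer`, `OperatorConfig` and `unwrapConfig` are hypothetical stand-ins for `IterationRecord`, the type serializer, `IterationRecordSerializer`, the operator config and `OperatorUtils#createWrappedOperatorConfig`. It assumes the inner operator reads its input serializer from the config and uses it on raw elements, as `MapPartitionOperator` appears to do in the trace.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SerializerUnwrapSketch {

    /** Hypothetical stand-in for IterationRecord: a value tagged with its round. */
    static final class Wrapped<T> {
        final T value;
        final int round;

        Wrapped(T value, int round) {
            this.value = value;
            this.round = round;
        }
    }

    /** Hypothetical stand-in for a type serializer. */
    interface Serializer<T> {
        String serialize(T record);
    }

    static final class LongSerializer implements Serializer<Long> {
        @Override
        public String serialize(Long record) {
            return Long.toString(record);
        }
    }

    /** Serializes wrapped records; stand-in for IterationRecordSerializer. */
    static final class WrappedSerializer<T> implements Serializer<Wrapped<T>> {
        private final Serializer<T> inner;

        WrappedSerializer(Serializer<T> inner) {
            this.inner = inner;
        }

        @Override
        public String serialize(Wrapped<T> record) {
            return record.round + ":" + inner.serialize(record.value);
        }
    }

    /** Holds the input serializer untyped, as a config read at runtime would. */
    static final class OperatorConfig {
        final Serializer<?> inputSerializer;

        OperatorConfig(Serializer<?> inputSerializer) {
            this.inputSerializer = inputSerializer;
        }

        @SuppressWarnings("unchecked")
        <T> Serializer<T> getInputSerializer() {
            return (Serializer<T>) inputSerializer; // unchecked, fails only on use
        }
    }

    /** Builds the config for the inner operator by unwrapping the serializer. */
    static OperatorConfig unwrapConfig(OperatorConfig wrapperConfig) {
        Serializer<?> s = wrapperConfig.inputSerializer;
        if (s instanceof WrappedSerializer) {
            return new OperatorConfig(((WrappedSerializer<?>) s).inner);
        }
        return wrapperConfig;
    }

    /** Inner operator logic: caches raw Long elements using the config's serializer. */
    static List<String> cacheAll(OperatorConfig config, List<Long> elements) {
        Serializer<Long> serializer = config.getInputSerializer();
        List<String> cache = new ArrayList<>();
        for (Long element : elements) {
            cache.add(serializer.serialize(element));
        }
        return cache;
    }

    static OperatorConfig wrapperConfig() {
        return new OperatorConfig(new WrappedSerializer<>(new LongSerializer()));
    }

    /** Returns true if using the wrapper's config directly throws ClassCastException. */
    static boolean failsWithWrapperConfig() {
        try {
            cacheAll(wrapperConfig(), Arrays.asList(0L, 1L, 2L));
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    static List<String> cacheWithUnwrappedConfig() {
        return cacheAll(unwrapConfig(wrapperConfig()), Arrays.asList(0L, 1L, 2L));
    }

    public static void main(String[] args) {
        System.out.println("wrapper config fails: " + failsWithWrapperConfig());
        System.out.println("unwrapped config caches: " + cacheWithUnwrappedConfig());
    }
}
```

In this sketch the cast error only surfaces when the serializer is first used on a raw element, which matches the trace failing inside `serialize` rather than at config creation. Whether the real fix unwraps the serializer or rebuilds the config from the original type information is not stated in the issue.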



--
This message was sent by Atlassian Jira
(v8.20.10#820010)