Posted to dev@flink.apache.org by "Zhipeng Zhang (Jira)" <ji...@apache.org> on 2023/02/28 08:16:00 UTC

[jira] [Created] (FLINK-31255) OperatorUtils#createWrappedOperatorConfig fails to wrap operator config

Zhipeng Zhang created FLINK-31255:
-------------------------------------

             Summary: OperatorUtils#createWrappedOperatorConfig fails to wrap operator config
                 Key: FLINK-31255
                 URL: https://issues.apache.org/jira/browse/FLINK-31255
             Project: Flink
          Issue Type: Bug
          Components: Library / Machine Learning
    Affects Versions: ml-2.1.0, ml-2.0.0, ml-2.2.0
            Reporter: Zhipeng Zhang


Currently we use operator wrappers so that normal operators can be used in iterations. However, the operatorConfig is not correctly unwrapped. For example, the following code fails because the wrong type serializer is used.

 
{code:java}
@Test
public void testIterationWithMapPartition() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Long> input =
        env.fromParallelCollection(new NumberSequenceIterator(0L, 5L), Types.LONG);
    DataStreamList result =
        Iterations.iterateBoundedStreamsUntilTermination(
            DataStreamList.of(input),
            ReplayableDataStreamList.notReplay(input),
            IterationConfig.newBuilder()
                .setOperatorLifeCycle(OperatorLifeCycle.PER_ROUND)
                .build(),
            new IterationBodyWithMapPartition());

    List<Long> counts = IteratorUtils.toList(result.get(0).executeAndCollect());
    System.out.println(counts.size());
}

private static class IterationBodyWithMapPartition implements IterationBody {

    @Override
    public IterationBodyResult process(
        DataStreamList variableStreams, DataStreamList dataStreams) {
        DataStream<Long> input = variableStreams.get(0);

        DataStream<Long> mapPartitionResult =
            DataStreamUtils.mapPartition(
                input,
                new MapPartitionFunction<Long, Long>() {
                    @Override
                    public void mapPartition(Iterable<Long> iterable, Collector<Long> collector)
                        throws Exception {
                        for (Long value : iterable) {
                            collector.collect(value);
                        }
                    }
                });

        DataStream<Integer> terminationCriteria =
            mapPartitionResult.<Long>flatMap(new TerminateOnMaxIter(2)).returns(Types.INT);

        return new IterationBodyResult(
            DataStreamList.of(mapPartitionResult), variableStreams, terminationCriteria);
    }
}
{code}
The error stack is:

Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.flink.iteration.IterationRecord
    at org.apache.flink.iteration.typeinfo.IterationRecordSerializer.serialize(IterationRecordSerializer.java:34)
    at org.apache.flink.iteration.datacache.nonkeyed.FileSegmentWriter.addRecord(FileSegmentWriter.java:79)
    at org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter.addRecord(DataCacheWriter.java:107)
    at org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache.add(ListStateWithCache.java:148)
    at org.apache.flink.ml.common.datastream.DataStreamUtils$MapPartitionOperator.processElement(DataStreamUtils.java:445)
    at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:69)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
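
One plausible reading of the stack trace above is that the wrapped operator (here MapPartitionOperator) obtains the wrapper's IterationRecord serializer from its config and then uses it to cache raw Long elements. The sketch below reproduces that failure mode without Flink. It is only an illustration under that assumption: WrappedRecord, Serializer, WrappedRecordSerializer and CachingOperator are hypothetical stand-ins, not Flink classes, and the "unwrapped" case shows the general idea of handing the inner operator the element serializer rather than the actual fix.

```java
import java.util.ArrayList;
import java.util.List;

class WrappedConfigSketch {

    /** Hypothetical stand-in for the record type a wrapper operator exchanges. */
    static final class WrappedRecord<T> {
        final T value;
        final int epoch;

        WrappedRecord(T value, int epoch) {
            this.value = value;
            this.epoch = epoch;
        }
    }

    /** Minimal serializer interface for this sketch (not Flink's TypeSerializer). */
    interface Serializer<T> {
        String serialize(T record);
    }

    static final class LongSerializer implements Serializer<Long> {
        @Override
        public String serialize(Long record) {
            return Long.toString(record);
        }
    }

    /** Serializer for wrapped records; it delegates the payload to an inner serializer. */
    static final class WrappedRecordSerializer<T> implements Serializer<WrappedRecord<T>> {
        final Serializer<T> inner;

        WrappedRecordSerializer(Serializer<T> inner) {
            this.inner = inner;
        }

        @Override
        public String serialize(WrappedRecord<T> record) {
            return record.epoch + ":" + inner.serialize(record.value);
        }
    }

    /** Hypothetical inner operator that caches raw elements using the serializer from its config. */
    static final class CachingOperator {
        private final Serializer<Object> serializer;
        final List<String> cache = new ArrayList<>();

        @SuppressWarnings("unchecked")
        CachingOperator(Serializer<?> serializerFromConfig) {
            // Type erasure lets a mismatched serializer through; the error only surfaces on use.
            this.serializer = (Serializer<Object>) serializerFromConfig;
        }

        void processElement(Object element) {
            cache.add(serializer.serialize(element));
        }
    }

    /** The inner operator is given the wrapper's serializer, so a raw Long cannot be serialized. */
    static boolean failsWithWrapperSerializer() {
        CachingOperator op =
                new CachingOperator(new WrappedRecordSerializer<Long>(new LongSerializer()));
        try {
            op.processElement(1L);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    /** The inner operator is given the unwrapped element serializer instead. */
    static List<String> cacheWithUnwrappedSerializer() {
        WrappedRecordSerializer<Long> wrapper =
                new WrappedRecordSerializer<Long>(new LongSerializer());
        CachingOperator op = new CachingOperator(wrapper.inner);
        op.processElement(1L);
        op.processElement(2L);
        return op.cache;
    }

    public static void main(String[] args) {
        System.out.println(failsWithWrapperSerializer());
        System.out.println(cacheWithUnwrappedSerializer());
    }
}
```

In this sketch the ClassCastException is thrown inside the serializer's serialize call, which is the same place the real trace points to (IterationRecordSerializer.serialize), and it disappears once the inner operator sees the element serializer.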


