Posted to issues@flink.apache.org by "Dong Lin (Jira)" <ji...@apache.org> on 2023/04/18 05:47:00 UTC
[jira] [Updated] (FLINK-31255) OperatorUtils should update input and sideOutput serializers
[ https://issues.apache.org/jira/browse/FLINK-31255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dong Lin updated FLINK-31255:
-----------------------------
Summary: OperatorUtils should update input and sideOutput serializers (was: OperatorUtils#createWrappedOperatorConfig fails to wrap operator config)
> OperatorUtils should update input and sideOutput serializers
> ------------------------------------------------------------
>
> Key: FLINK-31255
> URL: https://issues.apache.org/jira/browse/FLINK-31255
> Project: Flink
> Issue Type: Bug
> Components: Library / Machine Learning
> Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0
> Reporter: Zhipeng Zhang
> Priority: Major
> Labels: pull-request-available
>
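> Currently we use an operator wrapper to enable normal operators to be used in iterations. However, the operatorConfig is not correctly unwrapped, so the wrapped operator is configured with the wrong type serializer. For example, the following code fails with a ClassCastException because of this wrong type serializer.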
>
> {code:java}
> @Test
> public void testIterationWithMapPartition() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStream<Long> input =
>             env.fromParallelCollection(new NumberSequenceIterator(0L, 5L), Types.LONG);
>     DataStreamList result =
>             Iterations.iterateBoundedStreamsUntilTermination(
>                     DataStreamList.of(input),
>                     ReplayableDataStreamList.notReplay(input),
>                     IterationConfig.newBuilder()
>                             .setOperatorLifeCycle(OperatorLifeCycle.PER_ROUND)
>                             .build(),
>                     new IterationBodyWithMapPartition());
>     // The iteration output carries the Long values emitted by mapPartition.
>     List<Long> counts = IteratorUtils.toList(result.get(0).executeAndCollect());
>     System.out.println(counts.size());
> }
>
> private static class IterationBodyWithMapPartition implements IterationBody {
>     @Override
>     public IterationBodyResult process(
>             DataStreamList variableStreams, DataStreamList dataStreams) {
>         DataStream<Long> input = variableStreams.get(0);
>         // Forwards every element of the partition unchanged.
>         DataStream<Long> mapPartitionResult =
>                 DataStreamUtils.mapPartition(
>                         input,
>                         new MapPartitionFunction<Long, Long>() {
>                             @Override
>                             public void mapPartition(
>                                     Iterable<Long> iterable, Collector<Long> collector)
>                                     throws Exception {
>                                 for (Long iter : iterable) {
>                                     collector.collect(iter);
>                                 }
>                             }
>                         });
>         // Terminates the iteration after two rounds.
>         DataStream<Integer> terminationCriteria =
>                 mapPartitionResult.<Long>flatMap(new TerminateOnMaxIter(2)).returns(Types.INT);
>         return new IterationBodyResult(
>                 DataStreamList.of(mapPartitionResult), variableStreams, terminationCriteria);
>     }
> }
> {code}
> The error stack is:
> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.flink.iteration.IterationRecord
> at org.apache.flink.iteration.typeinfo.IterationRecordSerializer.serialize(IterationRecordSerializer.java:34)
> at org.apache.flink.iteration.datacache.nonkeyed.FileSegmentWriter.addRecord(FileSegmentWriter.java:79)
> at org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter.addRecord(DataCacheWriter.java:107)
> at org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache.add(ListStateWithCache.java:148)
> at org.apache.flink.ml.common.datastream.DataStreamUtils$MapPartitionOperator.processElement(DataStreamUtils.java:445)
> at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:69)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
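The failure mode in the trace can be reproduced without Flink. The sketch below is a simplified, hypothetical model and not Flink's `TypeSerializer` or operator-config API: `Serializer`, `OperatorConfig`, `unwrap` and `serializeRaw` are invented for illustration, while `IterationRecord`, `IterationRecordSerializer` and `createWrappedOperatorConfig` only borrow names from this issue and are stand-ins, not the real classes or the actual fix. It assumes the wrapper's config holds record-wrapping serializers for both the inputs and the side outputs. Handing a raw `Long` to such a serializer fails with a `ClassCastException`, much like the `ListStateWithCache.add` frame above; giving the inner operator unwrapped input and side-output serializers avoids it.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class SerializerUnwrapSketch {

    /** Hypothetical minimal serializer interface (not Flink's TypeSerializer). */
    interface Serializer<T> {
        void serialize(T value, DataOutputStream out) throws IOException;
    }

    /** Simplified stand-in for a record that wraps a user value with an epoch. */
    static final class IterationRecord<T> {
        final T value;
        final int epoch;

        IterationRecord(T value, int epoch) {
            this.value = value;
            this.epoch = epoch;
        }
    }

    /** Serializer for plain Long values: writes 8 bytes. */
    static final Serializer<Long> LONG_SERIALIZER = (value, out) -> out.writeLong(value);

    /** Simplified stand-in: writes the epoch, then delegates to the inner serializer. */
    static final class IterationRecordSerializer<T> implements Serializer<IterationRecord<T>> {
        final Serializer<T> inner;

        IterationRecordSerializer(Serializer<T> inner) {
            this.inner = inner;
        }

        @Override
        public void serialize(IterationRecord<T> record, DataOutputStream out) throws IOException {
            out.writeInt(record.epoch);
            inner.serialize(record.value, out);
        }
    }

    /** Hypothetical operator config: serializers keyed by input / side-output name. */
    static final class OperatorConfig {
        final Map<String, Serializer<?>> inputSerializers = new HashMap<>();
        final Map<String, Serializer<?>> sideOutputSerializers = new HashMap<>();
    }

    /** Returns the inner serializer if s wraps records, otherwise s itself. */
    static Serializer<?> unwrap(Serializer<?> s) {
        if (s instanceof IterationRecordSerializer) {
            return ((IterationRecordSerializer<?>) s).inner;
        }
        return s;
    }

    /** Hypothetical fix: copy the wrapper config, unwrapping input AND side-output serializers. */
    static OperatorConfig createWrappedOperatorConfig(OperatorConfig wrapperConfig) {
        OperatorConfig wrapped = new OperatorConfig();
        wrapperConfig.inputSerializers.forEach((k, v) -> wrapped.inputSerializers.put(k, unwrap(v)));
        wrapperConfig.sideOutputSerializers.forEach(
                (k, v) -> wrapped.sideOutputSerializers.put(k, unwrap(v)));
        return wrapped;
    }

    /** Serializes a raw Long with the given serializer, as the inner operator would; returns byte count. */
    @SuppressWarnings("unchecked")
    static int serializeRaw(Serializer<?> s, Long value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ((Serializer<Object>) s).serialize(value, new DataOutputStream(bytes));
        return bytes.size();
    }

    public static void main(String[] args) throws IOException {
        OperatorConfig wrapperConfig = new OperatorConfig();
        wrapperConfig.inputSerializers.put("input-0", new IterationRecordSerializer<>(LONG_SERIALIZER));
        wrapperConfig.sideOutputSerializers.put("side-0", new IterationRecordSerializer<>(LONG_SERIALIZER));

        // Without unwrapping: the record serializer's bridge method casts the raw Long and fails.
        try {
            serializeRaw(wrapperConfig.inputSerializers.get("input-0"), 42L);
            System.out.println("unexpected: no exception");
        } catch (ClassCastException e) {
            System.out.println("wrapped serializer fails: ClassCastException");
        }

        // With unwrapping: both the input and side-output serializers accept raw Long values.
        OperatorConfig fixed = createWrappedOperatorConfig(wrapperConfig);
        System.out.println("input bytes: " + serializeRaw(fixed.inputSerializers.get("input-0"), 42L));
        System.out.println("side output bytes: " + serializeRaw(fixed.sideOutputSerializers.get("side-0"), 42L));
    }
}
```

Running `main` prints the `ClassCastException` line for the wrapped serializer, then an 8-byte size for each unwrapped serializer. Unwrapping only the inputs would still leave the side outputs broken in this model, which is why the sketch treats both maps the same way.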