Posted to dev@flink.apache.org by "Zhipeng Zhang (Jira)" <ji...@apache.org> on 2023/02/28 08:16:00 UTC
[jira] [Created] (FLINK-31255) OperatorUtils#createWrappedOperatorConfig fails to wrap operator config
Zhipeng Zhang created FLINK-31255:
-------------------------------------
Summary: OperatorUtils#createWrappedOperatorConfig fails to wrap operator config
Key: FLINK-31255
URL: https://issues.apache.org/jira/browse/FLINK-31255
Project: Flink
Issue Type: Bug
Components: Library / Machine Learning
Affects Versions: ml-2.1.0, ml-2.0.0, ml-2.2.0
Reporter: Zhipeng Zhang
Currently we use an operator wrapper so that normal operators can be used in iterations. However, the operatorConfig is not correctly unwrapped. For example, the following code fails because the wrong type serializer is used.
{code:java}
@Test
public void testIterationWithMapPartition() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Long> input =
            env.fromParallelCollection(new NumberSequenceIterator(0L, 5L), Types.LONG);
    DataStreamList result =
            Iterations.iterateBoundedStreamsUntilTermination(
                    DataStreamList.of(input),
                    ReplayableDataStreamList.notReplay(input),
                    IterationConfig.newBuilder()
                            .setOperatorLifeCycle(OperatorLifeCycle.PER_ROUND)
                            .build(),
                    new IterationBodyWithMapPartition());
    List<Integer> counts = IteratorUtils.toList(result.get(0).executeAndCollect());
    System.out.println(counts.size());
}

private static class IterationBodyWithMapPartition implements IterationBody {
    @Override
    public IterationBodyResult process(
            DataStreamList variableStreams, DataStreamList dataStreams) {
        DataStream<Long> input = variableStreams.get(0);
        DataStream<Long> mapPartitionResult =
                DataStreamUtils.mapPartition(
                        input,
                        new MapPartitionFunction<Long, Long>() {
                            @Override
                            public void mapPartition(
                                    Iterable<Long> iterable, Collector<Long> collector)
                                    throws Exception {
                                for (Long iter : iterable) {
                                    collector.collect(iter);
                                }
                            }
                        });
        DataStream<Integer> terminationCriteria =
                mapPartitionResult.<Long>flatMap(new TerminateOnMaxIter(2)).returns(Types.INT);
        return new IterationBodyResult(
                DataStreamList.of(mapPartitionResult), variableStreams, terminationCriteria);
    }
}
{code}
The error stack is:
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.flink.iteration.IterationRecord
at org.apache.flink.iteration.typeinfo.IterationRecordSerializer.serialize(IterationRecordSerializer.java:34)
at org.apache.flink.iteration.datacache.nonkeyed.FileSegmentWriter.addRecord(FileSegmentWriter.java:79)
at org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter.addRecord(DataCacheWriter.java:107)
at org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache.add(ListStateWithCache.java:148)
at org.apache.flink.ml.common.datastream.DataStreamUtils$MapPartitionOperator.processElement(DataStreamUtils.java:445)
at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:69)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
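
For illustration only: the ClassCastException above is consistent with the inner operator being handed the serializer for the wrapper record type while it still processes plain Long values. The Flink-free sketch below reproduces that mismatch. Wrapped, Serializer, LongSerializer, WrappedSerializer and cache are hypothetical stand-ins and not Flink ML classes, and the sketch does not claim to show the actual code path in OperatorUtils#createWrappedOperatorConfig.

```java
public class WrappedSerializerSketch {

    /** Hypothetical stand-in for a wrapper record such as IterationRecord. */
    static final class Wrapped<T> {
        final T value;
        final int round;

        Wrapped(T value, int round) {
            this.value = value;
            this.round = round;
        }
    }

    /** Hypothetical minimal serializer interface (not Flink's TypeSerializer). */
    interface Serializer<T> {
        String serialize(T record);
    }

    /** Serializer for the plain (unwrapped) record type. */
    static final class LongSerializer implements Serializer<Long> {
        @Override
        public String serialize(Long record) {
            return Long.toString(record);
        }
    }

    /** Serializer for the wrapper record type; delegates the payload to an inner serializer. */
    static final class WrappedSerializer<T> implements Serializer<Wrapped<T>> {
        private final Serializer<T> inner;

        WrappedSerializer(Serializer<T> inner) {
            this.inner = inner;
        }

        @Override
        public String serialize(Wrapped<T> record) {
            return record.round + ":" + inner.serialize(record.value);
        }

        /** Returns the serializer the inner operator should see. */
        Serializer<T> unwrap() {
            return inner;
        }
    }

    /**
     * Stand-in for an inner operator that caches plain records using whatever
     * serializer its config provides. Raw types mimic a serializer that is looked
     * up from config at runtime, where the compiler cannot check the record type.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String cache(Serializer configuredSerializer, Object record) {
        return configuredSerializer.serialize(record);
    }

    /** Returns true if caching the record fails with a ClassCastException. */
    @SuppressWarnings("rawtypes")
    static boolean failsWithClassCast(Serializer configuredSerializer, Object record) {
        try {
            cache(configuredSerializer, record);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        WrappedSerializer<Long> wrapped = new WrappedSerializer<>(new LongSerializer());

        // Config still carries the wrapper serializer: a plain Long cannot be cast.
        System.out.println(failsWithClassCast(wrapped, 1L));

        // Config carries the unwrapped serializer: the plain Long is serialized.
        System.out.println(cache(wrapped.unwrap(), 1L));
    }
}
```

In this sketch the failure goes away once the inner operator receives the unwrapped serializer, which matches the expectation stated in the description that the operatorConfig should be unwrapped before it reaches the wrapped operator.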