Posted to issues@flink.apache.org by "Dong Lin (Jira)" <ji...@apache.org> on 2023/03/21 03:06:00 UTC
[jira] [Assigned] (FLINK-31486) Using KeySelector in IterationBody causes ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-31486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dong Lin reassigned FLINK-31486:
--------------------------------
Assignee: Jiang Xin
> Using KeySelector in IterationBody causes ClassCastException
> ------------------------------------------------------------
>
> Key: FLINK-31486
> URL: https://issues.apache.org/jira/browse/FLINK-31486
> Project: Flink
> Issue Type: Bug
> Components: Library / Machine Learning
> Reporter: Jiang Xin
> Assignee: Jiang Xin
> Priority: Major
> Labels: pull-request-available
> Fix For: ml-2.2.0
>
>
> We have the following code, which uses CoGroup with a KeySelector in an IterationBody. When we submit it to a Flink session cluster, an exception is thrown.
> {code:java}
> public static void main(String[] args) throws Exception {
>     Configuration config = new Configuration();
>     config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 5000000L);
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
>     env.setStateBackend(new EmbeddedRocksDBStateBackend());
>     env.getConfig().enableObjectReuse();
>     env.setRestartStrategy(RestartStrategies.noRestart());
>     env.setParallelism(1);
>     env.getCheckpointConfig().disableCheckpointing();
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>     int num = 400;
>     int types = num / 10;
>     Random rand = new Random(0);
>     long[] randoms = new long[types];
>     for (int i = 0; i < types; i++) {
>         randoms[i] = rand.nextInt(types);
>     }
>     SourceFunction<Row> rowGenerator =
>             new SourceFunction<Row>() {
>                 @Override
>                 public final void run(SourceContext<Row> ctx) throws Exception {
>                     int cnt = 0;
>                     while (cnt < num) {
>                         ctx.collect(
>                                 Row.of(
>                                         randoms[cnt % (types)],
>                                         randoms[cnt % (types)],
>                                         new DenseVector(10)));
>                         cnt++;
>                     }
>                 }
>
>                 @Override
>                 public void cancel() {}
>             };
>     Table trainDataTable =
>             tEnv.fromDataStream(
>                     env.addSource(rowGenerator, "sourceOp-" + 1)
>                             .returns(
>                                     Types.ROW(
>                                             Types.LONG,
>                                             Types.LONG,
>                                             DenseVectorTypeInfo.INSTANCE)));
>     testCoGroupWithIteration(tEnv, trainDataTable);
> }
>
> public static void testCoGroupWithIteration(StreamTableEnvironment tEnv, Table trainDataTable)
>         throws Exception {
>     DataStream<Row> data1 = tEnv.toDataStream(trainDataTable);
>     DataStream<Row> data2 = tEnv.toDataStream(trainDataTable);
>     DataStreamList coResult =
>             Iterations.iterateBoundedStreamsUntilTermination(
>                     DataStreamList.of(data1),
>                     ReplayableDataStreamList.notReplay(data2),
>                     IterationConfig.newBuilder().build(),
>                     new TrainIterationBody());
>     List<Integer> counts = IteratorUtils.toList(coResult.get(0).executeAndCollect());
>     System.out.println(counts.size());
> }
>
> private static class TrainIterationBody implements IterationBody {
>     @Override
>     public IterationBodyResult process(
>             DataStreamList variableStreams, DataStreamList dataStreams) {
>         DataStreamList feedbackVariableStream =
>                 IterationBody.forEachRound(
>                         dataStreams,
>                         input -> {
>                             DataStream<Row> dataStream1 = variableStreams.get(0);
>                             DataStream<Row> dataStream2 = dataStreams.get(0);
>                             DataStream<Row> coResult =
>                                     dataStream1
>                                             .coGroup(dataStream2)
>                                             .where(
>                                                     (KeySelector<Row, Long>)
>                                                             t2 -> t2.getFieldAs(0))
>                                             .equalTo(
>                                                     (KeySelector<Row, Long>)
>                                                             t2 -> t2.getFieldAs(1))
>                                             .window(EndOfStreamWindows.get())
>                                             .apply(
>                                                     new RichCoGroupFunction<Row, Row, Row>() {
>                                                         @Override
>                                                         public void coGroup(
>                                                                 Iterable<Row> iterable,
>                                                                 Iterable<Row> iterable1,
>                                                                 Collector<Row> collector) {
>                                                             for (Row row : iterable1) {
>                                                                 collector.collect(row);
>                                                             }
>                                                         }
>                                                     });
>                             return DataStreamList.of(coResult);
>                         });
>         DataStream<Integer> terminationCriteria =
>                 feedbackVariableStream
>                         .get(0)
>                         .flatMap(new TerminateOnMaxIter(2))
>                         .returns(Types.INT);
>         return new IterationBodyResult(
>                 feedbackVariableStream, feedbackVariableStream, terminationCriteria);
>     }
> }
> {code}
> The exception is shown below. Note that it cannot be reproduced in a unit test with MiniCluster, since all classes are then on the Java classpath.
> {code:java}
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate state partitioner.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
>     at org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
>     at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
>     at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
>     at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1 of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector
>     at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
>     at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
>     ... 17 more
> {code}
>
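The ClassCastException in the report names java.lang.invoke.SerializedLambda, which is the form Java serialization writes for a serializable lambda such as the two KeySelector lambdas in the reproduction. The sketch below is Flink-free and only illustrates that mechanism. The KeySelector interface here is a local stand-in for Flink's, and KeySelectorSerializationSketch, FirstFieldSelector, serializedForm, roundTrip and lambdaSelector are hypothetical names introduced for illustration. It does not reproduce the bug: with a single classloader the lambda round-trips without error, which is consistent with the report's note that MiniCluster does not show the problem.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;

public class KeySelectorSerializationSketch {

    /** Local stand-in for Flink's KeySelector (assumption: only the Serializable bound matters here). */
    public interface KeySelector<IN, KEY> extends Serializable {
        KEY getKey(IN value) throws Exception;
    }

    /** Hypothetical named-class selector; it is serialized as itself, not as a SerializedLambda. */
    public static final class FirstFieldSelector implements KeySelector<long[], Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long getKey(long[] row) {
            return row[0];
        }
    }

    /** A serializable lambda with the same behavior as FirstFieldSelector. */
    public static KeySelector<long[], Long> lambdaSelector() {
        return row -> row[0];
    }

    /**
     * Returns the object that Java serialization writes in place of the selector.
     * Serializable lambdas carry a synthetic private writeReplace() method that
     * yields a SerializedLambda; ordinary classes without one are written as-is.
     */
    public static Object serializedForm(Object selector) throws Exception {
        try {
            Method writeReplace = selector.getClass().getDeclaredMethod("writeReplace");
            writeReplace.setAccessible(true);
            return writeReplace.invoke(selector);
        } catch (NoSuchMethodException e) {
            return selector;
        }
    }

    /** Serializes and deserializes an object within the current classloader. */
    @SuppressWarnings("unchecked")
    public static <T> T roundTrip(T obj) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        KeySelector<long[], Long> lambda = lambdaSelector();
        KeySelector<long[], Long> named = new FirstFieldSelector();

        // Class names of what actually goes onto the wire for each selector.
        System.out.println(serializedForm(lambda).getClass().getName());
        System.out.println(serializedForm(named).getClass().getName());

        // With one classloader, the lambda is resolved back into a working KeySelector.
        System.out.println(roundTrip(lambda).getKey(new long[] {7L, 8L}));
    }
}
```

Whether replacing the lambdas in the reproduction with a named class such as FirstFieldSelector avoids the exception on a session cluster has not been tested here, so treat it as an assumption to verify rather than as a fix.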
--
This message was sent by Atlassian Jira
(v8.20.10#820010)