You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Dong Lin (Jira)" <ji...@apache.org> on 2023/03/21 03:06:00 UTC

[jira] [Assigned] (FLINK-31486) Using KeySelector in IterationBody causes ClassCastException

     [ https://issues.apache.org/jira/browse/FLINK-31486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin reassigned FLINK-31486:
--------------------------------

    Assignee: Jiang Xin

> Using KeySelector in IterationBody causes ClassCastException
> ------------------------------------------------------------
>
>                 Key: FLINK-31486
>                 URL: https://issues.apache.org/jira/browse/FLINK-31486
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / Machine Learning
>            Reporter: Jiang Xin
>            Assignee: Jiang Xin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: ml-2.2.0
>
>
> We have the following code, which uses CoGroup along with a KeySelector in an IterationBody. When we submit the job to a Flink session cluster, the exception shown below is thrown.
> {code:java}
> public static void main(String[] args) throws Exception {
>     Configuration config = new Configuration();
>     config.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 5000000L);
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
>     env.setStateBackend(new EmbeddedRocksDBStateBackend());
>     env.getConfig().enableObjectReuse();
>     env.setRestartStrategy(RestartStrategies.noRestart());
>     env.setParallelism(1);
>     env.getCheckpointConfig().disableCheckpointing();
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>     int num = 400;
>     int types = num / 10;
>     Random rand = new Random(0);
>     long[] randoms = new long[types];
>     for (int i = 0; i < types; i++) {
>         randoms[i] = rand.nextInt(types);
>     }
>     SourceFunction<Row> rowGenerator =
>             new SourceFunction<Row>() {
>                 @Override
>                 public final void run(SourceContext<Row> ctx) throws Exception {
>                     int cnt = 0;
>                     while (cnt < num) {
>                         ctx.collect(
>                                 Row.of(
>                                         randoms[cnt % (types)],
>                                         randoms[cnt % (types)],
>                                         new DenseVector(10)));
>                         cnt++;
>                     }
>                 }
>                 @Override
>                 public void cancel() {}
>             };
>     Table trainDataTable =
>             tEnv.fromDataStream(
>                     env.addSource(rowGenerator, "sourceOp-" + 1)
>                             .returns(
>                                     Types.ROW(
>                                             Types.LONG,
>                                             Types.LONG,
>                                             DenseVectorTypeInfo.INSTANCE)));
>     testCoGroupWithIteration(tEnv, trainDataTable);
> }
> public static void testCoGroupWithIteration(StreamTableEnvironment tEnv, Table trainDataTable)
>         throws Exception {
>     DataStream<Row> data1 = tEnv.toDataStream(trainDataTable);
>     DataStream<Row> data2 = tEnv.toDataStream(trainDataTable);
>     DataStreamList coResult =
>             Iterations.iterateBoundedStreamsUntilTermination(
>                     DataStreamList.of(data1),
>                     ReplayableDataStreamList.notReplay(data2),
>                     IterationConfig.newBuilder().build(),
>                     new TrainIterationBody());
>     List<Integer> counts = IteratorUtils.toList(coResult.get(0).executeAndCollect());
>     System.out.println(counts.size());
> }
> private static class TrainIterationBody implements IterationBody {
>     @Override
>     public IterationBodyResult process(
>             DataStreamList variableStreams, DataStreamList dataStreams) {
>         DataStreamList feedbackVariableStream =
>                 IterationBody.forEachRound(
>                         dataStreams,
>                         input -> {
>                             DataStream<Row> dataStream1 = variableStreams.get(0);
>                             DataStream<Row> dataStream2 = dataStreams.get(0);
>                             DataStream<Row> coResult =
>                                     dataStream1
>                                             .coGroup(dataStream2)
>                                             .where(
>                                                     (KeySelector<Row, Long>)
>                                                             t2 -> t2.getFieldAs(0))
>                                             .equalTo(
>                                                     (KeySelector<Row, Long>)
>                                                             t2 -> t2.getFieldAs(1))
>                                             .window(EndOfStreamWindows.get())
>                                             .apply(
>                                                     new RichCoGroupFunction<Row, Row, Row>() {
>                                                         @Override
>                                                         public void coGroup(
>                                                                 Iterable<Row> iterable,
>                                                                 Iterable<Row> iterable1,
>                                                                 Collector<Row> collector) {
>                                                             for (Row row : iterable1) {
>                                                                 collector.collect(row);
>                                                             }
>                                                         }
>                                                     });
>                             return DataStreamList.of(coResult);
>                         });
>         DataStream<Integer> terminationCriteria =
>                 feedbackVariableStream
>                         .get(0)
>                         .flatMap(new TerminateOnMaxIter(2))
>                         .returns(Types.INT);
>         return new IterationBodyResult(
>                 feedbackVariableStream, feedbackVariableStream, terminationCriteria);
>     }
> } {code}
> The exception is shown below. Note that it cannot be reproduced in a unit test with MiniCluster, since in that setup all classes are on the Java classpath.
> {code:java}
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate state partitioner.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:662)
>     at org.apache.flink.iteration.operator.OperatorUtils.createWrappedOperatorConfig(OperatorUtils.java:96)
>     at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:168)
>     at org.apache.flink.iteration.operator.perround.AbstractPerRoundWrapperOperator.getWrappedOperator(AbstractPerRoundWrapperOperator.java:146)
>     at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:68)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector.keySelector1 of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionKeySelector
>     at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2302)
>     at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1432)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2409)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2403)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2327)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1665)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:501)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:459)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStatePartitioner(StreamConfig.java:659)
>     ... 17 more
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)