Posted to issues@flink.apache.org by "Jiang Xin (Jira)" <ji...@apache.org> on 2023/03/03 05:54:00 UTC

[jira] [Updated] (FLINK-31312) EnableObjectReuse cause different behaviors

     [ https://issues.apache.org/jira/browse/FLINK-31312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiang Xin updated FLINK-31312:
------------------------------
    Description: 
I have the following test code, which fails with the exception `Accessing a field by name is not supported in position-based field mode`. If I remove the `enableObjectReuse()` call, it works.
{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // The test fails with enableObjectReuse
    env.getConfig().enableObjectReuse();

    final SourceFunction<Row> rowGenerator =
            new SourceFunction<Row>() {
                @Override
                public final void run(SourceContext<Row> ctx) throws Exception {
                    Row row = new Row(1);
                    row.setField(0, "a");
                    ctx.collect(row);
                }

                @Override
                public void cancel() {}
            };

    final RowTypeInfo typeInfo =
            new RowTypeInfo(new TypeInformation[] {Types.STRING}, new String[] {"col1"});

    DataStream<Row> dataStream = env.addSource(rowGenerator, typeInfo);

    DataStream<Row> transformedDataStream =
            dataStream.map(
                    (MapFunction<Row, Row>) value -> Row.of(value.getField("col1")), typeInfo);

    transformedDataStream.addSink(new PrintSinkFunction<>());
    env.execute("Mini Test");
} {code}
The `SourceFunction` emits rows without field names (they are created with `new Row(1)`); the field names come only from the type info passed to `env.addSource(rowGenerator, typeInfo)`.

With object reuse enabled, rows are passed to the `MapFunction` directly, so `value.getField("col1")` throws the exception. With object reuse disabled, rows are reconstructed and given field names when they are passed to the next operator, so the test passes.
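
The difference described above can be illustrated with a small, self-contained toy model. This is only a sketch under stated assumptions: `ToyRow`, `copyWithNames`, and `handOff` are hypothetical names invented for illustration and are not Flink classes or Flink's actual hand-off code. It only mirrors the reported behavior, where a copy step attaches field names and a direct hand-off does not.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model only: none of these types are Flink classes. */
final class RowModeSketch {

    /** A row that stores values by position and may carry a name-to-position index. */
    static final class ToyRow {
        final Object[] values;
        final Map<String, Integer> positionByName; // null means position-based mode

        ToyRow(Object[] values, Map<String, Integer> positionByName) {
            this.values = values;
            this.positionByName = positionByName;
        }

        Object getField(String name) {
            if (positionByName == null) {
                throw new IllegalArgumentException(
                        "Accessing a field by name is not supported in position-based field mode");
            }
            Integer pos = positionByName.get(name);
            if (pos == null) {
                throw new IllegalArgumentException("Unknown field name: " + name);
            }
            return values[pos];
        }
    }

    /** Hypothetical copy step: rebuilds the row and attaches the declared field names. */
    static ToyRow copyWithNames(ToyRow from, String[] fieldNames) {
        Map<String, Integer> index = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            index.put(fieldNames[i], i);
        }
        return new ToyRow(from.values.clone(), index);
    }

    /** Hypothetical hand-off between two operators: same instance with reuse, a copy without. */
    static ToyRow handOff(ToyRow row, String[] fieldNames, boolean objectReuse) {
        return objectReuse ? row : copyWithNames(row, fieldNames);
    }

    public static void main(String[] args) {
        String[] names = {"col1"};
        ToyRow emitted = new ToyRow(new Object[] {"a"}, null); // like new Row(1): no names

        // Without reuse, the copy carries names, so access by name succeeds.
        System.out.println(handOff(emitted, names, false).getField("col1"));

        // With reuse, the original position-based row arrives unchanged.
        try {
            handOff(emitted, names, true).getField("col1");
        } catch (IllegalArgumentException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In the reproducer itself, reading the field by position with `value.getField(0)` would presumably avoid depending on whether the names were attached, assuming positional access is acceptable for the use case.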

  was:
I have the following test code which works well, however, if I remove the `enableObjectReuse`, the test case would fail with the exception `Accessing a field by name is not supported in position-based field mode`.
{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // The test fails with enableObjectReuse
    env.getConfig().enableObjectReuse();

    final SourceFunction<Row> rowGenerator =
            new SourceFunction<Row>() {
                @Override
                public final void run(SourceContext<Row> ctx) throws Exception {
                    Row row = new Row(1);
                    row.setField(0, "a");
                    ctx.collect(row);
                }

                @Override
                public void cancel() {}
            };

    final RowTypeInfo typeInfo =
            new RowTypeInfo(new TypeInformation[] {Types.STRING}, new String[] {"col1"});

    DataStream<Row> dataStream = env.addSource(rowGenerator, typeInfo);

    DataStream<Row> transformedDataStream =
            dataStream.map(
                    (MapFunction<Row, Row>) value -> Row.of(value.getField("col1")), typeInfo);

    transformedDataStream.addSink(new PrintSinkFunction<>());
    env.execute("Mini Test");
} {code}
The `SourceFunction` generates rows without field names, but the return type info is assigned by `env.addSource(rowGenerator, typeInfo)`.

With object-reuse enabled, rows would be passed to the mapFunction directly, so the exception raises. While if the object-reuse is disabled,  rows would be reconstructed and given field names when passing to the next operator, so the test case works well.


> EnableObjectReuse cause different behaviors
> -------------------------------------------
>
>                 Key: FLINK-31312
>                 URL: https://issues.apache.org/jira/browse/FLINK-31312
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>            Reporter: Jiang Xin
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)