Posted to issues@flink.apache.org by "Jiang Xin (Jira)" <ji...@apache.org> on 2023/03/03 05:54:00 UTC
[jira] [Updated] (FLINK-31312) EnableObjectReuse cause different behaviors
[ https://issues.apache.org/jira/browse/FLINK-31312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiang Xin updated FLINK-31312:
------------------------------
Description:
The following test code fails with the exception `Accessing a field by name is not supported in position-based field mode`. However, if I remove the `enableObjectReuse()` call, it works.
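{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;

// The class name is illustrative; the report only shows the main method.
public class MiniTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // The test fails with enableObjectReuse
        env.getConfig().enableObjectReuse();

        // Emits one row created with new Row(1), i.e. a row without field names.
        final SourceFunction<Row> rowGenerator =
                new SourceFunction<Row>() {
                    @Override
                    public final void run(SourceContext<Row> ctx) throws Exception {
                        Row row = new Row(1);
                        row.setField(0, "a");
                        ctx.collect(row);
                    }

                    @Override
                    public void cancel() {}
                };

        // The declared stream type names its single field "col1".
        final RowTypeInfo typeInfo =
                new RowTypeInfo(new TypeInformation[] {Types.STRING}, new String[] {"col1"});
        DataStream<Row> dataStream = env.addSource(rowGenerator, typeInfo);
        // Accesses the field by name, which only works on a name-based row.
        DataStream<Row> transformedDataStream =
                dataStream.map(
                        (MapFunction<Row, Row>) value -> Row.of(value.getField("col1")), typeInfo);
        transformedDataStream.addSink(new PrintSinkFunction<>());
        env.execute("Mini Test");
    }
}
{code}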
The `SourceFunction` creates its rows with `new Row(1)`, so they carry no field names (position-based field mode), even though the stream's type info, which names the field `col1`, is assigned by `env.addSource(rowGenerator, typeInfo)`.
With object reuse enabled, the rows emitted by the source are passed to the `MapFunction` as they are, so `value.getField("col1")` raises the exception. With object reuse disabled, the rows are reconstructed and given field names when they are passed to the next operator, so the test works.
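To make the two code paths concrete, here is a minimal, Flink-free Java sketch of the behavior described above. `MiniRow`, `copyWithNames`, `forward` and `mapByName` are hypothetical stand-ins, not Flink classes or APIs; the sketch simply assumes, as the report states, that with object reuse disabled each record is copied before it reaches the next operator and the copy picks up the field names from the declared type info, while with object reuse enabled the same instance is handed over unchanged.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the FLINK-31312 behavior.
// None of these classes are Flink's real Row / serializer / chaining code.
public class ObjectReuseSketch {

    /** A row that is position-based when positionByName is null, name-based otherwise. */
    static final class MiniRow {
        final Object[] fields;
        final Map<String, Integer> positionByName;

        MiniRow(Object[] fields, Map<String, Integer> positionByName) {
            this.fields = fields;
            this.positionByName = positionByName;
        }

        Object getField(String name) {
            if (positionByName == null) {
                // Same message as reported in the issue.
                throw new IllegalArgumentException(
                        "Accessing a field by name is not supported in position-based field mode");
            }
            return fields[positionByName.get(name)];
        }
    }

    /** Assumed copy step: duplicates the fields and attaches names from the type info. */
    static MiniRow copyWithNames(MiniRow from, String[] fieldNames) {
        Map<String, Integer> positions = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            positions.put(fieldNames[i], i);
        }
        return new MiniRow(from.fields.clone(), positions);
    }

    /** Hands a record to the next operator, copying it only when object reuse is off. */
    static MiniRow forward(MiniRow record, boolean objectReuse, String[] fieldNames) {
        return objectReuse ? record : copyWithNames(record, fieldNames);
    }

    /** Emits an unnamed row (like new Row(1)) and reads "col1" by name downstream. */
    static Object mapByName(boolean objectReuse) {
        String[] typeInfoNames = {"col1"};
        MiniRow emitted = new MiniRow(new Object[] {"a"}, null);
        MiniRow received = forward(emitted, objectReuse, typeInfoNames);
        return received.getField("col1");
    }

    public static void main(String[] args) {
        System.out.println("object reuse disabled: " + mapByName(false));
        try {
            mapByName(true);
        } catch (IllegalArgumentException e) {
            System.out.println("object reuse enabled: " + e.getMessage());
        }
    }
}
```

Running `main` prints `object reuse disabled: a` for the copying path and the exception message for the reuse path, mirroring the two outcomes in the report.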
was:
I have the following test code, which works well; however, if I remove `enableObjectReuse()`, the test case fails with the exception `Accessing a field by name is not supported in position-based field mode`.
{code:java}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // The test fails with enableObjectReuse
    env.getConfig().enableObjectReuse();
    final SourceFunction<Row> rowGenerator =
            new SourceFunction<Row>() {
                @Override
                public final void run(SourceContext<Row> ctx) throws Exception {
                    Row row = new Row(1);
                    row.setField(0, "a");
                    ctx.collect(row);
                }

                @Override
                public void cancel() {}
            };
    final RowTypeInfo typeInfo =
            new RowTypeInfo(new TypeInformation[] {Types.STRING}, new String[] {"col1"});
    DataStream<Row> dataStream = env.addSource(rowGenerator, typeInfo);
    DataStream<Row> transformedDataStream =
            dataStream.map(
                    (MapFunction<Row, Row>) value -> Row.of(value.getField("col1")), typeInfo);
    transformedDataStream.addSink(new PrintSinkFunction<>());
    env.execute("Mini Test");
}
{code}
The `SourceFunction` generates rows without field names, but the return type info is assigned by `env.addSource(rowGenerator, typeInfo)`.
With object reuse enabled, rows are passed to the `MapFunction` directly, so the exception is raised. If object reuse is disabled, rows are reconstructed and given field names when they are passed to the next operator, so the test case works well.
> EnableObjectReuse cause different behaviors
> -------------------------------------------
>
> Key: FLINK-31312
> URL: https://issues.apache.org/jira/browse/FLINK-31312
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Reporter: Jiang Xin
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)