Posted to user@flink.apache.org by Marco Villalobos <mv...@kineteque.com> on 2022/08/17 22:08:33 UTC
Is this a Batch SQL Bug?
Given this program:
```java
package mvillalobos.bug;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class IsThisABatchSQLBug {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
                " `file.path` STRING NOT NULL METADATA,\n" +
                " `file.name` STRING NOT NULL METADATA,\n" +
                " `file.size` BIGINT NOT NULL METADATA,\n" +
                " `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" +
                " line STRING\n" +
                " ) WITH (\n" +
                " 'connector' = 'filesystem', \n" +
                " 'format' = 'raw'\n" +
                " );");

        tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
                " WITH (\n" +
                " 'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" +
                " ) LIKE historical_raw_source_template;");

        final TableResult output = tableEnv.from("historical_raw_source").select($("line")).execute();
        output.print();
    }
}
```
and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data' directory:
```text
one
two
three
four
five
six
seven
eight
nine
ten
```
The printed results are:
```text
+----+--------------------------------+
| +I | ten |
| +I | ten |
| +I | ten |
| +I | ten |
| +I | ten |
| +I | ten |
| +I | ten |
| +I | ten |
| +I | ten |
| +I | ten |
+----+--------------------------------+
10 rows in set
```
I was expecting all rows to print. If this is not a bug, then what am I misunderstanding?
I do notice that the transient field `private transient RecordCollector collector;` in `org.apache.flink.connector.file.table.DeserializationSchemaAdapter.LineBytesInputFormat` becomes empty on each iteration, as though it failed to serialize correctly.
Regardless, I don't know what's wrong. Any advice would be greatly appreciated.
Marco A. Villalobos
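For context on the `transient` observation: Java serialization skips `transient` fields by design, and field initializers are not re-run when an object is deserialized, so such a field comes back as `null` after a round trip and has to be re-created by the code that uses it. The plain-Java sketch below only demonstrates that general behavior; `TransientFieldDemo`, `Format`, and `roundTrip` are hypothetical names and are not Flink's classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class TransientFieldDemo {

    // Hypothetical stand-in for an input format; NOT Flink's LineBytesInputFormat.
    static class Format implements Serializable {
        private static final long serialVersionUID = 1L;

        String path = "/data";                                // regular field: written by serialization
        transient List<String> collector = new ArrayList<>(); // transient field: skipped by serialization
    }

    // Serializes and deserializes an object, e.g. as happens when objects are shipped between processes.
    static Format roundTrip(Format in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream objIn =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Format) objIn.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Format copy = roundTrip(new Format());
        // The initializer of the transient field is not re-run, so it is null here.
        System.out.println("path = " + copy.path);
        System.out.println("collector = " + copy.collector);
    }
}
```

Running `main` prints `path = /data` followed by `collector = null`.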
Re: Is this a Batch SQL Bug?
Posted by Marco Villalobos <mv...@kineteque.com>.
Sure, I created this ticket. It is my first time creating one.
https://issues.apache.org/jira/browse/FLINK-29039
> On Aug 17, 2022, at 6:49 PM, yuxia <lu...@alumni.sjtu.edu.cn> wrote:
>
> Thanks for raising it. Yes, you're right. It's indeed a bug.
> The problem is that the RowData produced by LineBytesInputFormat is reused, but DeserializationSchemaAdapter#Reader only makes a shallow copy of the produced data, so the final result is always the value of the last row.
>
> Could you please create a JIRA ticket to track it?
> Could you please help create a jira to track it?
>
> Best regards,
> Yuxia
Re: Is this a Batch SQL Bug?
Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Thanks for raising it. Yes, you're right. It's indeed a bug.
The problem is that the RowData produced by LineBytesInputFormat is reused, but DeserializationSchemaAdapter#Reader only makes a shallow copy of the produced data, so the final result is always the value of the last row.
Could you please create a JIRA ticket to track it?
Best regards,
Yuxia
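The failure mode described in this reply can be illustrated without Flink. The sketch below is a plain-Java analogy, not Flink's code: `MutableRow`, `read`, and `values` are hypothetical names. A reader that overwrites one shared row object in place and collects references to it ends up with every collected entry showing the last value, which mirrors the repeated `ten` rows in the original post; copying each row before collecting it restores the expected values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ObjectReuseDemo {

    // Hypothetical mutable row, standing in for a reused RowData instance.
    static final class MutableRow {
        String line;

        MutableRow copy() {
            MutableRow c = new MutableRow();
            c.line = line; // String is immutable, so copying the reference is enough for this field
            return c;
        }
    }

    // Simulates a reader that reuses one row object for every record it produces.
    static List<MutableRow> read(List<String> lines, boolean copyEachRow) {
        MutableRow reused = new MutableRow();
        List<MutableRow> out = new ArrayList<>();
        for (String l : lines) {
            reused.line = l; // overwrite the shared instance in place
            // Without a copy, every list entry is the same object and sees the last write.
            out.add(copyEachRow ? reused.copy() : reused);
        }
        return out;
    }

    // Extracts the line values from the collected rows.
    static List<String> values(List<MutableRow> rows) {
        List<String> v = new ArrayList<>();
        for (MutableRow r : rows) {
            v.add(r.line);
        }
        return v;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("one", "two", "three");
        System.out.println(values(read(input, false))); // [three, three, three]
        System.out.println(values(read(input, true)));  // [one, two, three]
    }
}
```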