Posted to dev@flink.apache.org by "Marco A. Villalobos (Jira)" <ji...@apache.org> on 2022/08/19 02:57:00 UTC

[jira] [Created] (FLINK-29039) RowData produced by LineBytesInputFormat is reused, but DeserializationSchemaAdapter#Reader only shallow copies produced data, thus result will always be the last row value

Marco A. Villalobos created FLINK-29039:
-------------------------------------------

             Summary: RowData produced by LineBytesInputFormat is reused, but DeserializationSchemaAdapter#Reader only shallow copies produced data, thus result will always be the last row value
                 Key: FLINK-29039
                 URL: https://issues.apache.org/jira/browse/FLINK-29039
             Project: Flink
          Issue Type: Bug
          Components: Connectors / FileSystem
    Affects Versions: 1.15.1
         Environment: This issue was discovered on macOS Big Sur.
            Reporter: Marco A. Villalobos


LineBytesInputFormat reuses the RowData instance it produces, but DeserializationSchemaAdapter#Reader makes only a shallow copy of that data. As a result, every row in the output holds the value of the last row read.
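
To show the mechanism in isolation, here is a minimal sketch that does not use Flink at all. The `Row` class, its `shallowCopy`/`deepCopy` methods, and `collect` are hypothetical stand-ins invented for illustration; they are not the actual RowData, LineBytesInputFormat, or DeserializationSchemaAdapter code, and the real classes may differ in detail. The sketch only assumes what is described above: one mutable object is reused for every line, and the consumer keeps copies that still share its underlying field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReusedRowDemo {

    /** Hypothetical stand-in for a row with one mutable field (not Flink's RowData). */
    static final class Row {
        final StringBuilder field;

        Row(StringBuilder field) {
            this.field = field;
        }

        /** New Row object, but it still points at the same mutable field. */
        Row shallowCopy() {
            return new Row(field);
        }

        /** New Row object with its own independent copy of the field. */
        Row deepCopy() {
            return new Row(new StringBuilder(field));
        }
    }

    /** Reads all lines into ONE reused row and buffers a copy of it per line. */
    static List<String> collect(List<String> lines, boolean deep) {
        Row reused = new Row(new StringBuilder());
        List<Row> buffer = new ArrayList<>();
        for (String line : lines) {
            // Overwrite the reused row in place, as an object-reusing reader would.
            reused.field.setLength(0);
            reused.field.append(line);
            buffer.add(deep ? reused.deepCopy() : reused.shallowCopy());
        }
        // Materialize the buffered rows only after all input has been read.
        List<String> out = new ArrayList<>();
        for (Row row : buffer) {
            out.add(row.field.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("one", "two", "three");
        System.out.println(collect(lines, false)); // prints [three, three, three]
        System.out.println(collect(lines, true));  // prints [one, two, three]
    }
}
```

With the shallow copy, every buffered row shares the single `StringBuilder`, so all of them show whatever was written last. That matches the symptom reported here, where every output row is the last line of the file.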

 

Given this program:

```java
package mvillalobos.bug;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class IsThisABatchSQLBug {

    public static void main(String[] args) {
        // Run the pipeline in batch mode.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Template table: file metadata columns plus one raw-format line column.
        tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
                "        `file.path`              STRING NOT NULL METADATA,\n" +
                "        `file.name`              STRING NOT NULL METADATA,\n" +
                "        `file.size`              BIGINT NOT NULL METADATA,\n" +
                "        `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" +
                "        line                    STRING\n" +
                "      ) WITH (\n" +
                "        'connector' = 'filesystem', \n" +
                "        'format' = 'raw'\n" +
                "      );");

        // Concrete source table that points the template at the data directory.
        tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
                "      WITH (\n" +
                "        'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" +
                "      ) LIKE historical_raw_source_template;");

        // Select only the line column and print every row.
        final TableResult output = tableEnv.from("historical_raw_source").select($("line")).execute();
        output.print();
    }
}
```

and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data' directory:

```text
one
two
three
four
five
six
seven
eight
nine
ten
```

The printed results are:

```text
+----+--------------------------------+
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
+----+--------------------------------+
10 rows in set
```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)