Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2020/07/21 08:54:53 UTC
How to use a nested column for CREATE TABLE PARTITIONED BY
Hi,
I want to create subdirectories named after values of a nested column,
location.transId.
This is my first attempt:
> CREATE TABLE output
> PARTITIONED BY (`location.transId`)
> WITH (
> 'connector' = 'filesystem',
> 'path' = 'east-out',
> 'format' = 'json'
> ) LIKE navi (EXCLUDING ALL)
>
It fails with the following errors:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Partition column 'location.transId' not defined in the table schema. Available columns: ['type', 'location']
> at org.apache.flink.table.planner.operations.SqlCreateTableConverter.verifyPartitioningColumnsExist(SqlCreateTableConverter.java:164)
> at org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:130)
> at org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
>
As it seems nested columns are not recognized as eligible columns for
PARTITIONED BY, I tried the following:
> CREATE TABLE output (
> `partition` AS location.transId
> ) PARTITIONED BY (`partition`)
> WITH (
> 'connector' = 'filesystem',
> 'path' = 'east-out',
> 'format' = 'json'
> ) LIKE navi (EXCLUDING ALL)
>
It also fails:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: The field count of logical schema of the table does not match with the field count of physical schema.
> The logical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>]
> The physical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>,STRING].
Thanks in advance,
Dongwon
Re: How to use a nested column for CREATE TABLE PARTITIONED BY
Posted by Danny Chan <yu...@gmail.com>.
You cannot do that in Flink yet: a Flink partition column must be mapped to a column of the table schema, one you can select from. The syntax is a little different from Hive's, e.g.:

create table table_name (
  id int,
  dtDontQuery string,
  name string
)
partitioned by (date string)

in which you declare the partition column's name and type at the same time.
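One workaround is to lift the nested field into a top-level physical column of the sink and fill it in the INSERT. A rough, untested sketch (table and column names taken from the earlier messages in this thread):

CREATE TABLE output (
  `type` STRING,
  location ROW<lastUpdateTime BIGINT, transId STRING>,
  transId STRING
) PARTITIONED BY (transId)
WITH (
  'connector' = 'filesystem',
  'path' = 'east-out',
  'format' = 'json'
);

-- Populate the partition column from the nested field when writing:
INSERT INTO output
SELECT `type`, location, location.transId FROM navi;

Since transId is then a regular top-level column of the schema, it is a valid partition column; the cost is that the value is also written into each JSON record.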
Best,
Danny Chan
Re: How to use a nested column for CREATE TABLE PARTITIONED BY
Posted by Dongwon Kim <ea...@gmail.com>.
Thanks Jark for the update.
However, getting back to the original question: can I use a nested column
directly for CREATE TABLE PARTITIONED BY, as below, without declaring an
additional column?
> CREATE TABLE output
> PARTITIONED BY (`location.transId`)
> WITH (
> 'connector' = 'filesystem',
> 'path' = 'east-out',
> 'format' = 'json'
> ) LIKE navi (EXCLUDING ALL)
>
I tried (`location`.transId) as well but it fails with an exception:
> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "." at line 3, column 27.
> Was expecting one of:
> ")" ...
> "," ...
>
> at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 3, column 27.
> Was expecting one of:
> ")" ...
> "," ...
>
> at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
> at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
> at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148)
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163)
> at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188)
> at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
> ... 3 more
> Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 3, column 27.
> Was expecting one of:
> ")" ...
> "," ...
>
> at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36086)
> at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35900)
> at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21398)
> at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5292)
> at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6269)
> at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:19047)
> at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3308)
> at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
> at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161)
> ... 5 more
Best,
Dongwon
Re: How to use a nested column for CREATE TABLE PARTITIONED BY
Posted by Jark Wu <im...@gmail.com>.
Hi Dongwon,
I think this is a bug in the Filesystem connector, which doesn't exclude the computed columns when building the TableSource.
I created an issue [1] to track this problem.
Best,
Jark
[1]: https://issues.apache.org/jira/browse/FLINK-18665
Re: How to use a nested column for CREATE TABLE PARTITIONED BY
Posted by Dongwon Kim <ea...@gmail.com>.
Hi Danny,
> Which version did you use
I use Flink 1.11.0.
> what SQL context throws the error ?
I think the declaration itself is not the problem.
The exception occurs when I execute the following, which I didn't show in
the previous email:
> tEnv.sqlQuery("SELECT type, location FROM navi").executeInsert("output")
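Spelled out, the failing sequence is roughly this (a sketch based on the table definitions quoted earlier in this thread, not the exact job; it assumes Flink 1.11 with the table planner, filesystem connector, and JSON format on the classpath):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tEnv = TableEnvironment.create(settings)

// Source table with the nested column.
tEnv.executeSql(
  """
    |CREATE TABLE navi (
    |  `type` STRING,
    |  location ROW<lastUpdateTime BIGINT, transId STRING>
    |) WITH (
    |  'connector' = 'filesystem',
    |  'path' = 'east-out',
    |  'format' = 'json'
    |)
    |""".stripMargin)

// Sink with a computed partition column; this DDL succeeds.
tEnv.executeSql(
  """
    |CREATE TABLE output (
    |  `partition` AS location.transId
    |) PARTITIONED BY (`partition`)
    |WITH (
    |  'connector' = 'filesystem',
    |  'path' = 'east-out',
    |  'format' = 'json'
    |) LIKE navi (EXCLUDING ALL)
    |""".stripMargin)

// The ValidationException is thrown here, at insert time, not at declaration.
tEnv.sqlQuery("SELECT `type`, location FROM navi").executeInsert("output")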
Thanks,
Dongwon
Re: How to use a nested column for CREATE TABLE PARTITIONED BY
Posted by Danny Chan <yu...@gmail.com>.
Hi, I executed the SQL below:
val sql0 =
  """
    |create table navi (
    |  a STRING,
    |  location ROW<lastUpdateTime BIGINT, transId STRING>
    |) with (
    |  'connector' = 'filesystem',
    |  'path' = 'east-out',
    |  'format' = 'json'
    |)
    |""".stripMargin
tableEnv.executeSql(sql0)
val sql =
  """
    |CREATE TABLE output (
    |  `partition` AS location.transId
    |) PARTITIONED BY (`partition`)
    |WITH (
    |  'connector' = 'filesystem',
    |  'path' = 'east-out',
    |  'format' = 'json'
    |) LIKE navi (EXCLUDING ALL)
    |""".stripMargin
tableEnv.executeSql(sql)
On the master branch both statements work. Can you share your detailed stack trace? Which version did you use, and which SQL statement throws the error?
Best,
Danny Chan