Posted to user@flink.apache.org by Leonard Xu <xb...@gmail.com> on 2020/06/05 04:49:28 UTC

Re: Flink sql nested elements

Hi, Ramana

For nested data types, Flink uses dot notation (e.g. a.b.c) to access nested elements. Your SQL syntax looks right. Which Flink version are you using? And could you post your Avro schema file and DDL?
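For example, a minimal sketch (the DDL below is hypothetical and the connector options are omitted, it is not taken from your schema):

```sql
-- Hypothetical table with a nested ROW column
CREATE TABLE myStream (
  col1 STRING,
  postalAddress ROW<addressLine1 STRING, addressLine2 STRING, addressLine3 STRING>
) WITH (...);

-- Dot notation reaches into the nested ROW
SELECT col1, postalAddress.addressLine1 AS address FROM myStream;
```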

Best,
Leonard Xu

> On Jun 5, 2020, at 03:34, Ramana Uppala <ra...@capitalone.com> wrote:
> 
> We have an Avro schema that contains a nested structure, and when querying it using Flink SQL, we get the error below.
> 
> Exception in thread "main" java.lang.AssertionError
> 	at org.apache.calcite.sql.parser.SqlParserPos.sum_(SqlParserPos.java:236)
> 	at org.apache.calcite.sql.parser.SqlParserPos.sum(SqlParserPos.java:226)
> 	at org.apache.calcite.sql.SqlIdentifier.getComponent(SqlIdentifier.java:232)
> 	at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:416)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5733)
> 	at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5718)
> 	at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
> 
> Example Schema:
> ROW<`col1` VARCHAR(2147483647), `postalAddress` ROW<`addressLine1` VARCHAR(2147483647), `addressLine2` VARCHAR(2147483647), `addressLine3` VARCHAR(2147483647)>>
> 
> Example SQL:
> insert into CSVSink
> select
> col1,
> postalAddress.addressLine1 as address
> from myStream
> 
> In Flink SQL, how do we select nested elements?
> 


Re: [External Sender] Flink sql nested elements

Posted by Leonard Xu <xb...@gmail.com>.
Hi, Ramana

Happy to hear you’ve resolved your problem. If you could post your SQL, this question might get a quicker response.

Flink SQL is case sensitive by default, and there is an issue tracking this[1]. I think it makes sense to add a note about this to the SQL section of the docs.

Best,
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-16175


> On Jun 9, 2020, at 20:09, Ramana Uppala <ra...@capitalone.com> wrote:
> 
> It would be great if Flink could convert all query identifiers to lower case, similar to Hive.


Re: [External Sender] Re: Flink sql nested elements

Posted by Ramana Uppala <ra...@capitalone.com>.
Hi Dawid,

This issue has been resolved.

From our debugging, we found that the Calcite parser resolves nested elements as expected, but it expects the identifier case to match the schema. The field case in our SQL SELECT did not match the field case in the schema. After fixing the SQL to use the correct case, the query worked as expected.

Is Flink SQL case sensitive? We don't see any documentation related to this.

It would be great if Flink could convert all query identifiers to lower case, similar to Hive.
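For example, a sketch of the mismatch (using the example schema from earlier in the thread):

```sql
-- The schema declares `postalAddress` with field `addressLine1`.
-- This resolves, because the identifier case matches the schema:
SELECT postalAddress.addressLine1 FROM myStream;

-- This does not resolve, because identifiers are matched case-sensitively:
SELECT postaladdress.addressline1 FROM myStream;
```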

On 2020/06/09 07:58:20, Dawid Wysakowicz <dw...@apache.org> wrote: 
> Hi Ramana,
> 
> Could you help us with a way to reproduce the behaviour? I could not
> reproduce it locally. The code below works for me just fine:
> 
> StreamExecutionEnvironment exec = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(
>         exec,
>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
> tEnv.registerTableSource(
>         "T",
>         new StreamTableSource<Row>() {
>             @Override
>             public TableSchema getTableSchema() {
>                 return TableSchema.builder()
>                         .field("f3", DataTypes.ROW(DataTypes.FIELD("nested", DataTypes.STRING())))
>                         .build();
>             }
> 
>             @Override
>             public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
>                 return execEnv.fromCollection(
>                         Arrays.asList(Row.of(Row.of("ABCDE"))));
>             }
> 
>             @Override
>             public DataType getProducedDataType() {
>                 return DataTypes.ROW(
>                         DataTypes.FIELD(
>                                 "f3",
>                                 DataTypes.ROW(DataTypes.FIELD("nested", DataTypes.STRING()))));
>             }
>         });
> Table table = tEnv.sqlQuery("SELECT f3.nested FROM T");
> DataStream<Row> result = tEnv.toAppendStream(
>         table,
>         Types.ROW(Types.STRING()));
> result.print();
> exec.execute();
> 
> Best,
> 
> Dawid

Re: [External Sender] Re: Flink sql nested elements

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Ramana,

Could you help us with a way to reproduce the behaviour? I could not
reproduce it locally. The code below works for me just fine:

StreamExecutionEnvironment exec = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
        exec,
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
tEnv.registerTableSource(
        "T",
        new StreamTableSource<Row>() {
            @Override
            public TableSchema getTableSchema() {
                return TableSchema.builder()
                        .field("f3", DataTypes.ROW(DataTypes.FIELD("nested", DataTypes.STRING())))
                        .build();
            }

            @Override
            public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
                return execEnv.fromCollection(
                        Arrays.asList(Row.of(Row.of("ABCDE"))));
            }

            @Override
            public DataType getProducedDataType() {
                return DataTypes.ROW(
                        DataTypes.FIELD(
                                "f3",
                                DataTypes.ROW(DataTypes.FIELD("nested", DataTypes.STRING()))));
            }
        });
Table table = tEnv.sqlQuery("SELECT f3.nested FROM T");
DataStream<Row> result = tEnv.toAppendStream(
        table,
        Types.ROW(Types.STRING()));
result.print();
exec.execute();

Best,

Dawid

On 05/06/2020 13:59, Ramana Uppala wrote:
> Hi Leonard,
>
> We are using Flink 1.10 and I cannot share the complete
> schema, but it looks like the following in the Hive Catalog:
>
> flink.generic.table.schema.1.data-type ROW<`col1` VARCHAR(2147483647),
> `postalAddress` ROW<`addressLine1` VARCHAR(2147483647), `addressLine2`
> VARCHAR(2147483647), `addressLine3` VARCHAR(2147483647)>>
>
> Based on the stack trace, the sqlUpdate API validates the SQL statement
> and throws the above error. Do we need to configure anything in Calcite
> to support nested types?
>
> Thanks,
> Ramana.

Re: [External Sender] Re: Flink sql nested elements

Posted by Ramana Uppala <ra...@capitalone.com>.
Hi Leonard,

We are using Flink 1.10 and I cannot share the complete schema, but
it looks like the following in the Hive Catalog:

flink.generic.table.schema.1.data-type ROW<`col1` VARCHAR(2147483647),
`postalAddress` ROW<`addressLine1` VARCHAR(2147483647), `addressLine2`
VARCHAR(2147483647), `addressLine3` VARCHAR(2147483647)>>

Based on the stack trace, the sqlUpdate API validates the SQL statement
and throws the above error. Do we need to configure anything in Calcite
to support nested types?

Thanks,
Ramana.

On Fri, Jun 5, 2020 at 12:49 AM Leonard Xu <xb...@gmail.com> wrote:

> Hi, Ramana
>
> For nested data types, Flink uses dot notation (e.g. a.b.c) to access nested
> elements. Your SQL syntax looks right. Which Flink version are you using? And
> could you post your Avro schema file and DDL?
>
> Best,
> Leonard Xu

______________________________________________________________________



The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.