You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "Sun.Zhu" <17...@163.com> on 2020/06/16 16:48:56 UTC

回复:sqlclient集成hiveCatalog查询kafka表问题

是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11 还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?




在2020年06月16日 18:38,Benchao Li 写道:
1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。

Sun.Zhu <17...@163.com> 于2020年6月16日周二 下午6:11写道:

> 我编译了1.11包
> 在sql-cli下查询hive的表报如下错误:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow
>
>
> 查注册的kafka表报:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException: org.apache.flink.table.dataformat.BaseRow
>
>
> 依赖包是从1.10.1下面拷贝的
> | |
> Sun.Zhu
> |
> |
> 17626017841@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月13日 11:44,Sun.Zhu<17...@163.com> 写道:
> Got it!
> Thx,junbao
>
>
> | |
> Sun.Zhu
> |
> |
> 17626017841@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月13日 09:32,zhangjunbao<wi...@outlook.com> 写道:
> 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
> https://issues.apache.org/jira/browse/FLINK-17189 <
> https://issues.apache.org/jira/browse/FLINK-17189>
>
> Best,
> Junbao Zhang
>
> 2020年6月13日 上午12:31,Sun.Zhu <17...@163.com> 写道:
>
> hi,all
> 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
> ddl如下:
> |
> CREATETABLE user_behavior (
> user_id BIGINT,
> item_id BIGINT,
> category_id BIGINT,
> behavior STRING,
> ts TIMESTAMP(3),
> proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
> WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
> ) WITH (
> 'connector.type' = 'kafka',  -- 使用 kafka connector
> 'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
> 'connector.topic' = 'user_behavior',  -- kafka topic
> 'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> 'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper
> 地址
> 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
> broker 地址
> 'format.type' = 'json'-- 数据源格式为 json
> );
> |
> 在查询时select * from user_behavior;报错如下:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to
> preserve datatypes:
> validated type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> converted type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT NULL
> rel:
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[$5])
> LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> SECOND)])
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
>
>
> flink版本:1.10.1
> blink planner,streaming model
>
>
> Thx
> | |
> Sun.Zhu
> |
> |
> 17626017841@163.com
> |
> 签名由网易邮箱大师定制
>
>
>

回复: sqlclient集成hiveCatalog查询kafka表问题

Posted by "Sun.Zhu" <17...@163.com>.
非常感谢,我去试试


| |
Sun.Zhu
|
|
17626017841@163.com
|
签名由网易邮箱大师定制


在2020年06月18日 18:13,Rui Li<li...@gmail.com> 写道:
需要启动一个独立的metastore server,然后hive.metastore.uris配置的是你metastore server的地址。
最简单的场景,在本地启动metastore server命令:hive --service metastore
hive.metastore.uris设置成:thrift://localhost:9083

更详细的metastore使用方法可以参考hive文档:
https://cwiki.apache.org/confluence/display/Hive/AdminManual+Metastore+Administration

On Thu, Jun 18, 2020 at 5:21 PM Sun.Zhu <17...@163.com> wrote:

对应这种改动还是挺大的,有对应的说明文档吗?
hive.metastore.uris 这个需要怎么配置,有样例吗?


| |
Sun.Zhu
|
|
17626017841@163.com
|
签名由网易邮箱大师定制


在2020年06月18日 17:01,Rui Li<li...@gmail.com> 写道:

是的,embedded模式需要添加额外的jar包,容易导致依赖冲突。而且生产环境中embedded模式也比较少见,所以在1.11中HiveCatalog已经不允许embedded模式了。

On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu <xb...@gmail.com> wrote:


Hi

在 2020年6月18日,16:45,Sun.Zhu <17...@163.com> 写道:

Caused by: java.lang.IllegalArgumentException: Embedded metastore is not
allowed. Make sure you have set a valid value for hive.metastore.uris

错误的原因应该是这个,flink 集成 hive 时 不支持embedded metastore的,你的 hive 需要起一个hive
metastore 并在conf文件配置 hive.metastore.uris

Best,
Leonard Xu



--
Best regards!
Rui Li



--
Best regards!
Rui Li

Re: sqlclient集成hiveCatalog查询kafka表问题

Posted by Rui Li <li...@gmail.com>.
需要启动一个独立的metastore server,然后hive.metastore.uris配置的是你metastore server的地址。
最简单的场景,在本地启动metastore server命令:hive --service metastore
hive.metastore.uris设置成:thrift://localhost:9083

更详细的metastore使用方法可以参考hive文档:
https://cwiki.apache.org/confluence/display/Hive/AdminManual+Metastore+Administration

On Thu, Jun 18, 2020 at 5:21 PM Sun.Zhu <17...@163.com> wrote:

> 对应这种改动还是挺大的,有对应的说明文档吗?
> hive.metastore.uris 这个需要怎么配置,有样例吗?
>
>
> | |
> Sun.Zhu
> |
> |
> 17626017841@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月18日 17:01,Rui Li<li...@gmail.com> 写道:
>
> 是的,embedded模式需要添加额外的jar包,容易导致依赖冲突。而且生产环境中embedded模式也比较少见,所以在1.11中HiveCatalog已经不允许embedded模式了。
>
> On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu <xb...@gmail.com> wrote:
>
>
> Hi
>
> 在 2020年6月18日,16:45,Sun.Zhu <17...@163.com> 写道:
>
> Caused by: java.lang.IllegalArgumentException: Embedded metastore is not
> allowed. Make sure you have set a valid value for hive.metastore.uris
>
> 错误的原因应该是这个,flink 集成 hive 时 不支持embedded metastore的,你的 hive 需要起一个hive
> metastore 并在conf文件配置 hive.metastore.uris
>
> Best,
> Leonard Xu
>
>
>
> --
> Best regards!
> Rui Li
>


-- 
Best regards!
Rui Li

回复: sqlclient集成hiveCatalog查询kafka表问题

Posted by "Sun.Zhu" <17...@163.com>.
对应这种改动还是挺大的,有对应的说明文档吗?
hive.metastore.uris 这个需要怎么配置,有样例吗?


| |
Sun.Zhu
|
|
17626017841@163.com
|
签名由网易邮箱大师定制


在2020年06月18日 17:01,Rui Li<li...@gmail.com> 写道:
是的,embedded模式需要添加额外的jar包,容易导致依赖冲突。而且生产环境中embedded模式也比较少见,所以在1.11中HiveCatalog已经不允许embedded模式了。

On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu <xb...@gmail.com> wrote:


Hi

在 2020年6月18日,16:45,Sun.Zhu <17...@163.com> 写道:

Caused by: java.lang.IllegalArgumentException: Embedded metastore is not
allowed. Make sure you have set a valid value for hive.metastore.uris

错误的原因应该是这个,flink 集成 hive 时 不支持embedded metastore的,你的 hive 需要起一个hive
metastore 并在conf文件配置 hive.metastore.uris

Best,
Leonard Xu



--
Best regards!
Rui Li

Re: sqlclient集成hiveCatalog查询kafka表问题

Posted by Rui Li <li...@gmail.com>.
是的,embedded模式需要添加额外的jar包,容易导致依赖冲突。而且生产环境中embedded模式也比较少见,所以在1.11中HiveCatalog已经不允许embedded模式了。

On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu <xb...@gmail.com> wrote:

>
> Hi
>
> > 在 2020年6月18日,16:45,Sun.Zhu <17...@163.com> 写道:
> >
> > Caused by: java.lang.IllegalArgumentException: Embedded metastore is not
> allowed. Make sure you have set a valid value for hive.metastore.uris
>
> 错误的原因应该是这个,flink 集成 hive 时 不支持embedded metastore的,你的 hive 需要起一个hive
> metastore 并在conf文件配置 hive.metastore.uris
>
> Best,
> Leonard Xu



-- 
Best regards!
Rui Li

Re: sqlclient集成hiveCatalog查询kafka表问题

Posted by Leonard Xu <xb...@gmail.com>.
 
Hi

> 在 2020年6月18日,16:45,Sun.Zhu <17...@163.com> 写道:
> 
> Caused by: java.lang.IllegalArgumentException: Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris

错误的原因应该是这个,flink 集成 hive 时 不支持embedded metastore的,你的 hive 需要起一个hive metastore 并在conf文件配置 hive.metastore.uris

Best,
Leonard Xu

回复: sqlclient集成hiveCatalog查询kafka表问题

Posted by "Sun.Zhu" <17...@163.com>.
Hi,Rui Li
我把connector的包也替换成1.11的了,结果sql-cli启动报错
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:818)
at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:230)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: java.lang.IllegalArgumentException: Embedded metastore is not allowed. Make sure you have set a valid value for hive.metastore.uris
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
at org.apache.flink.table.catalog.hive.HiveCatalog.<init>(HiveCatalog.java:171)
at org.apache.flink.table.catalog.hive.HiveCatalog.<init>(HiveCatalog.java:157)
at org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory.createCatalog(HiveCatalogFactory.java:84)
at org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:366)
at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$6(ExecutionContext.java:565)
at java.util.HashMap.forEach(HashMap.java:1289)
at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:564)
at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:252)
at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:563)
at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:512)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:171)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:124)
at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:807)


hive catalog的配置和1.10.1一样,如下:
catalogs: #[] # empty list
# A typical catalog definition looks like:
  - name: myhive
    type: hive
    hive-conf-dir: /Users/zhushang/Desktop/software/apache-hive-2.2.0-bin/conf
    hive-version: 2.2.0
| |
Sun.Zhu
|
|
17626017841@163.com
|
签名由网易邮箱大师定制


在2020年06月18日 15:46,Rui Li<li...@gmail.com> 写道:
第三方包指的是flink-connector-hive这种吗?这些包在build的时候也会打出来的,只不过没有加到flink-dist里。到对应的module里找一下,比如flink-connector-hive会在<flink_source>/flink-connectors/flink-connector-hive/target下面。

On Thu, Jun 18, 2020 at 12:22 PM Jark Wu <im...@gmail.com> wrote:

你可以拿 release-1.11 分支: https://github.com/apache/flink/tree/release-1.11/
自己编译一下:mvn clean install -DskipTests
在 build-target 下就是打出来的 1.11 的分发包内容。

Best,
Jark



On Wed, 17 Jun 2020 at 15:30, Sun.Zhu <17...@163.com> wrote:



是的,除了编译出来1.11的包之外,第三方包都拿的1.10.1的版本,但是对应的1.11还没有release吧,从哪里获取呢


| |
Sun.Zhu
|
|
17626017841@163.com
|
签名由网易邮箱大师定制


在2020年06月17日 13:25,Rui Li<li...@gmail.com> 写道:
是说把1.10.1的hive connector跟1.11的flink一起用么?如果这样用是肯定有问题的。可以把版本都统一成1.11试试。

On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17...@163.com> wrote:

Sqlcli上得报错就上面这点,没有更多得信息了,或者从哪些log里可以看到更多信息




在2020年06月17日 10:27,Benchao Li 写道:
目前这个缺少的依赖是在flink-table-runtime-blink module的,现在这些重构到了flink-table-common
module了。
如果只是connector、format这些用老的版本,应该是没有问题的。
你可以把更详细的报错信息发一下吗?看一下具体是哪个模块还在依赖老版本的flink-table-runtime-blink

Sun.Zhu <17...@163.com> 于2020年6月17日周三 上午12:49写道:

是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11
还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?




在2020年06月16日 18:38,Benchao Li 写道:
1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。

Sun.Zhu <17...@163.com> 于2020年6月16日周二 下午6:11写道:

我编译了1.11包
在sql-cli下查询hive的表报如下错误:
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError:
org/apache/flink/table/dataformat/BaseRow


查注册的kafka表报:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException:
org.apache.flink.table.dataformat.BaseRow


依赖包是从1.10.1下面拷贝的
| |
Sun.Zhu
|
|
17626017841@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 11:44,Sun.Zhu<17...@163.com> 写道:
Got it!
Thx,junbao


| |
Sun.Zhu
|
|
17626017841@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 09:32,zhangjunbao<wi...@outlook.com> 写道:
1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
https://issues.apache.org/jira/browse/FLINK-17189 <
https://issues.apache.org/jira/browse/FLINK-17189>

Best,
Junbao Zhang

2020年6月13日 上午12:31,Sun.Zhu <17...@163.com> 写道:

hi,all
在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
ddl如下:
|
CREATETABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector.type' = 'kafka',  -- 使用 kafka connector
'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior',  -- kafka topic
'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
'connector.properties.zookeeper.connect' = 'localhost:2181',  --
zookeeper
地址
'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
broker 地址
'format.type' = 'json'-- 数据源格式为 json
);
|
在查询时select * from user_behavior;报错如下:
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to
preserve datatypes:
validated type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
converted type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
NULL
rel:
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
behavior=[$3], ts=[$4], proctime=[$5])
LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
SECOND)])
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
behavior=[$3], ts=[$4], proctime=[PROCTIME()])
LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
[KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])


flink版本:1.10.1
blink planner,streaming model


Thx
| |
Sun.Zhu
|
|
17626017841@163.com
|
签名由网易邮箱大师定制







--
Best regards!
Rui Li




--
Best regards!
Rui Li

Re: sqlclient集成hiveCatalog查询kafka表问题

Posted by Rui Li <li...@gmail.com>.
第三方包指的是flink-connector-hive这种吗?这些包在build的时候也会打出来的,只不过没有加到flink-dist里。到对应的module里找一下,比如flink-connector-hive会在<flink_source>/flink-connectors/flink-connector-hive/target下面。

On Thu, Jun 18, 2020 at 12:22 PM Jark Wu <im...@gmail.com> wrote:

> 你可以拿 release-1.11 分支: https://github.com/apache/flink/tree/release-1.11/
> 自己编译一下:mvn clean install -DskipTests
> 在 build-target 下就是打出来的 1.11 的分发包内容。
>
> Best,
> Jark
>
>
>
> On Wed, 17 Jun 2020 at 15:30, Sun.Zhu <17...@163.com> wrote:
>
> >
> >
> > 是的,除了编译出来1.11的包之外,第三方包都拿的1.10.1的版本,但是对应的1.11还没有release吧,从哪里获取呢
> >
> >
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017841@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年06月17日 13:25,Rui Li<li...@gmail.com> 写道:
> > 是说把1.10.1的hive connector跟1.11的flink一起用么?如果这样用是肯定有问题的。可以把版本都统一成1.11试试。
> >
> > On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17...@163.com> wrote:
> >
> > Sqlcli上得报错就上面这点,没有更多得信息了,或者从哪些log里可以看到更多信息
> >
> >
> >
> >
> > 在2020年06月17日 10:27,Benchao Li 写道:
> > 目前这个缺少的依赖是在flink-table-runtime-blink module的,现在这些重构到了flink-table-common
> > module了。
> > 如果只是connector、format这些用老的版本,应该是没有问题的。
> > 你可以把更详细的报错信息发一下吗?看一下具体是哪个模块还在依赖老版本的flink-table-runtime-blink
> >
> > Sun.Zhu <17...@163.com> 于2020年6月17日周三 上午12:49写道:
> >
> > 是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11
> > 还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?
> >
> >
> >
> >
> > 在2020年06月16日 18:38,Benchao Li 写道:
> > 1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
> > 你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。
> >
> > Sun.Zhu <17...@163.com> 于2020年6月16日周二 下午6:11写道:
> >
> > 我编译了1.11包
> > 在sql-cli下查询hive的表报如下错误:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.NoClassDefFoundError:
> > org/apache/flink/table/dataformat/BaseRow
> >
> >
> > 查注册的kafka表报:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.ClassNotFoundException:
> > org.apache.flink.table.dataformat.BaseRow
> >
> >
> > 依赖包是从1.10.1下面拷贝的
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017841@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年06月13日 11:44,Sun.Zhu<17...@163.com> 写道:
> > Got it!
> > Thx,junbao
> >
> >
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017841@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年06月13日 09:32,zhangjunbao<wi...@outlook.com> 写道:
> > 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
> > https://issues.apache.org/jira/browse/FLINK-17189 <
> > https://issues.apache.org/jira/browse/FLINK-17189>
> >
> > Best,
> > Junbao Zhang
> >
> > 2020年6月13日 上午12:31,Sun.Zhu <17...@163.com> 写道:
> >
> > hi,all
> > 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
> > ddl如下:
> > |
> > CREATETABLE user_behavior (
> > user_id BIGINT,
> > item_id BIGINT,
> > category_id BIGINT,
> > behavior STRING,
> > ts TIMESTAMP(3),
> > proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
> > WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
> > ) WITH (
> > 'connector.type' = 'kafka',  -- 使用 kafka connector
> > 'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
> > 'connector.topic' = 'user_behavior',  -- kafka topic
> > 'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> > 'connector.properties.zookeeper.connect' = 'localhost:2181',  --
> > zookeeper
> > 地址
> > 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
> > broker 地址
> > 'format.type' = 'json'-- 数据源格式为 json
> > );
> > |
> > 在查询时select * from user_behavior;报错如下:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.AssertionError: Conversion to relational algebra failed to
> > preserve datatypes:
> > validated type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> > converted type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
> > NULL
> > rel:
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[$5])
> > LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> > SECOND)])
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> > LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> > [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
> >
> >
> > flink版本:1.10.1
> > blink planner,streaming model
> >
> >
> > Thx
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017841@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> >
> >
> >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


-- 
Best regards!
Rui Li

Re: sqlclient集成hiveCatalog查询kafka表问题

Posted by Jark Wu <im...@gmail.com>.
你可以拿 release-1.11 分支: https://github.com/apache/flink/tree/release-1.11/
自己编译一下:mvn clean install -DskipTests
在 build-target 下就是打出来的 1.11 的分发包内容。

Best,
Jark



On Wed, 17 Jun 2020 at 15:30, Sun.Zhu <17...@163.com> wrote:

>
>
> 是的,除了编译出来1.11的包之外,第三方包都拿的1.10.1的版本,但是对应的1.11还没有release吧,从哪里获取呢
>
>
> | |
> Sun.Zhu
> |
> |
> 17626017841@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月17日 13:25,Rui Li<li...@gmail.com> 写道:
> 是说把1.10.1的hive connector跟1.11的flink一起用么?如果这样用是肯定有问题的。可以把版本都统一成1.11试试。
>
> On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17...@163.com> wrote:
>
> Sqlcli上得报错就上面这点,没有更多得信息了,或者从哪些log里可以看到更多信息
>
>
>
>
> 在2020年06月17日 10:27,Benchao Li 写道:
> 目前这个缺少的依赖是在flink-table-runtime-blink module的,现在这些重构到了flink-table-common
> module了。
> 如果只是connector、format这些用老的版本,应该是没有问题的。
> 你可以把更详细的报错信息发一下吗?看一下具体是哪个模块还在依赖老版本的flink-table-runtime-blink
>
> Sun.Zhu <17...@163.com> 于2020年6月17日周三 上午12:49写道:
>
> 是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11
> 还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?
>
>
>
>
> 在2020年06月16日 18:38,Benchao Li 写道:
> 1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
> 你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。
>
> Sun.Zhu <17...@163.com> 于2020年6月16日周二 下午6:11写道:
>
> 我编译了1.11包
> 在sql-cli下查询hive的表报如下错误:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.NoClassDefFoundError:
> org/apache/flink/table/dataformat/BaseRow
>
>
> 查注册的kafka表报:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassNotFoundException:
> org.apache.flink.table.dataformat.BaseRow
>
>
> 依赖包是从1.10.1下面拷贝的
> | |
> Sun.Zhu
> |
> |
> 17626017841@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月13日 11:44,Sun.Zhu<17...@163.com> 写道:
> Got it!
> Thx,junbao
>
>
> | |
> Sun.Zhu
> |
> |
> 17626017841@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月13日 09:32,zhangjunbao<wi...@outlook.com> 写道:
> 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
> https://issues.apache.org/jira/browse/FLINK-17189 <
> https://issues.apache.org/jira/browse/FLINK-17189>
>
> Best,
> Junbao Zhang
>
> 2020年6月13日 上午12:31,Sun.Zhu <17...@163.com> 写道:
>
> hi,all
> 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
> ddl如下:
> |
> CREATETABLE user_behavior (
> user_id BIGINT,
> item_id BIGINT,
> category_id BIGINT,
> behavior STRING,
> ts TIMESTAMP(3),
> proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
> WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
> ) WITH (
> 'connector.type' = 'kafka',  -- 使用 kafka connector
> 'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
> 'connector.topic' = 'user_behavior',  -- kafka topic
> 'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> 'connector.properties.zookeeper.connect' = 'localhost:2181',  --
> zookeeper
> 地址
> 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
> broker 地址
> 'format.type' = 'json'-- 数据源格式为 json
> );
> |
> 在查询时select * from user_behavior;报错如下:
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to
> preserve datatypes:
> validated type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> converted type:
> RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
> NULL
> rel:
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[$5])
> LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> SECOND)])
> LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
>
>
> flink版本:1.10.1
> blink planner,streaming model
>
>
> Thx
> | |
> Sun.Zhu
> |
> |
> 17626017841@163.com
> |
> 签名由网易邮箱大师定制
>
>
>
>
>
>
>
> --
> Best regards!
> Rui Li
>

回复: sqlclient集成hiveCatalog查询kafka表问题

Posted by "Sun.Zhu" <17...@163.com>.

是的,除了编译出来1.11的包之外,第三方包都拿的1.10.1的版本,但是对应的1.11还没有release吧,从哪里获取呢


| |
Sun.Zhu
|
|
17626017841@163.com
|
签名由网易邮箱大师定制


在2020年06月17日 13:25,Rui Li<li...@gmail.com> 写道:
是说把1.10.1的hive connector跟1.11的flink一起用么?如果这样用是肯定有问题的。可以把版本都统一成1.11试试。

On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17...@163.com> wrote:

Sqlcli上得报错就上面这点,没有更多得信息了,或者从哪些log里可以看到更多信息




在2020年06月17日 10:27,Benchao Li 写道:
目前这个缺少的依赖是在flink-table-runtime-blink module的,现在这些重构到了flink-table-common
module了。
如果只是connector、format这些用老的版本,应该是没有问题的。
你可以把更详细的报错信息发一下吗?看一下具体是哪个模块还在依赖老版本的flink-table-runtime-blink

Sun.Zhu <17...@163.com> 于2020年6月17日周三 上午12:49写道:

是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11
还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?




在2020年06月16日 18:38,Benchao Li 写道:
1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。

Sun.Zhu <17...@163.com> 于2020年6月16日周二 下午6:11写道:

我编译了1.11包
在sql-cli下查询hive的表报如下错误:
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError:
org/apache/flink/table/dataformat/BaseRow


查注册的kafka表报:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException:
org.apache.flink.table.dataformat.BaseRow


依赖包是从1.10.1下面拷贝的
| |
Sun.Zhu
|
|
17626017841@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 11:44,Sun.Zhu<17...@163.com> 写道:
Got it!
Thx,junbao


| |
Sun.Zhu
|
|
17626017841@163.com
|
签名由网易邮箱大师定制


在2020年06月13日 09:32,zhangjunbao<wi...@outlook.com> 写道:
1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
https://issues.apache.org/jira/browse/FLINK-17189 <
https://issues.apache.org/jira/browse/FLINK-17189>

Best,
Junbao Zhang

2020年6月13日 上午12:31,Sun.Zhu <17...@163.com> 写道:

hi,all
在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
ddl如下:
|
CREATETABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector.type' = 'kafka',  -- 使用 kafka connector
'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior',  -- kafka topic
'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
'connector.properties.zookeeper.connect' = 'localhost:2181',  --
zookeeper
地址
'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
broker 地址
'format.type' = 'json'-- 数据源格式为 json
);
|
在查询时select * from user_behavior;报错如下:
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to
preserve datatypes:
validated type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
converted type:
RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
NULL
rel:
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
behavior=[$3], ts=[$4], proctime=[$5])
LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
SECOND)])
LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
behavior=[$3], ts=[$4], proctime=[PROCTIME()])
LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
[KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])


flink版本:1.10.1
blink planner,streaming model


Thx
| |
Sun.Zhu
|
|
17626017841@163.com
|
签名由网易邮箱大师定制







--
Best regards!
Rui Li

Re: sqlclient集成hiveCatalog查询kafka表问题

Posted by Rui Li <li...@gmail.com>.
是说把1.10.1的hive connector跟1.11的flink一起用么?如果这样用是肯定有问题的。可以把版本都统一成1.11试试。

On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17...@163.com> wrote:

> Sqlcli上得报错就上面这点,没有更多得信息了,或者从哪些log里可以看到更多信息
>
>
>
>
> 在2020年06月17日 10:27,Benchao Li 写道:
> 目前这个缺少的依赖是在flink-table-runtime-blink module的,现在这些重构到了flink-table-common
> module了。
> 如果只是connector、format这些用老的版本,应该是没有问题的。
> 你可以把更详细的报错信息发一下吗?看一下具体是哪个模块还在依赖老版本的flink-table-runtime-blink
>
> Sun.Zhu <17...@163.com> 于2020年6月17日周三 上午12:49写道:
>
> > 是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11
> > 还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?
> >
> >
> >
> >
> > 在2020年06月16日 18:38,Benchao Li 写道:
> > 1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
> > 你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。
> >
> > Sun.Zhu <17...@163.com> 于2020年6月16日周二 下午6:11写道:
> >
> > > 我编译了1.11包
> > > 在sql-cli下查询hive的表报如下错误:
> > > [ERROR] Could not execute SQL statement. Reason:
> > > java.lang.NoClassDefFoundError:
> org/apache/flink/table/dataformat/BaseRow
> > >
> > >
> > > 查注册的kafka表报:
> > > [ERROR] Could not execute SQL statement. Reason:
> > > java.lang.ClassNotFoundException:
> > org.apache.flink.table.dataformat.BaseRow
> > >
> > >
> > > 依赖包是从1.10.1下面拷贝的
> > > | |
> > > Sun.Zhu
> > > |
> > > |
> > > 17626017841@163.com
> > > |
> > > 签名由网易邮箱大师定制
> > >
> > >
> > > 在2020年06月13日 11:44,Sun.Zhu<17...@163.com> 写道:
> > > Got it!
> > > Thx,junbao
> > >
> > >
> > > | |
> > > Sun.Zhu
> > > |
> > > |
> > > 17626017841@163.com
> > > |
> > > 签名由网易邮箱大师定制
> > >
> > >
> > > 在2020年06月13日 09:32,zhangjunbao<wi...@outlook.com> 写道:
> > > 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
> > > https://issues.apache.org/jira/browse/FLINK-17189 <
> > > https://issues.apache.org/jira/browse/FLINK-17189>
> > >
> > > Best,
> > > Junbao Zhang
> > >
> > > 2020年6月13日 上午12:31,Sun.Zhu <17...@163.com> 写道:
> > >
> > > hi,all
> > > 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
> > > ddl如下:
> > > |
> > > CREATETABLE user_behavior (
> > > user_id BIGINT,
> > > item_id BIGINT,
> > > category_id BIGINT,
> > > behavior STRING,
> > > ts TIMESTAMP(3),
> > > proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
> > > WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
> > > ) WITH (
> > > 'connector.type' = 'kafka',  -- 使用 kafka connector
> > > 'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
> > > 'connector.topic' = 'user_behavior',  -- kafka topic
> > > 'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> > > 'connector.properties.zookeeper.connect' = 'localhost:2181',  --
> > zookeeper
> > > 地址
> > > 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
> > > broker 地址
> > > 'format.type' = 'json'-- 数据源格式为 json
> > > );
> > > |
> > > 在查询时select * from user_behavior;报错如下:
> > > [ERROR] Could not execute SQL statement. Reason:
> > > java.lang.AssertionError: Conversion to relational algebra failed to
> > > preserve datatypes:
> > > validated type:
> > > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > > ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> > > converted type:
> > > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > > ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
> > NULL
> > > rel:
> > > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > > behavior=[$3], ts=[$4], proctime=[$5])
> > > LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> > > SECOND)])
> > > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > > behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> > > LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> > > [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
> > >
> > >
> > > flink版本:1.10.1
> > > blink planner,streaming model
> > >
> > >
> > > Thx
> > > | |
> > > Sun.Zhu
> > > |
> > > |
> > > 17626017841@163.com
> > > |
> > > 签名由网易邮箱大师定制
> > >
> > >
> > >
> >
>


-- 
Best regards!
Rui Li

回复:sqlclient集成hiveCatalog查询kafka表问题

Posted by "Sun.Zhu" <17...@163.com>.
Sqlcli上得报错就上面这点,没有更多得信息了,或者从哪些log里可以看到更多信息




在2020年06月17日 10:27,Benchao Li 写道:
目前这个缺少的依赖是在flink-table-runtime-blink module的,现在这些重构到了flink-table-common
module了。
如果只是connector、format这些用老的版本,应该是没有问题的。
你可以把更详细的报错信息发一下吗?看一下具体是哪个模块还在依赖老版本的flink-table-runtime-blink

Sun.Zhu <17...@163.com> 于2020年6月17日周三 上午12:49写道:

> 是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11
> 还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?
>
>
>
>
> 在2020年06月16日 18:38,Benchao Li 写道:
> 1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
> 你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。
>
> Sun.Zhu <17...@163.com> 于2020年6月16日周二 下午6:11写道:
>
> > 我编译了1.11包
> > 在sql-cli下查询hive的表报如下错误:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow
> >
> >
> > 查注册的kafka表报:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.ClassNotFoundException:
> org.apache.flink.table.dataformat.BaseRow
> >
> >
> > 依赖包是从1.10.1下面拷贝的
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017841@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年06月13日 11:44,Sun.Zhu<17...@163.com> 写道:
> > Got it!
> > Thx,junbao
> >
> >
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017841@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年06月13日 09:32,zhangjunbao<wi...@outlook.com> 写道:
> > 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
> > https://issues.apache.org/jira/browse/FLINK-17189 <
> > https://issues.apache.org/jira/browse/FLINK-17189>
> >
> > Best,
> > Junbao Zhang
> >
> > 2020年6月13日 上午12:31,Sun.Zhu <17...@163.com> 写道:
> >
> > hi,all
> > 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
> > ddl如下:
> > |
> > CREATETABLE user_behavior (
> > user_id BIGINT,
> > item_id BIGINT,
> > category_id BIGINT,
> > behavior STRING,
> > ts TIMESTAMP(3),
> > proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
> > WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
> > ) WITH (
> > 'connector.type' = 'kafka',  -- 使用 kafka connector
> > 'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
> > 'connector.topic' = 'user_behavior',  -- kafka topic
> > 'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> > 'connector.properties.zookeeper.connect' = 'localhost:2181',  --
> zookeeper
> > 地址
> > 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
> > broker 地址
> > 'format.type' = 'json'-- 数据源格式为 json
> > );
> > |
> > 在查询时select * from user_behavior;报错如下:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.AssertionError: Conversion to relational algebra failed to
> > preserve datatypes:
> > validated type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> > converted type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
> NULL
> > rel:
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[$5])
> > LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> > SECOND)])
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> > LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> > [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
> >
> >
> > flink版本:1.10.1
> > blink planner,streaming model
> >
> >
> > Thx
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017841@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> >
>

Re: sqlclient集成hiveCatalog查询kafka表问题

Posted by Benchao Li <li...@apache.org>.
目前这个缺少的依赖是在flink-table-runtime-blink module的,现在这些重构到了flink-table-common
module了。
如果只是connector、format这些用老的版本,应该是没有问题的。
你可以把更详细的报错信息发一下吗?看一下具体是哪个模块还在依赖老版本的flink-table-runtime-blink

Sun.Zhu <17...@163.com> 于2020年6月17日周三 上午12:49写道:

> 是的 除了1.11 编译出来的包之外依赖的包,比如connector的、hivecatalog需要依赖的包,由于1.11
> 还没有release所以就用的1.10.1版本的,上面两个问题在1.10.1版本下是没有的,升级了1.11报了不知道什么原因,缺少依赖吗?
>
>
>
>
> 在2020年06月16日 18:38,Benchao Li 写道:
> 1.11中对底层数据结构做了一些重构,所以你不可以直接把1.10的jar包拿到1.11里面使用的。
> 你可以直接使用1.11里面编译出来的jar包来跑应该是没有问题的。
>
> Sun.Zhu <17...@163.com> 于2020年6月16日周二 下午6:11写道:
>
> > 我编译了1.11包
> > 在sql-cli下查询hive的表报如下错误:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.NoClassDefFoundError: org/apache/flink/table/dataformat/BaseRow
> >
> >
> > 查注册的kafka表报:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.ClassNotFoundException:
> org.apache.flink.table.dataformat.BaseRow
> >
> >
> > 依赖包是从1.10.1下面拷贝的
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017841@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年06月13日 11:44,Sun.Zhu<17...@163.com> 写道:
> > Got it!
> > Thx,junbao
> >
> >
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017841@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年06月13日 09:32,zhangjunbao<wi...@outlook.com> 写道:
> > 1.10.x版本,在hivecatalog下,建表时用proctime as PROCTIME()应该是有bug的,
> > https://issues.apache.org/jira/browse/FLINK-17189 <
> > https://issues.apache.org/jira/browse/FLINK-17189>
> >
> > Best,
> > Junbao Zhang
> >
> > 2020年6月13日 上午12:31,Sun.Zhu <17...@163.com> 写道:
> >
> > hi,all
> > 在用sql client集成hiveCatalog时,在hiveCatalog中注册了一个kafka的table
> > ddl如下:
> > |
> > CREATETABLE user_behavior (
> > user_id BIGINT,
> > item_id BIGINT,
> > category_id BIGINT,
> > behavior STRING,
> > ts TIMESTAMP(3),
> > proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
> > WATERMARK FOR ts as ts - INTERVAL'5'SECOND-- 在ts上定义watermark,ts成为事件时间列
> > ) WITH (
> > 'connector.type' = 'kafka',  -- 使用 kafka connector
> > 'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
> > 'connector.topic' = 'user_behavior',  -- kafka topic
> > 'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
> > 'connector.properties.zookeeper.connect' = 'localhost:2181',  --
> zookeeper
> > 地址
> > 'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka
> > broker 地址
> > 'format.type' = 'json'-- 数据源格式为 json
> > );
> > |
> > 在查询时select * from user_behavior;报错如下:
> > [ERROR] Could not execute SQL statement. Reason:
> > java.lang.AssertionError: Conversion to relational algebra failed to
> > preserve datatypes:
> > validated type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIMESTAMP(3) NOT NULL proctime) NOT NULL
> > converted type:
> > RecordType(BIGINT user_id, BIGINT item_id, BIGINT category_id,
> > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" behavior, TIME
> > ATTRIBUTE(ROWTIME) ts, TIME ATTRIBUTE(PROCTIME) NOT NULL proctime) NOT
> NULL
> > rel:
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[$5])
> > LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($4, 5000:INTERVAL
> > SECOND)])
> > LogicalProject(user_id=[$0], item_id=[$1], category_id=[$2],
> > behavior=[$3], ts=[$4], proctime=[PROCTIME()])
> > LogicalTableScan(table=[[myhive, my_db, user_behavior, source:
> > [KafkaTableSource(user_id, item_id, category_id, behavior, ts)]]])
> >
> >
> > flink版本:1.10.1
> > blink planner,streaming model
> >
> >
> > Thx
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017841@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> >
>