Posted to user@flink.apache.org by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> on 2020/03/10 09:51:27 UTC
Does flink-1.10 sql-client support kafka sink?
I have configured a source table successfully using the following configuration:

- name: out_order
  type: source
  update-mode: append
  schema:
    - name: out_order_code
      type: STRING
    - name: input_date
      type: BIGINT
    - name: owner_code
      type: STRING
  connector:
    property-version: 1
    type: kafka
    version: universal
    topic: out_order
    startup-mode: latest-offset
    properties:
      - key: zookeeper.connect
        value: 172.19.78.32:2181
      - key: bootstrap.servers
        value: 172.19.78.32:9092
      - key: group.id
  format:
    property-version: 1
    type: json
    schema: "ROW(out_order_code STRING, owner_code STRING, input_date BIGINT)"
How can I configure a sink table? I haven't found any useful docs for this.
Thanks,
Lei
Re: Re: Does flink-1.10 sql-client support kafka sink?
Posted by Jark Wu <im...@gmail.com>.
Hi Lei,
Yes. If you are creating a Kafka table, then the Kafka connector jar and
some format jars are required.
That's weird. If the DDL fails, the YAML way should fail with the same
exception, unless some connector property values differ.
Could you share the detailed exception stack?
Best,
Jark
On Wed, 11 Mar 2020 at 14:51, wanglei2@geekplus.com.cn wrote:
Re: Re: Does flink-1.10 sql-client support kafka sink?
Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
I am using Flink 1.10, but I added flink-json-1.9.1.jar and flink-sql-connector-kafka_2.11-1.9.1.jar to the lib directory.
After changing to flink-json-1.10.0.jar and flink-sql-connector-kafka_2.12-1.10.0.jar, it works.
But I have no idea why the YAML way works when I use flink-json-1.9.1.jar and flink-sql-connector-kafka_2.11-1.9.1.jar in a Flink 1.10 environment.
Thanks,
Lei
Re: Re: Does flink-1.10 sql-client support kafka sink?
Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Hi Jark,
I have tried to use the CREATE TABLE DDL.
First I ran ./bin/sql-client.sh embedded, then created a table from a Kafka topic, and it told me the table had been created.
But when I query with select * from tableName, there's an error:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Perhaps I need to add some jar to the lib directory.
But if I write the table configuration in the sql-client-defaults.yaml file, I can select the result correctly.
Thanks,
Lei
On 2020-03-11 11:13, Jark Wu wrote:
Re: Re: Does flink-1.10 sql-client support kafka sink?
Posted by Jark Wu <im...@gmail.com>.
Hi Lei,
The CREATE TABLE DDL [1][2] has been the recommended way to register a table
since 1.9, and the YAML way might be deprecated in the future.
A table registered via DDL can be used as both source and sink.
Best,
Jark
[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
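For instance, a DDL roughly equivalent to the out_order source table configured earlier in this thread would look like the sketch below, using the Flink 1.10 connector property keys. This is an illustration, not a verbatim copy of anyone's setup; broker addresses are placeholders:

```sql
-- Sketch of a Flink 1.10 CREATE TABLE DDL for a Kafka-backed source table;
-- property keys follow the 1.10 connector descriptor style
CREATE TABLE out_order (
  out_order_code STRING,
  input_date BIGINT,
  owner_code STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'out_order',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'format.type' = 'json'
);
```

Running this requires the matching Kafka connector and JSON format jars on the SQL Client classpath, as discussed in this thread.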
On Tue, 10 Mar 2020 at 21:52, wanglei2@geekplus.com.cn wrote:
Re: Re: Does flink-1.10 sql-client support kafka sink?
Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Thanks, it works now.
It seems it was because I added
schema: "ROW(out_order_code STRING, input_date BIGINT, owner_code STRING, status INT)"
under the format label.
On 2020-03-10 20:51, Arvid Heise wrote:
Re: Does flink-1.10 sql-client support kafka sink?
Posted by Arvid Heise <ar...@ververica.com>.
Hi Lei,
Yes, Kafka as a sink is supported, albeit only for appends (no
deletions/updates yet) [1].
An example is a bit hidden in the documentation [2]:
tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries
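Once a sink table like MyTableSink is registered, writing to it from the SQL Client is an ordinary INSERT INTO statement, which runs as a detached query [2]. A minimal sketch; the source table Rides is assumed for illustration:

```sql
-- Hypothetical append-only query writing into the Kafka-backed sink table
INSERT INTO MyTableSink
SELECT rideId, lon, lat, rideTime
FROM Rides;
```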