Posted to user@flink.apache.org by Arvid Heise <ar...@ververica.com> on 2020/03/10 12:51:01 UTC

Re: Does flink-1.10 sql-client support kafka sink?

Hi Lei,

Yes, Kafka is supported as a sink, albeit only for appends (no
deletions/updates yet) [1].

An example is a bit hidden in the documentation [2]:

tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries
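
As a rough sketch of how such a sink might then be written to from the SQL
client: MyTableSource below is a hypothetical source table, assumed to be
already registered with columns matching the sink schema above.

```sql
-- MyTableSource is an assumed, already registered source table
-- whose columns match the sink schema (rideId, lon, lat, rideTime).
INSERT INTO MyTableSink
SELECT rideId, lon, lat, rideTime
FROM MyTableSource;
```

Because the sink is append-only, a query that produces updates or deletions
(for example, a non-windowed GROUP BY aggregation) would likely be rejected.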

On Tue, Mar 10, 2020 at 10:51 AM wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:

>
> I have successfully configured a source table using the following
> configuration:
>
> - name: out_order
>     type: source
>     update-mode: append
>     schema:
>     - name: out_order_code
>       type: STRING
>     - name: input_date
>       type: BIGINT
>     - name: owner_code
>       type: STRING
>     connector:
>       property-version: 1
>       type: kafka
>       version: universal
>       topic: out_order
>       startup-mode: latest-offset
>       properties:
>       - key: zookeeper.connect
>         value: 172.19.78.32:2181
>       - key: bootstrap.servers
>         value: 172.19.78.32:9092
>       - key: group.id
>       property-version: 1
>       type: json
>       schema: "ROW(out_order_code STRING,owner_code STRING,input_date
> BIGINT)"
>
> How can I configure a sink table? I haven't found any useful docs for this.
>
> Thanks,
> Lei
>

Re: Re: Does flink-1.10 sql-client support kafka sink?

Posted by Jark Wu <im...@gmail.com>.
Hi Lei,

Yes. If you are creating a Kafka table, then the Kafka connector jar and
some format jars are required.

That's weird. If the DDL fails, the YAML way should fail with the same
exception, unless some connector property values differ.

Could you share the detailed exception stack trace?

Best,
Jark

On Wed, 11 Mar 2020 at 14:51, wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:

> Hi Jark,
>
> I have tried to use the CREATE TABLE DDL.
> First I ran ./bin/sql-client.sh embedded, then created a table from a
> Kafka topic, and it told me the table had been created.
> But when I query with select * from tableName, there is an error:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find
> a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
>
> Perhaps I need to add some jar to the lib directory.
> But if I write the table configuration in the sql-client-defaults.yaml
> file, I can select the result correctly.
>
> Thanks,
> Lei
>

Re: Re: Does flink-1.10 sql-client support kafka sink?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
I am using Flink 1.10, but I had added flink-json-1.9.1.jar and flink-sql-connector-kafka_2.11-1.9.1.jar to the lib directory.
After changing to flink-json-1.10.0.jar and flink-sql-connector-kafka_2.12-1.10.0.jar, it works.

But I have no idea why the YAML way works when I use flink-json-1.9.1.jar and flink-sql-connector-kafka_2.11-1.9.1.jar in a Flink 1.10 environment.

Thanks,
Lei



wanglei2@geekplus.com.cn

 
From: wanglei2@geekplus.com.cn
Date: 2020-03-11 14:51
To: Jark Wu
CC: Arvid Heise; user
Subject: Re: Re: Does flink-1.10 sql-client support kafka sink?
Hi Jark,

I have tried to use the CREATE TABLE DDL.
First I ran ./bin/sql-client.sh embedded, then created a table from a Kafka topic, and it told me the table had been created.
But when I query with select * from tableName, there is an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Perhaps I need to add some jar to the lib directory.
But if I write the table configuration in the sql-client-defaults.yaml file, I can select the result correctly.

Thanks,
Lei
 



 

Re: Re: Does flink-1.10 sql-client support kafka sink?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Hi Jark, 

I have tried to use the CREATE TABLE DDL.
First I ran ./bin/sql-client.sh embedded, then created a table from a Kafka topic, and it told me the table had been created.
But when I query with select * from tableName, there is an error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Perhaps I need to add some jar to the lib directory.
But if I write the table configuration in the sql-client-defaults.yaml file, I can select the result correctly.

Thanks,
Lei
 



 
From: Jark Wu
Date: 2020-03-11 11:13
To: wanglei2@geekplus.com.cn
CC: Arvid Heise; user
Subject: Re: Re: Does flink-1.10 sql-client support kafka sink?
Hi Lei,

CREATE TABLE DDL [1][2] has been the recommended way to register a table since 1.9, and the YAML way might be deprecated in the future.
With DDL, a registered table can be used as both source and sink.

Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
[2]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector


Re: Re: Does flink-1.10 sql-client support kafka sink?

Posted by Jark Wu <im...@gmail.com>.
Hi Lei,

CREATE TABLE DDL [1][2] has been the recommended way to register a table
since 1.9, and the YAML way might be deprecated in the future.
With DDL, a registered table can be used as both source and sink.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
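
For illustration, a minimal sketch of what such a DDL could look like for the
out_order topic from earlier in this thread, assuming the Flink 1.10 connector
property keys described in [2]. The group id 'testGroup' is a placeholder (the
original value was not given in the thread), and the JSON format schema is
assumed to be derived from the table schema.

```sql
-- Sketch only: registers the Kafka topic as a table that can be
-- read from (source) and written to (sink).
CREATE TABLE out_order (
  out_order_code STRING,
  input_date BIGINT,
  owner_code STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'out_order',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.zookeeper.connect' = '172.19.78.32:2181',
  'connector.properties.bootstrap.servers' = '172.19.78.32:9092',
  'connector.properties.group.id' = 'testGroup',  -- placeholder, not from the thread
  'format.type' = 'json',
  'update-mode' = 'append'
);
```

Once registered this way, the same table can appear in a SELECT ... FROM
out_order query as well as in an INSERT INTO out_order statement.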

On Tue, 10 Mar 2020 at 21:52, wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:

> Thanks, works now.
>
> It seems that is because I added
>    schema: "ROW(out_order_code STRING,input_date BIGINT, owner_code
> STRING, status INT)"
>
> under the format label.
>

Re: Re: Does flink-1.10 sql-client support kafka sink?

Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Thanks, works now. 

It seems that is because I added
   schema: "ROW(out_order_code STRING,input_date BIGINT, owner_code STRING, status INT)"

under the format label.

From: Arvid Heise
Date: 2020-03-10 20:51
To: wanglei2@geekplus.com.cn
CC: user
Subject: Re: Does flink-1.10 sql-client support kafka sink?
Hi Lei,

Yes, Kafka is supported as a sink, albeit only for appends (no deletions/updates yet) [1].

An example is a bit hidden in the documentation [2]:
tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: OutputTopic
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      derive-schema: true
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rideTime
        data-type: TIMESTAMP(3)
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#detached-sql-queries
