Posted to user-zh@flink.apache.org by wang <24...@163.com> on 2022/03/31 13:42:50 UTC

Could you please give me a hand about json object in flink sql

Hi dear engineer,


Thanks so much for your precious time reading my email!
Recently I have been working with Flink SQL (version 1.13) in my project and ran into a problem with JSON-formatted data. I hope you can take a look. My issue is described below.


I use Kafka as both source and sink. I define the Kafka source table like this:


     CREATE TABLE TableSource (
          `schema` STRING,
          `payload` ROW(
              `id` STRING,
              `content` STRING
          )
     )
     WITH (
         'connector' = 'kafka',
         'topic' = 'topic_source',
         'properties.bootstrap.servers' = 'localhost:9092',
         'properties.group.id' = 'all_gp',
         'scan.startup.mode' = 'group-offsets',
         'format' = 'json',
         'json.fail-on-missing-field' = 'false',
         'json.ignore-parse-errors' = 'true'
     );


Define the kafka sink table like this:


     CREATE TABLE TableSink (
          `id` STRING NOT NULL,
          `content` STRING NOT NULL
     )
     WITH (
         'connector' = 'kafka',
         'topic' = 'topic_sink',
         'properties.bootstrap.servers' = 'localhost:9092',
         'format' = 'json',
         'json.fail-on-missing-field' = 'false',
         'json.ignore-parse-errors' = 'true'
    );




Then I insert into TableSink with data from TableSource:
    INSERT INTO TableSink SELECT payload.id, payload.content FROM TableSource;


Then I use "kafka-console-producer.sh" to produce data below into topic "topic_source" (TableSource):
    {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}




Then I read topic_sink (TableSink) with kafka-console-consumer.sh. The output is:
    {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"}


But what I want here is {"id":"10000","content": {"name":"Jone","age":20}}
I want the value of "content" to be a JSON object, not a JSON string.


What's more, the format of "content" in TableSource is not fixed. It can be any JSON-formatted (or JSON-array-formatted) string, such as:
{"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}




So my question is: how can I transform a JSON-formatted string (like "{\"name\":\"Jone\",\"age\":20}") from TableSource into a JSON object (like {"name":"Jone","age":20})?
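
A note on the transformation asked about above: at the string level, the two outputs differ only in whether the value of "content" is written as a quoted, escaped JSON string or spliced in as raw JSON text. The following is a minimal plain-Java sketch of that difference. It is not a Flink API: the class and method names (RawJsonEmbed, quote, asJsonString, asJsonObject) are hypothetical, and it assumes the content string is already valid JSON. How to wire equivalent logic into Flink SQL 1.13 (for example through a UDF or a custom format) is left open here.

```java
class RawJsonEmbed {
    // Escapes a Java string into a quoted JSON string literal
    // (handles double quotes, backslashes and control characters).
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // What the sink currently emits: "content" is a JSON string.
    static String asJsonString(String id, String content) {
        return "{\"id\":" + quote(id) + ",\"content\":" + quote(content) + "}";
    }

    // What is wanted: "content" spliced in as raw JSON.
    // Assumption: contentJson is valid JSON; this sketch does not validate it.
    static String asJsonObject(String id, String contentJson) {
        return "{\"id\":" + quote(id) + ",\"content\":" + contentJson + "}";
    }

    public static void main(String[] args) {
        String content = "{\"name\":\"Jone\",\"age\":20}";
        System.out.println(asJsonString("10000", content));
        System.out.println(asJsonObject("10000", content));
    }
}
```

Because the raw text is spliced in unchanged, the same approach covers the case where "content" holds a JSON array or an object with different fields.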




Thanks && Regards,
Hunk


Re:Re: How can I set job parameter in flink sql

Posted by wang <24...@163.com>.
Ok, got it. Thanks so much!

Regards,
Hunk



--
Sent from my NetEase Mail mobile client



On 2022-05-11 16:46:14, yuxia <lu...@alumni.sjtu.edu.cn> wrote:

Hi, AFAIK, you can't read a parameter that was set via the Flink SQL client in a UDF.


If you still want to get the parameters in your udf, you can use the following code to set the parameter:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Map<String, String> parameter = new HashMap<>();
parameter.put("black_list_path", "xxxx");
// Pass the same map that was filled above
env.getConfig().setGlobalJobParameters(Configuration.fromMap(parameter));


Then you can read the parameter in the UDF with context.getJobParameter("black_list_path", "/config/list.properties").


Best regards,
Yuxia


From: "wang" <24...@163.com>
To: "User" <us...@flink.apache.org>, "user-zh" <us...@flink.apache.org>
Sent: Wednesday, May 11, 2022, 2:44:20 PM
Subject: How can I set job parameter in flink sql



Hi dear engineer,


I want to override the function open() in my UDF, like:




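import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class BlackListConvertFunction extends ScalarFunction {

    @Override
    public void open(FunctionContext context) throws Exception {
        // The second argument is the default used when the key is not found
        String path = context.getJobParameter("black_list_path", "/config/list.properties");
        System.out.println(path);
    }

    public Double eval(String scores) {
        // some logic
        return 0.0;
    }
}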






In the open() function, I want to fetch the configured value of "black_list_path" and simply print it. I set this value in the ./sql-client.sh console:


    SET black_list_path = /root/list.properties


Then I run this UDF, but what gets printed is /config/list.properties (the default value I passed to context.getJobParameter("black_list_path", "/config/list.properties")), not /root/list.properties, which I set in the ./sql-client.sh console.


So could you please show me the correct way to set black_list_path in SQL? Thanks so much!




Thanks && Regards,
Hunk






 



Re: How can I set job parameter in flink sql

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Hi, AFAIK, you can't read a parameter that was set via the Flink SQL client in a UDF.

If you still want to get the parameters in your udf, you can use the following code to set the parameter: 

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Map<String, String> parameter = new HashMap<>();
parameter.put("black_list_path", "xxxx");
// Pass the same map that was filled above
env.getConfig().setGlobalJobParameters(Configuration.fromMap(parameter));

Then you can read the parameter in the UDF with context.getJobParameter("black_list_path", "/config/list.properties").
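
The fallback seen in the original question is what a keyed lookup with a default does when the key was never registered. Below is a small plain-Java model of that behaviour. It is only a sketch and not Flink code: JobParameterLookupModel and its methods are hypothetical stand-ins, and it assumes, as described above, that a value set in the SQL client never reaches the parameters the UDF reads.

```java
import java.util.HashMap;
import java.util.Map;

class JobParameterLookupModel {
    // Hypothetical stand-in for the job's global parameters; not a Flink class.
    private final Map<String, String> globalJobParameters = new HashMap<>();

    // Models registering a parameter on the job before it runs.
    void setGlobalJobParameter(String key, String value) {
        globalJobParameters.put(key, value);
    }

    // Same shape as the getJobParameter(key, defaultValue) call in the thread:
    // returns the registered value, or the default when the key is absent.
    String getJobParameter(String key, String defaultValue) {
        return globalJobParameters.getOrDefault(key, defaultValue);
    }

    public static void main(String[] args) {
        JobParameterLookupModel job = new JobParameterLookupModel();
        // Nothing registered yet, so the default comes back.
        System.out.println(job.getJobParameter("black_list_path", "/config/list.properties"));
        job.setGlobalJobParameter("black_list_path", "/root/list.properties");
        // Now the registered value wins over the default.
        System.out.println(job.getJobParameter("black_list_path", "/config/list.properties"));
    }
}
```

The lookup is by exact key, so a key registered with stray spaces around it would also fall through to the default.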

Best regards, 
Yuxia 


From: "wang" <24...@163.com>
To: "User" <us...@flink.apache.org>, "user-zh" <us...@flink.apache.org>
Sent: Wednesday, May 11, 2022, 2:44:20 PM
Subject: How can I set job parameter in flink sql

Hi dear engineer, 

I want to override the function open() in my UDF, like: 


public class BlackListConvertFunction extends ScalarFunction {

    @Override
    public void open(FunctionContext context) throws Exception {
        String path = context.getJobParameter("black_list_path", "/config/list.properties");
        System.out.println(path);
    }

    public Double eval(String scores) {
        // some logic
        return 0.0;
    }
}





In the open() function, I want to fetch the configured value of "black_list_path" and simply print it. I set this value in the ./sql-client.sh console:

SET black_list_path = /root/list.properties

Then I run this UDF, but what gets printed is /config/list.properties (the default value I passed to context.getJobParameter("black_list_path", "/config/list.properties")), not /root/list.properties, which I set in the ./sql-client.sh console.

So could you please show me the correct way to set black_list_path in SQL? Thanks so much!


Thanks && Regards,
Hunk 







Re: Disable the chain of the Sink operator

Posted by wudi <67...@qq.com.INVALID>.
Thank you for your reply.
But in my local test environment (Flink 1.15 and Flink 1.16), when the chain between writer and committer is disabled, the back pressure is reduced.

What I observe is as follows:
1. After ck-4 completes, the commit executes very slowly.
2. Meanwhile, the [Sink: Writer (1/1)#0] thread continues to call SinkWriter.write() to receive upstream data.
3. When ck-5 is triggered, the prepareCommit and snapshotState methods are executed.
4. Because the previous commit has not completed, the [Sink: Committer (1/1)#0] thread waits before calling the commit method.
5. SinkWriter.write() can still continue to receive upstream data.
6. ck-6 is not triggered until the first commit completes.
Throughout this process, SinkWriter.write() is never blocked.

However, if the chain is not disabled, receiving upstream data will be blocked in the second step.
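
The thread-level difference described above can be sketched as a toy model in plain Java. This is not Flink code and it settles nothing about back pressure in a real job: it has no checkpoints, network buffers or barriers, and ChainingModel and writesDuringSlowCommit are hypothetical names. It only assumes that "chained" means writer and committer share one thread and "unchained" means each has its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class ChainingModel {
    // Returns how many of `records` writes completed while one slow commit was still running.
    // chained = true: writer and committer share a single thread; false: one thread each.
    static int writesDuringSlowCommit(boolean chained, int records) {
        ExecutorService writerThread = Executors.newSingleThreadExecutor();
        ExecutorService committerThread = chained ? writerThread : Executors.newSingleThreadExecutor();
        CountDownLatch commitStarted = new CountDownLatch(1);
        CountDownLatch commitMayFinish = new CountDownLatch(1);
        AtomicInteger written = new AtomicInteger();
        try {
            // Slow commit: occupies its thread until the latch is released.
            Future<?> commit = committerThread.submit(() -> {
                commitStarted.countDown();
                commitMayFinish.await();
                return null;
            });
            commitStarted.await();
            // Upstream records arrive while the commit is in progress.
            List<Future<?>> writes = new ArrayList<>();
            for (int i = 0; i < records; i++) {
                writes.add(writerThread.submit(() -> { written.incrementAndGet(); }));
            }
            if (!chained) {
                // With its own thread, the writer can finish all writes now.
                for (Future<?> w : writes) w.get();
            }
            int observed = written.get();
            commitMayFinish.countDown(); // let the slow commit complete
            commit.get();
            for (Future<?> w : writes) w.get();
            return observed;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            writerThread.shutdownNow();
            committerThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("chained:   " + writesDuringSlowCommit(true, 5));
        System.out.println("unchained: " + writesDuringSlowCommit(false, 5));
    }
}
```

In the shared-thread case the queued writes cannot start until the commit returns, while in the two-thread case they complete during the commit. Whether the separated committer later pushes back on the writer through back pressure is outside what this model shows.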


Thanks && Regards,
di.wu

> On Feb 17, 2023, at 9:40 AM, Shammon FY <zj...@gmail.com> wrote:
> 
> Hi wudi
> 
> I'm afraid it cannot reduce back pressure. If Writer and Commiter are not
> chained together, and the Commiter task runs slowly, it can block the
> upstream Writer tasks by back pressure too.
> 
> On the other hand, you can try to increase the parallelism of sink node to
> speedup the Commiter operation
> 
> Best,
> Shammon
> 
> On Thu, Feb 16, 2023 at 11:38 PM wudi <67...@qq.com.invalid> wrote:
> 
>> thanks for your replies.
>> I think that if Writer and Commiter are not chained together, data
>> consistency can be guaranteed, right?
>> Because when the Commiter does not block the Writer, at the next
>> Checkpoint, if the Commit is not completed, the SinkWriter.precommit will
>> not be triggered
>> 
>> In addition, in this scenario (writer blocking caused by slow commit), may
>> the performance of disabling Sink's chain be better? Because it can reduce
>> a lot of back pressure.
>> 
>> Thanks && Regards,
>> di.wu
>> 
>> 
>>> On Feb 16, 2023, at 10:05 PM, Chesnay Schepler <ch...@apache.org> wrote:
>>> 
>>> As far as I know that chain between committer and writer is also
>> required for correctness.
>>> 
>>> On 16/02/2023 14:53, weijie guo wrote:
>>>> Hi wu,
>>>> 
>>>> I don't think it is a good choice to directly change the strategy of
>> chain. Operator chain usually has better performance and resource
>> utilization. If we directly change the chain policy between them, users can
>> no longer chain them together, which is not a good starting point.
>>>> 
>>>> Best regards,
>>>> 
>>>> Weijie
>>>> 
>>>> 
>>>> 
>>>> On Thu, Feb 16, 2023 at 19:29, wudi <67...@qq.com> wrote:
>>>>
>>>>   Thank you for your reply.
>>>>
>>>>   Currently in the custom Sink Connector, the Flink task will
>>>>   combine Writer and Committer into one thread, and the thread name
>>>>   looks like *[Sink: Writer -> Sink: Committer (1/1)#0]*.
>>>>   In this way, when the *Committer.commit()* method is very slow, it
>>>>   blocks the *SinkWriter.write()* method from receiving upstream data.
>>>>
>>>>   The client can use the *env.disableOperatorChaining()* method to
>>>>   split the thread into two threads: *[Sink: Writer (1/1)#0]* and
>>>>   *[Sink: Committer (1/1)#0]*. Then the Committer.commit method
>>>>   will not block the SinkWriter.write method.
>>>>
>>>>   If the chain policy can be disabled in the custom Sink Connector,
>>>>   the client no longer needs to disable the chain itself.
>>>>   Or is there a better way to make *Committer.commit()* not block
>>>>   *SinkWriter.write()*?
>>>>
>>>>   Looking forward to your reply.
>>>>   Thanks && Regards,
>>>>   di.wu
>>>>
>>>>>   On Feb 16, 2023, at 6:54 PM, Shammon FY <zj...@gmail.com> wrote:
>>>>> 
>>>>>   Hi
>>>>> 
>>>>>   Do you mean how to disable `chain` in your custom sink
>>>>>   connector?  Can you
>>>>>   give an example of what you want?
>>>>> 
>>>>>   On Wed, Feb 15, 2023 at 10:42 PM di wu <67...@qq.com> wrote:
>>>>> 
>>>>>>   Hello
>>>>>> 
>>>>>>   The current Sink operator will be split into two operations,
>>>>>>   Writer and
>>>>>>   Commiter. By default, they will be chained together and executed
>>>>>>   on the
>>>>>>   same thread.
>>>>>>   So sometimes when the commiter is very slow, it will block the data
>>>>>>   writer, causing back pressure.
>>>>>> 
>>>>>>   At present, FlinkSQL can be solved by disabling the chain
>>>>>>   globally, and
>>>>>>   DataStream can partially disable the chain through the
>>>>>>   disableChaining
>>>>>>   method, but both of them need to be set by the user.
>>>>>> 
>>>>>>   Can the strategy of the Chain be changed in the Custom Sink
>>>>>>   Connector to
>>>>>>   separate Writer and Commiter?
>>>>>> 
>>>>>>   Thanks && Regards,
>>>>>>   di.wu
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
>> 
> 


Re: Disable the chain of the Sink operator

Posted by Shammon FY <zj...@gmail.com>.
Hi wudi

I'm afraid it cannot reduce back pressure. If the Writer and Committer are not
chained together and the Committer task runs slowly, it can still block the
upstream Writer tasks through back pressure.

On the other hand, you can try increasing the parallelism of the sink node to
speed up the Committer operation.

Best,
Shammon

On Thu, Feb 16, 2023 at 11:38 PM wudi <67...@qq.com.invalid> wrote:

> thanks for your replies.
> I think that if Writer and Commiter are not chained together, data
> consistency can be guaranteed, right?
> Because when the Commiter does not block the Writer, at the next
> Checkpoint, if the Commit is not completed, the SinkWriter.precommit will
> not be triggered
>
> In addition, in this scenario (writer blocking caused by slow commit), may
> the performance of disabling Sink's chain be better? Because it can reduce
> a lot of back pressure.
>
> Thanks && Regards,
> di.wu
>
>
> > On Feb 16, 2023, at 10:05 PM, Chesnay Schepler <ch...@apache.org> wrote:
> >
> > As far as I know that chain between committer and writer is also
> required for correctness.
> >
> > On 16/02/2023 14:53, weijie guo wrote:
> >> Hi wu,
> >>
> >> I don't think it is a good choice to directly change the strategy of
> chain. Operator chain usually has better performance and resource
> utilization. If we directly change the chain policy between them, users can
> no longer chain them together, which is not a good starting point.
> >>
> >> Best regards,
> >>
> >> Weijie
> >>
> >>
> >>
> >> On Thu, Feb 16, 2023 at 19:29, wudi <67...@qq.com> wrote:
> >>
> >>    Thank you for your reply.
> >>
> >>    Currently in the custom Sink Connector, the Flink task will
> >>    combine Writer and Committer into one thread, and the thread name
> >>    looks like *[Sink: Writer -> Sink: Committer (1/1)#0]*.
> >>    In this way, when the *Committer.commit()* method is very slow, it
> >>    blocks the *SinkWriter.write()* method from receiving upstream data.
> >>
> >>    The client can use the *env.disableOperatorChaining()* method to
> >>    split the thread into two threads: *[Sink: Writer (1/1)#0]* and
> >>    *[Sink: Committer (1/1)#0]*. Then the Committer.commit method
> >>    will not block the SinkWriter.write method.
> >>
> >>    If the chain policy can be disabled in the custom Sink Connector,
> >>    the client no longer needs to disable the chain itself.
> >>    Or is there a better way to make *Committer.commit()* not block
> >>    *SinkWriter.write()*?
> >>
> >>    Looking forward to your reply.
> >>    Thanks && Regards,
> >>    di.wu
> >>
> >>>    On Feb 16, 2023, at 6:54 PM, Shammon FY <zj...@gmail.com> wrote:
> >>>
> >>>    Hi
> >>>
> >>>    Do you mean how to disable `chain` in your custom sink
> >>>    connector?  Can you
> >>>    give an example of what you want?
> >>>
> >>>    On Wed, Feb 15, 2023 at 10:42 PM di wu <67...@qq.com> wrote:
> >>>
> >>>>    Hello
> >>>>
> >>>>    The current Sink operator will be split into two operations,
> >>>>    Writer and
> >>>>    Commiter. By default, they will be chained together and executed
> >>>>    on the
> >>>>    same thread.
> >>>>    So sometimes when the commiter is very slow, it will block the data
> >>>>    writer, causing back pressure.
> >>>>
> >>>>    At present, FlinkSQL can be solved by disabling the chain
> >>>>    globally, and
> >>>>    DataStream can partially disable the chain through the
> >>>>    disableChaining
> >>>>    method, but both of them need to be set by the user.
> >>>>
> >>>>    Can the strategy of the Chain be changed in the Custom Sink
> >>>>    Connector to
> >>>>    separate Writer and Commiter?
> >>>>
> >>>>    Thanks && Regards,
> >>>>    di.wu
> >>>>
> >>>
> >>
> >
>
>

Re: Disable the chain of the Sink operator

Posted by wudi <67...@qq.com>.
Thanks for your replies.
I think that if the Writer and Committer are not chained together, data consistency can still be guaranteed, right?
Because even when the Committer does not block the Writer, at the next checkpoint SinkWriter.precommit will not be triggered if the previous commit has not completed.

In addition, in this scenario (writer blocked by a slow commit), might disabling the Sink's chain give better performance? It could remove a lot of back pressure.

Thanks && Regards,
di.wu


> On Feb 16, 2023, at 10:05 PM, Chesnay Schepler <ch...@apache.org> wrote:
> 
> As far as I know that chain between committer and writer is also required for correctness.
> 
> On 16/02/2023 14:53, weijie guo wrote:
>> Hi wu,
>> 
>> I don't think it is a good choice to directly change the strategy of chain. Operator chain usually has better performance and resource utilization. If we directly change the chain policy between them, users can no longer chain them together, which is not a good starting point.
>> 
>> Best regards,
>> 
>> Weijie
>> 
>> 
>> 
>> On Thu, Feb 16, 2023 at 19:29, wudi <67...@qq.com> wrote:
>>
>>    Thank you for your reply.
>>
>>    Currently in the custom Sink Connector, the Flink task will
>>    combine Writer and Committer into one thread, and the thread name
>>    looks like *[Sink: Writer -> Sink: Committer (1/1)#0]*.
>>    In this way, when the *Committer.commit()* method is very slow, it
>>    blocks the *SinkWriter.write()* method from receiving upstream data.
>>
>>    The client can use the *env.disableOperatorChaining()* method to
>>    split the thread into two threads: *[Sink: Writer (1/1)#0]* and
>>    *[Sink: Committer (1/1)#0]*. Then the Committer.commit method
>>    will not block the SinkWriter.write method.
>>
>>    If the chain policy can be disabled in the custom Sink Connector,
>>    the client no longer needs to disable the chain itself.
>>    Or is there a better way to make *Committer.commit()* not block
>>    *SinkWriter.write()*?
>>
>>    Looking forward to your reply.
>>    Thanks && Regards,
>>    di.wu
>>
>>>    On Feb 16, 2023, at 6:54 PM, Shammon FY <zj...@gmail.com> wrote:
>>> 
>>>    Hi
>>> 
>>>    Do you mean how to disable `chain` in your custom sink
>>>    connector?  Can you
>>>    give an example of what you want?
>>> 
>>>    On Wed, Feb 15, 2023 at 10:42 PM di wu <67...@qq.com> wrote:
>>> 
>>>>    Hello
>>>> 
>>>>    The current Sink operator will be split into two operations,
>>>>    Writer and
>>>>    Commiter. By default, they will be chained together and executed
>>>>    on the
>>>>    same thread.
>>>>    So sometimes when the commiter is very slow, it will block the data
>>>>    writer, causing back pressure.
>>>> 
>>>>    At present, FlinkSQL can be solved by disabling the chain
>>>>    globally, and
>>>>    DataStream can partially disable the chain through the
>>>>    disableChaining
>>>>    method, but both of them need to be set by the user.
>>>> 
>>>>    Can the strategy of the Chain be changed in the Custom Sink
>>>>    Connector to
>>>>    separate Writer and Commiter?
>>>> 
>>>>    Thanks && Regards,
>>>>    di.wu
>>>> 
>>> 
>> 
> 


Re: Disable the chain of the Sink operator

Posted by Chesnay Schepler <ch...@apache.org>.
As far as I know, the chain between committer and writer is also
required for correctness.

On 16/02/2023 14:53, weijie guo wrote:
> Hi wu,
>
> I don't think it is a good choice to directly change the strategy of 
> chain. Operator chain usually has better performance and resource 
> utilization. If we directly change the chain policy between them, 
> users can no longer chain them together, which is not a good starting 
> point.
>
> Best regards,
>
> Weijie
>
>
>
> On Thu, Feb 16, 2023 at 19:29, wudi <67...@qq.com> wrote:
>
>     Thank you for your reply.
>
>     Currently in the custom Sink Connector, the Flink task will
>     combine Writer and Committer into one thread, and the thread name
>     is similar: *[Sink: Writer -> Sink: Committer (1/1)#0]*.
>     In this way, when the *Committer.commit()* method is very slow, it
>     will block the*SinkWriter.write()* method to receive upstream data.
>
>     The client can use the *env.disableOperatorChaining()* method to
>     split the thread into two threads: *[Sink: Writer (1/1)#0]* and
>     *[Sink: Committer (1/1)#0]*. This way, the Committer.commit method
>     will not block the SinkWriter.write method.
>
>     If the chain policy can be disabled in the custom Sink Connector,
>     the client can be prevented from setting and disabling the chain.
>     Or is there a better way to make*Committer.commit()* not block
>     *SinkWriter.write()*?
>
>     Looking forward for your reply.
>     Thanks && Regards,
>     di.wu
>
>>     On Feb 16, 2023, at 6:54 PM, Shammon FY <zj...@gmail.com> wrote:
>>
>>     Hi
>>
>>     Do you mean how to disable `chain` in your custom sink
>>     connector?  Can you
>>     give an example of what you want?
>>
>>     On Wed, Feb 15, 2023 at 10:42 PM di wu <67...@qq.com> wrote:
>>
>>>     Hello
>>>
>>>     The current Sink operator will be split into two operations,
>>>     Writer and
>>>     Commiter. By default, they will be chained together and executed
>>>     on the
>>>     same thread.
>>>     So sometimes when the commiter is very slow, it will block the data
>>>     writer, causing back pressure.
>>>
>>>     At present, FlinkSQL can be solved by disabling the chain
>>>     globally, and
>>>     DataStream can partially disable the chain through the
>>>     disableChaining
>>>     method, but both of them need to be set by the user.
>>>
>>>     Can the strategy of the Chain be changed in the Custom Sink
>>>     Connector to
>>>     separate Writer and Commiter?
>>>
>>>     Thanks && Regards,
>>>     di.wu
>>>
>>
>

Re: Disable the chain of the Sink operator

Posted by weijie guo <gu...@gmail.com>.
Hi wu,

I don't think it is a good idea to change the chaining strategy directly.
Operator chaining usually gives better performance and resource utilization.
If we change the chaining policy between them directly, users can no longer
chain them together, which is not a good starting point.

Best regards,

Weijie


On Thu, Feb 16, 2023 at 19:29, wudi <67...@qq.com> wrote:

> Thank you for your reply.
>
> Currently in the custom Sink Connector, the Flink task will combine Writer
> and Committer into one thread, and the thread name is similar: *[Sink:
> Writer -> Sink: Committer (1/1)#0]*.
> In this way, when the *Committer.commit()* method is very slow, it will
> block the* SinkWriter.write()* method to receive upstream data.
>
> The client can use the *env.disableOperatorChaining()* method to split
> the thread into two threads: *[Sink: Writer (1/1)#0]* and *[Sink:
> Committer (1/1)#0]*. This way, the Committer.commit method will not block
> the SinkWriter.write method.
>
> If the chain policy can be disabled in the custom Sink Connector, the
> client can be prevented from setting and disabling the chain. Or is there a
> better way to make* Committer.commit()* not block *SinkWriter.write()*?
>
> Looking forward for your reply.
> Thanks && Regards,
> di.wu
>
> On Feb 16, 2023, at 6:54 PM, Shammon FY <zj...@gmail.com> wrote:
>
> Hi
>
> Do you mean how to disable `chain` in your custom sink connector?  Can you
> give an example of what you want?
>
> On Wed, Feb 15, 2023 at 10:42 PM di wu <67...@qq.com> wrote:
>
> Hello
>
> The current Sink operator will be split into two operations, Writer and
> Commiter. By default, they will be chained together and executed on the
> same thread.
> So sometimes when the commiter is very slow, it will block the data
> writer, causing back pressure.
>
> At present, FlinkSQL can be solved by disabling the chain globally, and
> DataStream can partially disable the chain through the disableChaining
> method, but both of them need to be set by the user.
>
> Can the strategy of the Chain be changed in the Custom Sink Connector to
> separate Writer and Commiter?
>
> Thanks && Regards,
> di.wu
>
>
>
>

Re: Disable the chain of the Sink operator

Posted by wudi <67...@qq.com>.
Thank you for your reply.

Currently, with a custom Sink Connector, the Flink task chains the Writer and Committer into one thread, with a thread name like: [Sink: Writer -> Sink: Committer (1/1)#0].
As a result, when the Committer.commit() method is very slow, it blocks the SinkWriter.write() method from receiving upstream data.

The client can use the env.disableOperatorChaining() method to split the thread into two threads: [Sink: Writer (1/1)#0] and [Sink: Committer (1/1)#0]. This way, the Committer.commit() method will not block the SinkWriter.write() method.

If chaining could be disabled inside the custom Sink Connector itself, the client would not have to set this and disable the chain. Or is there a better way to make Committer.commit() not block SinkWriter.write()?

Looking forward to your reply.
Thanks && Regards,
di.wu
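
The blocking effect described in this message can be illustrated with a small timing model. This is only a sketch in plain Java, not Flink code: the class name ChainTimingModel, its methods, and the idea of committing after every N records (instead of on checkpoint completion) are assumptions made for illustration.

```java
// Plain-Java timing model; no Flink classes are used.
final class ChainTimingModel {

    /**
     * Writer and committer share one thread: every commit call is paid for
     * on the writer's timeline, so later records are delayed by it.
     */
    static long chainedWriterFinishMs(int records, int commitEvery, long writeMs, long commitMs) {
        long clock = 0;
        for (int i = 1; i <= records; i++) {
            clock += writeMs;          // stands in for SinkWriter.write(record)
            if (i % commitEvery == 0) {
                clock += commitMs;     // stands in for Committer.commit() on the same thread
            }
        }
        return clock;
    }

    /**
     * Writer and committer run on separate threads: in this simplified model
     * the writer's timeline contains only its own write calls.
     */
    static long unchainedWriterFinishMs(int records, long writeMs) {
        return records * writeMs;
    }

    public static void main(String[] args) {
        // 10 records, a commit after every 5 records, 1 ms per write, 100 ms per commit.
        System.out.println("chained:   " + chainedWriterFinishMs(10, 5, 1, 100) + " ms");
        System.out.println("unchained: " + unchainedWriterFinishMs(10, 1) + " ms");
    }
}
```

With these numbers the chained writer needs 210 ms to accept ten records, the unchained one 10 ms. The model ignores network buffers between the two operators, so a real unchained committer could still cause back pressure once those buffers fill up.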

> On Feb 16, 2023, at 6:54 PM, Shammon FY <zj...@gmail.com> wrote:
> 
> Hi
> 
> Do you mean how to disable `chain` in your custom sink connector?  Can you
> give an example of what you want?
> 
> On Wed, Feb 15, 2023 at 10:42 PM di wu <67...@qq.com> wrote:
> 
>> Hello
>> 
>> The current Sink operator will be split into two operations, Writer and
>> Commiter. By default, they will be chained together and executed on the
>> same thread.
>> So sometimes when the commiter is very slow, it will block the data
>> writer, causing back pressure.
>> 
>> At present, FlinkSQL can be solved by disabling the chain globally, and
>> DataStream can partially disable the chain through the disableChaining
>> method, but both of them need to be set by the user.
>> 
>> Can the strategy of the Chain be changed in the Custom Sink Connector to
>> separate Writer and Commiter?
>> 
>> Thanks && Regards,
>> di.wu
>> 
> 


Re: Disable the chain of the Sink operator

Posted by Shammon FY <zj...@gmail.com>.
Hi

Do you mean how to disable `chain` in your custom sink connector?  Can you
give an example of what you want?

On Wed, Feb 15, 2023 at 10:42 PM di wu <67...@qq.com> wrote:

> Hello
>
> The current Sink operator will be split into two operations, Writer and
> Commiter. By default, they will be chained together and executed on the
> same thread.
> So sometimes when the commiter is very slow, it will block the data
> writer, causing back pressure.
>
> At present, FlinkSQL can be solved by disabling the chain globally, and
> DataStream can partially disable the chain through the disableChaining
> method, but both of them need to be set by the user.
>
> Can the strategy of the Chain be changed in the Custom Sink Connector to
> separate Writer and Commiter?
>
> Thanks && Regards,
> di.wu
>

Disable the chain of the Sink operator

Posted by di wu <67...@qq.com>.
Hello


The current Sink operator is split into two operators, Writer and Committer. By default, they are chained together and executed on the same thread.
So when the committer is very slow, it sometimes blocks the data writer, causing back pressure.

At present, in Flink SQL this can be solved by disabling chaining globally, and in DataStream the chain can be partially disabled through the disableChaining method, but both need to be set by the user.

Can the chaining strategy be changed inside the custom Sink Connector to separate the Writer and Committer?

Thanks && Regards,
di.wu

Re: GlobalCommitter in Flink's two-phase commit

Posted by Jing Ge <ji...@ververica.com>.
Hi,

1. What are the general usage scenarios of GlobalCommitter?
- GlobalCommitter is used for creating and committing an aggregated
committable. It is part of a 2-phase-commit protocol. One use case is the
compaction of small files.

2. Why should GlobalCommitter be removed in the new version of the api?
- As FLIP-191 describes, there are many different requirements from
different downstream systems, e.g. Iceberg, Delta Lake, Hive. One
GlobalCommitter could not cover all of them. If you take a look at the
SinkV1Adapter source code, you will see that
StandardSinkTopologies#addGlobalCommitter, which is recommended to replace
the usage of GlobalCommitter, is used to take care of the post-commit
topology.

Best regards,
Jing
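
To make "creating and committing an aggregated committable" more concrete, here is a minimal sketch in plain Java. It is not the Flink interface: the class GlobalCommitModel, the types StagedFile and GlobalCommit, and the method names combine and commit are all hypothetical and only model the idea of folding the per-subtask results of one checkpoint into a single commit, as in the small-file compaction use case mentioned above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of "aggregate, then commit once"; no Flink classes are used.
final class GlobalCommitModel {

    /** Hypothetical committable: one small file staged by one writer subtask. */
    static final class StagedFile {
        final String path;
        final long bytes;

        StagedFile(String path, long bytes) {
            this.path = path;
            this.bytes = bytes;
        }
    }

    /** Hypothetical aggregated committable covering all subtasks of one checkpoint. */
    static final class GlobalCommit {
        final long checkpointId;
        final List<String> paths;
        final long totalBytes;

        GlobalCommit(long checkpointId, List<String> paths, long totalBytes) {
            this.checkpointId = checkpointId;
            this.paths = Collections.unmodifiableList(paths);
            this.totalBytes = totalBytes;
        }
    }

    /** Folds the staged results of all subtasks into one aggregated committable. */
    static GlobalCommit combine(long checkpointId, List<StagedFile> staged) {
        List<String> paths = new ArrayList<>();
        long total = 0;
        for (StagedFile f : staged) {
            paths.add(f.path);
            total += f.bytes;
        }
        return new GlobalCommit(checkpointId, paths, total);
    }

    /** Second phase: a single commit for the whole checkpoint. */
    static String commit(GlobalCommit c) {
        return "checkpoint " + c.checkpointId + ": committed " + c.paths.size()
                + " files (" + c.totalBytes + " bytes) in one transaction";
    }

    public static void main(String[] args) {
        List<StagedFile> staged = new ArrayList<>();
        staged.add(new StagedFile("part-0", 10));
        staged.add(new StagedFile("part-1", 32));
        System.out.println(commit(combine(7, staged)));
    }
}
```

The point of the design is that the commit happens once per checkpoint and with parallelism one, regardless of how many writer subtasks staged data.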

On Tue, May 24, 2022 at 9:11 AM di wu <67...@qq.com> wrote:

> Hello
> Regarding the GlobalCommitter in Flink's two-phase commit,
> I see it was introduced in FLIP-143, but it seems to have been removed
> again in FLIP-191 and marked as Deprecated in the source code.
> I haven't found any relevant information about the use of GlobalCommitter.
>
> There are two questions I would like to ask:
> 1. What are the general usage scenarios of GlobalCommitter?
> 2. Why should GlobalCommitter be removed in the new version of the api?
>
> Thanks && Regards,
> di.wu
>
>

GlobalCommitter in Flink's two-phase commit

Posted by di wu <67...@qq.com.INVALID>.
Hello
Regarding the GlobalCommitter in Flink's two-phase commit,
I see it was introduced in FLIP-143, but it seems to have been removed again in FLIP-191 and marked as Deprecated in the source code.
I haven't found any relevant information about the use of GlobalCommitter.

There are two questions I would like to ask:
1. What are the general usage scenarios of GlobalCommitter?
2. Why should GlobalCommitter be removed in the new version of the api?

Thanks && Regards,
di.wu

Re: Does kafka key is supported in kafka sink table

Posted by Shengkai Fang <fs...@gmail.com>.
Hi.

Yes. Flink supports writing column values into the key part of the Kafka
record. You just need to specify which columns belong to the key in the
WITH clause [1], e.g.

```
CREATE TABLE kafka_sink (
...
) WITH (
   'key.fields' = 'id'
);
```

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#key-fields

On Tue, May 17, 2022 at 19:16, Dhavan Vaidya <dh...@kofluence.com> wrote:

> Hey wang!
>
> Perhaps this is what you want:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/#key-format
> &
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/#key-fields
> ?
>
> Note that the fields *have* to be one of the "top" level columns of your
> sink table (i.e., fields inside Row are not supported, at least in PyFlink).
>
> Thanks!
>
> On Mon, 16 May 2022 at 19:33, wang <24...@163.com> wrote:
>
>> Hi dear engineer,
>>
>> Flink sql supports kafka sink table, not sure whether it supports kafka
>> key in kafka sink table? As I want to specify kafka key when inserting
>> data into kafka sink table.
>> Thanks for your answer in advance.
>>
>>
>>
>> Thanks && Regards,
>> Hunk
>>
>>
>>
>>
>

Re: Does kafka key is supported in kafka sink table

Posted by Dhavan Vaidya <dh...@kofluence.com>.
Hey wang!

Perhaps this is what you want:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/#key-format
&
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/#key-fields
?

Note that the fields *have* to be one of the "top" level columns of your
sink table (i.e., fields inside Row are not supported, at least in PyFlink).

Thanks!

On Mon, 16 May 2022 at 19:33, wang <24...@163.com> wrote:

> Hi dear engineer,
>
> Flink sql supports kafka sink table, not sure whether it supports kafka
> key in kafka sink table? As I want to specify kafka key when inserting
> data into kafka sink table.
> Thanks for your answer in advance.
>
>
>
> Thanks && Regards,
> Hunk
>
>
>
>

Does kafka key is supported in kafka sink table

Posted by wang <24...@163.com>.
Hi dear engineer,


Flink SQL supports Kafka sink tables, but I am not sure whether it supports setting the Kafka key in a Kafka sink table. I want to specify the Kafka key when inserting data into a Kafka sink table.
Thanks for your answer in advance.






Thanks && Regards,
Hunk

Re: Re:Does flink sql support UDTAGG

Posted by Weihua Hu <hu...@gmail.com>.
Hi, wang

Maybe you can take a look at
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-function

Best,
Weihua


On Mon, Aug 8, 2022 at 3:52 PM wang <24...@163.com> wrote:

>
>
>
> Hi,
>
>
> Thanks for your response,  I guess what I need should be this one
> (UDTAGG):
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-aggregate-functions
> As I want multiple rows as aggregate output. So my question: can we use
> UDTAGG in flink SQL?.  If so, is there some guide of how to use UDTAGG in
> flink SQL?  As there are only flink table api instructions of UDTAGG  in
> the page above.
>
>
>
>
> Thanks,
> Hunk
>
>
>
>
>
>
>
>
> At 2022-08-08 10:56:22, "Xuyang" <xy...@163.com> wrote:
> >Hi, what you want is UDAF? Please check whether this[1] is meet your
> requirement.
> >
> >[1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#aggregate-functions
> >
> >
> >
> >At 2022-08-07 22:06:29, "wang" <24...@163.com> wrote:
> >
> >Hi dear engineers,
> >
> >
> >One small question:  does flink sql support UDTAGG? (user-defined table
> aggregate function), seems only supported in flink table api? If not
> supported in flink sql, how can I define an aggregated udf which could
> output multiple rows to kafka.
> >
> >
> >Thanks for your help!
> >
> >
> >
> >
> >Regards,
> >Hunk
>

Re:Re:Does flink sql support UDTAGG

Posted by wang <24...@163.com>.


Hi,


Thanks for your response. I guess what I need is this one (UDTAGG): https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-aggregate-functions
since I want multiple rows as aggregate output. So my question is: can we use a UDTAGG in Flink SQL? If so, is there a guide on how to use a UDTAGG in Flink SQL? The page above only has Table API instructions for UDTAGG.




Thanks,
Hunk
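
For readers unfamiliar with what "multiple rows as aggregate output" means, the following plain-Java sketch models a "top 2" table aggregate. It does not use the Flink API: the class Top2Model and its Acc type are hypothetical, and the method names accumulate and emitValue are chosen only to resemble the Table API instructions on the linked page.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a table aggregate that emits up to two rows per group.
final class Top2Model {

    /** Accumulator: the two largest values seen so far. */
    static final class Acc {
        int first = Integer.MIN_VALUE;
        int second = Integer.MIN_VALUE;
    }

    /** Called once per input row; keeps only the two largest values. */
    static void accumulate(Acc acc, int value) {
        if (value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (value > acc.second) {
            acc.second = value;
        }
    }

    /** Emits zero, one, or two rows of the form {value, rank}. */
    static List<int[]> emitValue(Acc acc) {
        List<int[]> rows = new ArrayList<>();
        if (acc.first != Integer.MIN_VALUE) {
            rows.add(new int[] {acc.first, 1});
        }
        if (acc.second != Integer.MIN_VALUE) {
            rows.add(new int[] {acc.second, 2});
        }
        return rows;
    }

    public static void main(String[] args) {
        Acc acc = new Acc();
        for (int v : new int[] {3, 9, 5, 1}) {
            accumulate(acc, v);
        }
        for (int[] row : emitValue(acc)) {
            System.out.println("value=" + row[0] + ", rank=" + row[1]);
        }
    }
}
```

An ordinary aggregate function would return a single value per group here; the table aggregate returns a small table, which is the difference the question is about.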








At 2022-08-08 10:56:22, "Xuyang" <xy...@163.com> wrote:
>Hi, what you want is UDAF? Please check whether this[1] is meet your requirement.
>
>[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#aggregate-functions
>
>
>
>At 2022-08-07 22:06:29, "wang" <24...@163.com> wrote:
>
>Hi dear engineers,
>
>
>One small question:  does flink sql support UDTAGG? (user-defined table aggregate function), seems only supported in flink table api? If not supported in flink sql, how can I define an aggregated udf which could output multiple rows to kafka.
>
>
>Thanks for your help!
>
>
>
>
>Regards,
>Hunk

Re:Does flink sql support UDTAGG

Posted by Xuyang <xy...@163.com>.
Hi, is what you want a UDAF? Please check whether this [1] meets your requirement.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#aggregate-functions



At 2022-08-07 22:06:29, "wang" <24...@163.com> wrote:

Hi dear engineers,


One small question:  does flink sql support UDTAGG? (user-defined table aggregate function), seems only supported in flink table api? If not supported in flink sql, how can I define an aggregated udf which could output multiple rows to kafka.


Thanks for your help!




Regards,
Hunk

Re: Whether Flink SQL window operations support "Allow Lateness and SideOutput"?

Posted by Weihua Hu <hu...@gmail.com>.
Hi,

Maybe you can use CURRENT_WATERMARK()[1]  to handle some late data.


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/


Best,
Weihua
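
As a rough illustration of handling late data by comparing each row with the current watermark, here is a plain-Java sketch. It is not Flink code: the class LateDataModel and its methods are hypothetical. The assumptions are that the watermark may be unknown (null) before the first one is emitted, and that a row counts as on time when its timestamp is strictly greater than the watermark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of splitting rows into on-time and late by a watermark.
final class LateDataModel {

    /** On time if no watermark exists yet, or if the event is newer than it. */
    static boolean isOnTime(long eventTs, Long watermark) {
        return watermark == null || eventTs > watermark;
    }

    /** Returns two lists: index 0 holds on-time timestamps, index 1 late ones. */
    static List<List<Long>> split(List<Long> eventTimestamps, Long watermark) {
        List<Long> onTime = new ArrayList<>();
        List<Long> late = new ArrayList<>();
        for (long ts : eventTimestamps) {
            if (isOnTime(ts, watermark)) {
                onTime.add(ts);
            } else {
                late.add(ts);   // in SQL these rows could go to a separate sink table
            }
        }
        List<List<Long>> result = new ArrayList<>();
        result.add(onTime);
        result.add(late);
        return result;
    }

    public static void main(String[] args) {
        List<List<Long>> parts = split(Arrays.asList(100L, 250L, 300L), 250L);
        System.out.println("on time: " + parts.get(0));
        System.out.println("late:    " + parts.get(1));
    }
}
```

Routing the late list to its own destination plays the role that sideOutputLateData plays in the DataStream snippet quoted below; there is no direct counterpart to allowedLateness in this model.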


On Tue, Feb 21, 2023 at 1:46 PM wang <24...@163.com> wrote:

> Hi dear engineers,
>
>   One question as title: Whether Flink SQL window operations support
> "Allow Lateness and SideOutput"?
>
>   Just as supported in Datastream api (allowedLateness
> and sideOutputLateData) like:
>
>     SingleOutputStreamOperator<> sumStream = dataStream.keyBy()
> .timeWindow()
>                                                                .
> allowedLateness(Time.minutes(1))
>                                                                .
> sideOutputLateData(outputTag)
>                                                                .sum();
>
> Thanks && Regards,
> Hunk
>
>

Whether Flink SQL window operations support "Allow Lateness and SideOutput"?

Posted by wang <24...@163.com>.
Hi dear engineers,


  One question as title: Whether Flink SQL window operations support "Allow Lateness and SideOutput"?


  Just as supported in Datastream api (allowedLateness and sideOutputLateData) like:


    SingleOutputStreamOperator<> sumStream = dataStream.keyBy()
                                                       .timeWindow()
                                                       .allowedLateness(Time.minutes(1))
                                                       .sideOutputLateData(outputTag)
                                                       .sum();


Thanks && Regards,
Hunk


Re: Watermark generating mechanism in Flink SQL

Posted by Matthias Pohl <ma...@aiven.io.INVALID>.
Hi Hunk,
there is documentation about watermarking in FlinkSQL [1]. There is also a
FlinkSQL cookbook entry about watermarking [2]. Essentially, you define the
watermark strategy in your CREATE TABLE statement and specify the lateness
for a given event (not the period in which watermarks are automatically
generated!). You have to apply the `WATERMARK FOR` phrase on a column that
is declared as a time attribute [3]. Watermarks are based on event time,
i.e. based on an event being processed that provides the event time. Your
idea of generating them "every 5 seconds" does not work out of the box
because a watermark wouldn't be generated if the source idles for more than
5 seconds (in case of your specific example). Sending periodic dummy events
extrapolating the current event time would be a way to work around this
issue. Keep in mind that mixing processing time (what you would do if you
create a watermark based on the system's current time rather than relying
on events) and event time is usually not advised. I hope that helps.
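
As a rough sketch of the point above (not taken from this thread: the table name `events`, the columns `user_id` and `event_time`, and the Kafka options are placeholder assumptions), the watermark strategy is declared in the DDL, while the emission interval of periodic watermarks is, as far as I know, governed by the `pipeline.auto-watermark-interval` option:

```sql
-- Hypothetical source table; names and connector options are assumptions.
CREATE TABLE events (
    user_id    STRING,
    event_time TIMESTAMP(3),
    -- Watermark = largest event_time seen so far minus 5 seconds.
    -- The 5 seconds bound how late an event may be, not how often watermarks are emitted.
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic_source',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'all_gp',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'json'
);

-- Emission interval of periodic watermarks (assumed to apply to SQL sources as well).
SET 'pipeline.auto-watermark-interval' = '5s';
```

The interval in the WATERMARK clause and the interval in the SET statement are independent. The SET statement only affects how often a new watermark is emitted while events keep arriving, so it does not remove the idle-source caveat described above.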

Best,
Matthias

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/#watermark
[2]
https://github.com/ververica/flink-sql-cookbook/blob/main/aggregations-and-analytics/02_watermarks/02_watermarks.md
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/concepts/time_attributes/#event-time

On Tue, Oct 18, 2022 at 5:32 AM wang <24...@163.com> wrote:

> Hi dear engineers,
>
> I have one question about the watermark generating mechanism in Flink SQL.
> There are two mechanisms called *Periodic Watermarks* and *Punctuated
> Watermarks*. I want to use *Periodic Watermarks* with an interval of 5 seconds
> (meaning watermarks will be generated every 5 seconds). How should I set this in
> Flink SQL? Thanks in advance!
>
> Regards,
> Hunk
>

Watermark generating mechanism in Flink SQL

Posted by wang <24...@163.com>.
Hi dear engineers,


I have one question about the watermark generating mechanism in Flink SQL. There are two mechanisms called Periodic Watermarks and Punctuated Watermarks. I want to use Periodic Watermarks with an interval of 5 seconds (meaning watermarks will be generated every 5 seconds). How should I set this in Flink SQL? Thanks in advance!


Regards,
Hunk

Does flink sql support UDTAGG

Posted by wang <24...@163.com>.
Hi dear engineers,


One small question: does Flink SQL support UDTAGG (user-defined table aggregate functions)? It seems to be supported only in the Flink Table API. If it is not supported in Flink SQL, how can I define an aggregate UDF that outputs multiple rows to Kafka?


Thanks for your help!




Regards,
Hunk

Does kafka key is supported in kafka sink table

Posted by wang <24...@163.com>.
Hi dear engineer,


Flink SQL supports Kafka sink tables, but I'm not sure whether it supports the Kafka key in a Kafka sink table. I want to specify the Kafka key when inserting data into a Kafka sink table.
Thanks in advance for your answer.






Thanks && Regards,
Hunk

Re: LinkedMap ClassCastException issue

Posted by Shengkai Fang <fs...@gmail.com>.
Hi.

Could you tell us the version of Flink you are using? What's the
version of commons-collections:commons-collections:jar when you compile the
SQL, and the version in the cluster? It's possible that you compile the SQL
and submit it with a different version.

I am not sure how you submit your Flink SQL job. Do you submit your job
with the SQL client or use jars to execute it?

Best,
Shengkai

On Wed, May 25, 2022 at 15:04, wang <24...@163.com> wrote:

> Hi dear engineers,
>
> Recently I encountered another issue: after I submitted a Flink SQL job, it
> threw an exception:
>
>
> Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> 	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233) ~[?:1.8.0_162]
> 	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405) ~[?:1.8.0_162]
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2284) ~[?:1.8.0_162]
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) ~[?:1.8.0_162]
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) ~[?:1.8.0_162]
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) ~[?:1.8.0_162]
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) ~[?:1.8.0_162]
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) ~[?:1.8.0_162]
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) ~[?:1.8.0_162]
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) ~[?:1.8.0_162]
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) ~[?:1.8.0_162]
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) ~[?:1.8.0_162]
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) ~[?:1.8.0_162]
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) ~[?:1.8.0_162]
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) ~[?:1.8.0_162]
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:159) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_162]
>
>
>
>
> Then I searched the web and found one answer that solves this issue:
> configure "classloader.resolve-order: parent-first" in flink-conf.yaml.
> Indeed, it works for this issue.
>
> But unfortunately, I'm not allowed to change classloader.resolve-order
> to parent-first; it must be child-first, as parent-first would bring me
> other classloading-related issues.
>
> Then I tried the configuration below in flink-conf.yaml:
> classloader.parent-first-patterns.additional: org.apache.commons.collections
>
> It solves that exception, but I'm worried that this could cause other
> issues.
>
> So my question is: are there other ways to solve the exception above?
> Thanks so much for your help!
>
>
> Thanks && Regards,
> Hunk
>
>
>
>
>
>
>

LinkedMap ClassCastException issue

Posted by wang <24...@163.com>.
Hi dear engineers,


Recently I encountered another issue: after I submitted a Flink SQL job, it threw an exception:




Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233) ~[?:1.8.0_162]
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405) ~[?:1.8.0_162]
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2284) ~[?:1.8.0_162]
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) ~[?:1.8.0_162]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) ~[?:1.8.0_162]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) ~[?:1.8.0_162]
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) ~[?:1.8.0_162]
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) ~[?:1.8.0_162]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) ~[?:1.8.0_162]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) ~[?:1.8.0_162]
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) ~[?:1.8.0_162]
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) ~[?:1.8.0_162]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) ~[?:1.8.0_162]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) ~[?:1.8.0_162]
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) ~[?:1.8.0_162]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:159) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_162]






Then I searched the web and found one answer that solves this issue: configure "classloader.resolve-order: parent-first" in flink-conf.yaml.
Indeed, it works for this issue.


But unfortunately, I'm not allowed to change classloader.resolve-order to parent-first; it must be child-first, as parent-first would bring me other classloading-related issues.


Then I tried the configuration below in flink-conf.yaml:
classloader.parent-first-patterns.additional: org.apache.commons.collections


It solves that exception, but I'm worried that this could cause other issues.


So my question is: are there other ways to solve the exception above? Thanks so much for your help!




Thanks && Regards,
Hunk




 

Re: How can I set job parameter in flink sql

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Hi, AFAIK, you can't get a parameter set via the Flink SQL client in a UDF.

If you still want to get the parameters in your UDF, you can use the following code to set the parameter:

// Register the parameters as global job parameters on the execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Map<String, String> parameter = new HashMap<>();
parameter.put("black_list_path", "xxxx");
env.getConfig().setGlobalJobParameters(Configuration.fromMap(parameter));

Then, you can get the parameter using context.getJobParameter("black_list_path", "/config/list.properties"); in the UDF.

Best regards, 
Yuxia 


From: "wang" <24...@163.com>
To: "User" <us...@flink.apache.org>, "user-zh" <us...@flink.apache.org>
Sent: Wednesday, May 11, 2022, 2:44:20 PM
Subject: How can I set job parameter in flink sql

Hi dear engineer, 

I want to override the function open() in my UDF, like: 


public class BlackListConvertFunction extends ScalarFunction {

    @Override
    public void open(FunctionContext context) throws Exception {
        String path = context.getJobParameter("black_list_path", "/config/list.properties");
        System.out.println(path);
    }

    public Double eval(String scores) {
        // some logic
        return 0.0;
    }
}





In the open() function, I want to fetch the configured value "black_list_path" and simply print it out. I configure this value in the ./sql-client.sh console:

SET black_list_path = /root/list.properties

Then I run this UDF, but what is printed is /config/list.properties (the default value I set in context.getJobParameter("black_list_path", "/config/list.properties")), not /root/list.properties, which I set in the ./sql-client.sh console.

So could you please show me the correct way to set black_list_path in SQL? Thanks so much!


Thanks && Regards,
Hunk







How can I set job parameter in flink sql

Posted by wang <24...@163.com>.
Hi dear engineer,


I want to override the function open() in my UDF, like:




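import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class BlackListConvertFunction extends ScalarFunction {

    @Override
    public void open(FunctionContext context) throws Exception {
        // Read the job parameter, falling back to the default path if it is not set.
        String path = context.getJobParameter("black_list_path", "/config/list.properties");
        System.out.println(path);
    }

    public Double eval(String scores) {
        // some logic
        return 0.0;
    }
}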






In the open() function, I want to fetch the configured value "black_list_path" and simply print it out. I configure this value in the ./sql-client.sh console:


    SET black_list_path = /root/list.properties


Then I run this UDF, but what is printed is /config/list.properties (the default value I set in context.getJobParameter("black_list_path", "/config/list.properties")), not /root/list.properties, which I set in the ./sql-client.sh console.


So could you please show me the correct way to set black_list_path in SQL? Thanks so much!


Thanks && Regards,
Hunk


Re: How can I set job parameter in flink sql

Posted by Qingsheng Ren <re...@gmail.com>.
Hi,

You can make use of the configuration "pipeline.global-job-parameters" [1] to pass your custom configs all the way into the UDF. For example, you can execute this in the SQL client:

SET pipeline.global-job-parameters=black_list_path:/root/list.properties;

Then you can get the value "/root/list.properties" by calling context.getJobParameter("black_list_path", "your_default_value") in the open() method of your UDF.
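
A slightly fuller sketch of the same idea, assuming a SQL client version that accepts quoted keys and values in SET. The second entry `other_key:other_value`, the function name `black_list_convert`, the package `com.example` and the table `my_table` are hypothetical and only serve as illustration:

```sql
-- Map-typed option: each entry is written as key:value, entries are separated by commas.
SET 'pipeline.global-job-parameters' = 'black_list_path:/root/list.properties,other_key:other_value';

-- Hypothetical registration of the UDF class from the question.
CREATE TEMPORARY FUNCTION black_list_convert AS 'com.example.BlackListConvertFunction';

-- Hypothetical query; open() should see black_list_path when the job starts.
SELECT black_list_convert(scores) FROM my_table;
```

The SET statement has to be executed before the query is submitted, because the parameters are attached to the job at submission time.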

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#pipeline-global-job-parameters

Cheers, 

Qingsheng

> On May 11, 2022, at 14:36, wang <24...@163.com> wrote:
> 
> Hi dear engineer,
> 
> I want to override the function open() in my UDF, like:
> 
> <image.png>
> 
> In the open() function, I want to fetch the configured value "black_list_path" and simply print it out. I configure this value in the ./sql-client.sh console:
> 
>     SET black_list_path = /root/list.properties
> 
> Then I run this UDF, but what is printed is /config/list.properties (the default value I set in context.getJobParameter("black_list_path", "/config/list.properties")), not /root/list.properties, which I set in the ./sql-client.sh console.
> 
> So could you please show me the correct way to set black_list_path in SQL? Thanks so much!
> 
> 
> Thanks && Regards,
> Hunk
> 
> 
> 
> 
> 
>  


Re:Re:Re: Could you please give me a hand about json object in flink sql

Posted by lixiongfeng <lx...@163.com>.
Maybe you can get some inspiration from JsonDeserializationSchema and JSONKeyValueDeserializationSchema.

At 2022-04-02 14:47:08, "wang" <24...@163.com> wrote:

Hi,

Thanks so much for your support!

But sorry to say, I'm still confused about it. No matter what the UDF looks like, the first thing I need to confirm is the type of 'content' in TableSink. What should its type be? Should I use type ROW, like this?

     CREATE TABLE TableSink (
          `id` STRING NOT NULL,
          `content` ROW<name STRING, age BIGINT>
     )
     WITH (
         ...
     );

This type is only suitable for the source input {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}

But the JSON key names and format of 'content' in the source are variable. If the source input is

{"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}

I should define `content` in TableSink with type ROW<color STRING, BackgroundColor STRING, Height BIGINT>, like this:

     CREATE TABLE TableSink (
          `id` STRING NOT NULL,
          `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
     )
     WITH (
         ...
     );

The input JSON might also contain a JSON array, like:
{"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30,\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}"}}

So is there some common type I can use that can handle all input JSON formats?

Thanks so much!!

Thanks && Regards,
Hunk

At 2022-04-01 17:25:59, "Qingsheng Ren" <re...@gmail.com> wrote:
>Hi, 
>
>I’m afraid you have to use a UDTF to parse the content and construct the final json string manually. The key problem is that the field “content” is actually a STRING, although it looks like a json object. Currently the json format provided by Flink could not handle this kind of field defined as STRING. Also considering the schema of this “content” field is not fixed across records, Flink SQL can’t use one DDL to consume / produce records with changing schema. 
>
>Cheers,
>
>Qingsheng
>
>> On Mar 31, 2022, at 21:42, wang <24...@163.com> wrote:
>> 
>> Hi dear engineer,
>> 
>> Thanks so much for your precious time reading my email!
>> Recently I've been working with Flink SQL (version 1.13) in my project and encountered a problem with JSON-format data. I hope you can take a look, thanks! Below is the description of my issue.
>> 
>> I use kafka as source and sink, I define kafka source table like this:
>> 
>>      CREATE TABLE TableSource (
>>           schema STRING,
>>           payload ROW(
>>               `id` STRING,
>>               `content` STRING
>>          )
>>      )
>>      WITH (
>>          'connector' = 'kafka',
>>          'topic' = 'topic_source',
>>          'properties.bootstrap.servers' = 'localhost:9092',
>>          'properties.group.id' = 'all_gp',
>>          'scan.startup.mode' = 'group-offsets',
>>          'format' = 'json',
>>          'json.fail-on-missing-field' = 'false',
>>          'json.ignore-parse-errors' = 'true'
>>      );
>> 
>> Define the kafka sink table like this:
>> 
>>      CREATE TABLE TableSink (
>>           `id` STRING NOT NULL,
>>           `content` STRING NOT NULL
>>      )
>>      WITH (
>>          'connector' = 'kafka',
>>          'topic' = 'topic_sink',
>>          'properties.bootstrap.servers' = 'localhost:9092',
>>          'format' = 'json',
>>          'json.fail-on-missing-field' = 'false',
>>          'json.ignore-parse-errors' = 'true'
>>     );
>> 
>> 
>> Then insert into TableSink with data from TableSource:
>>     INSERT INTO TableSink SELECT id, content FROM TableSource;
>> 
>> Then I use "kafka-console-producer.sh" to produce data below into topic "topic_source" (TableSource):
>>     {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}
>> 
>> 
>> Then I listen on topic_sink (TableSink) by kafka-console-consumer.sh, the output is:
>>     {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"}
>> 
>> But what I want here is {"id":"10000","content": {"name":"Jone","age":20}}
>> I want the value of "content" to be a JSON object, not a JSON string.
>> 
>> What's more, the format of "content" in TableSource is not fixed; it can be any JSON-formatted string (object or array), such as:
>> {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
>> 
>> 
>> So my question is: how can I transform a JSON-formatted string (like "{\"name\":\"Jone\",\"age\":20}") from TableSource into a JSON object (like {"name":"Jone","age":20})?
>> 
>> 
>> Thanks && Regards,
>> Hunk
>> 
>> 
>> 
>> 
>>  





 

Re:Re: Could you please give me a hand about json object in flink sql

Posted by wang <24...@163.com>.
Hi,


Got it. It seems this approach is not flexible enough for our case, but thanks so much for your great support all the same. Best wishes!




Regards && Thanks
Hunk








At 2022-04-02 16:34:29, "Qingsheng Ren" <re...@gmail.com> wrote:
>Hi,
>
>If the schema of the records is not fixed, I'm afraid you have to do it in a UDTF.
>
>Best, 
>
>Qingsheng

Re: Could you please give me a hand about json object in flink sql

Posted by Qingsheng Ren <re...@gmail.com>.
Hi,

If the schema of the records is not fixed, I'm afraid you have to do it in a UDTF.

Best, 

Qingsheng

> On Apr 2, 2022, at 15:45, wang <24...@163.com> wrote:
> 
> Hi,
> 
> Thanks for your quick response! 
> 
> I also tried the "raw" format, but it seems to support only a single physical column, and our project requires more than one hundred columns in the sink table. So do I need to combine all those columns into one string in a single UDF?
> 
> Thanks && Regards,
> Hunk


Re:Re: Could you please give me a hand about json object in flink sql

Posted by wang <24...@163.com>.
Hi,


Thanks for your quick response! 


I also tried the "raw" format, but it seems to support only a single physical column, and our project requires more than one hundred columns in the sink table. So do I need to combine all those columns into one string in a single UDF?
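
To make the question concrete, here is a minimal sketch of what "combining the columns into one string" could look like. It is plain Python using only the standard library, not Flink code; the function name columns_to_json, the json_columns parameter, and the column name col_3 are hypothetical and only for illustration. In a real job this logic would have to be wrapped in a user-defined function.

```python
import json
from typing import Any, Iterable, Sequence


def columns_to_json(names: Sequence[str],
                    values: Sequence[Any],
                    json_columns: Iterable[str] = ()) -> str:
    """Combine many column values into one compact JSON string.

    Columns listed in json_columns are assumed to hold JSON-encoded
    strings; they are parsed so they are embedded as objects/arrays
    instead of escaped strings.
    """
    embedded = set(json_columns)
    row = {}
    for name, value in zip(names, values):
        if name in embedded and isinstance(value, str):
            value = json.loads(value)  # turn the JSON text into a real value
        row[name] = value
    return json.dumps(row, separators=(",", ":"))


# Three columns stand in for the hundred-plus columns of the real sink table.
names = ["id", "content", "col_3"]
values = ["10000", '{"name":"Jone","age":20}', 42]
print(columns_to_json(names, values, json_columns=["content"]))
```

The resulting single string would then be the only column written to a sink table that uses the raw format.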


Thanks && Regards,
Hunk

















At 2022-04-02 15:17:14, "Qingsheng Ren" <re...@gmail.com> wrote:
>Hi,
>
>You can construct the final JSON string in your UDTF and write it to a Kafka sink table that has only one field, which holds the entire JSON string, using the raw format [1] for the sink table:
>
>CREATE TABLE TableSink (
>    `final_json_string` STRING
>) WITH (
>    'connector' = 'kafka',
>    'format' = 'raw',
>    ...
>)
>
>So the entire flow would be:
>
>1. The Kafka source table reads the data.
>2. The UDTF parses the 'content' field and constructs the final JSON string you need (id, plus content without the backslash escaping), for example using Jackson [2] or another JSON library.
>3. The constructed JSON string is inserted as the only field in the sink table.
>
>The key problem is that the schema of the "content" field is not fixed, so you have to implement most of the logic in the UDTF.
>
>[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
>[2] https://github.com/FasterXML/jackson
>
>Best regards, 
>
>Qingsheng

Re: Could you please give me a hand about json object in flink sql

Posted by Qingsheng Ren <re...@gmail.com>.
Hi,

You can construct the final JSON string in your UDTF and write it to a Kafka sink table that has only one field, which holds the entire JSON string, using the raw format [1] for the sink table:

CREATE TABLE TableSink (
    `final_json_string` STRING
) WITH (
    'connector' = 'kafka',
    'format' = 'raw',
    ...
)

So the entire flow would be:

1. The Kafka source table reads the data.
2. The UDTF parses the 'content' field and constructs the final JSON string you need (id, plus content without the backslash escaping), for example using Jackson [2] or another JSON library.
3. The constructed JSON string is inserted as the only field in the sink table.

The key problem is that the schema of the "content" field is not fixed, so you have to implement most of the logic in the UDTF.
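
As a rough illustration of step 2, the core of such a function could look like the sketch below. It is plain Python with the standard json module standing in for Jackson, and it is not the Flink UDTF API. The name build_final_json is hypothetical, and keeping unparsable content as a plain string is an assumption, not something required by the flow above.

```python
import json


def build_final_json(record_id: str, content: str) -> str:
    """Build the final JSON string with 'content' embedded as a JSON value."""
    try:
        # 'content' arrives as a JSON-encoded string; parse it into a real value.
        content_value = json.loads(content)
    except json.JSONDecodeError:
        # Assumption: if the text is not valid JSON, keep it as a plain string.
        content_value = content
    return json.dumps({"id": record_id, "content": content_value},
                      separators=(",", ":"))


print(build_final_json("10000", '{"name":"Jone","age":20}'))
# prints {"id":"10000","content":{"name":"Jone","age":20}}
```

Because the parser returns whatever structure it finds, the same function covers objects with different keys as well as arrays, which is why no fixed schema is needed on the sink side.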

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
[2] https://github.com/FasterXML/jackson

Best regards, 

Qingsheng


> On Apr 2, 2022, at 14:47, wang <24...@163.com> wrote:
> 
> Hi,
> 
> Thanks so much for your support! 
> 
> But I'm sorry to say I'm still confused. Whatever the UDF looks like, the first thing I need to settle is the type of 'content' in TableSink. What should it be? Should I use a ROW type, like this?
> 
>      CREATE TABLE TableSink (
>           `id` STRING NOT NULL,
>           `content` ROW<name STRING, age BIGINT>
>      )
>      WITH (
>          ...
>      );
> 
> This type is only suitable for the source input {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}
> 
> But the JSON key names and structure of 'content' in the source vary. If the source input is
> {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}
> 
> then I would have to define `content` in TableSink as ROW<color STRING, BackgroundColor STRING, Height BIGINT>, like this:
> 
>      CREATE TABLE TableSink (
>           `id` STRING NOT NULL,
>           `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
>      )
>      WITH (
>          ...
>      );
> 
> The input JSON might also contain a JSON array, like:
> {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30,\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}"}}
> 
> 
> So is there a common type I can use that can handle all of these input JSON formats?
> 
> Thanks so much!!
> 
> 
> Thanks && Regards,
> Hunk


Re:Re: Could you please give me a hand about json object in flink sql

Posted by wang <24...@163.com>.
Hi,

Thanks so much for your support!

But I'm sorry to say I'm still confused. Whatever the UDF looks like, the first thing I need to settle is the type of 'content' in TableSink. What should it be? Should I use a ROW type, like this?

     CREATE TABLE TableSink (
          `id` STRING NOT NULL,
          `content` ROW<name STRING, age BIGINT>
     )
     WITH (
         ...
     );

This type is only suitable for the source input {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}

But the JSON key names and structure of 'content' in the source vary. If the source input is

{"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30}"}}

then I would have to define `content` in TableSink as ROW<color STRING, BackgroundColor STRING, Height BIGINT>, like this:

     CREATE TABLE TableSink (
          `id` STRING NOT NULL,
          `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
     )
     WITH (
         ...
     );

The input JSON might also contain a JSON array, like:
{"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30,\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}"}}

So is there a common type I can use that can handle all of these input JSON formats?
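
To show what "variable format" means in practice, here is a small sketch in plain Python (standard library only, outside Flink; the helper name parse_content is hypothetical). A generic JSON parser accepts all three shapes of 'content' without any declared schema, whereas a fixed ROW type would match only one of them.

```python
import json


def parse_content(content: str):
    """Parse a JSON-encoded string into plain Python values (dict, list, ...)."""
    return json.loads(content)


# Three 'content' payloads with different keys; the last one has a nested array.
samples = [
    '{"name":"Jone","age":20}',
    '{"color":"Red","BackgroundColor":"Yellow","Height":30}',
    '{"color":"Red","detail":[{"value_type":1,"value_name":"AAA"}]}',
]

for text in samples:
    parsed = parse_content(text)
    # The set of keys differs per record, so no single ROW definition fits all.
    print(type(parsed).__name__, sorted(parsed))
```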


Thanks so much!!







Thanks && Regards,

Hunk

















At 2022-04-01 17:25:59, "Qingsheng Ren" <re...@gmail.com> wrote:
>Hi, 
>
>I’m afraid you have to use a UDTF to parse the content and construct the final json string manually. The key problem is that the field “content” is actually a STRING, although it looks like a json object. Currently the json format provided by Flink could not handle this kind of field defined as STRING. Also considering the schema of this “content” field is not fixed across records, Flink SQL can’t use one DDL to consume / produce records with changing schema. 
>
>Cheers,
>
>Qingsheng
>
>> On Mar 31, 2022, at 21:42, wang <24...@163.com> wrote:
>> 
>> Hi dear engineer,
>> 
>> Thanks so much for your precious time reading my email!
>> Recently I've been working with Flink SQL (version 1.13) in my project and ran into a problem with JSON-format data. I hope you can take a look, thanks! Below is a description of my issue.
>> 
>> I use kafka as source and sink, I define kafka source table like this:
>> 
>>      CREATE TABLE TableSource (
>>           schema STRING,
>>           payload ROW(
>>               `id` STRING,
>>               `content` STRING
>>          )
>>      )
>>      WITH (
>>          'connector' = 'kafka',
>>          'topic' = 'topic_source',
>>          'properties.bootstrap.servers' = 'localhost:9092',
>>          'properties.group.id' = 'all_gp',
>>          'scan.startup.mode' = 'group-offsets',
>>          'format' = 'json',
>>          'json.fail-on-missing-field' = 'false',
>>          'json.ignore-parse-errors' = 'true'
>>      );
>> 
>> Define the kafka sink table like this:
>> 
>>      CREATE TABLE TableSink (
>>           `id` STRING NOT NULL,
>>           `content` STRING NOT NULL
>>      )
>>      WITH (
>>          'connector' = 'kafka',
>>          'topic' = 'topic_sink',
>>          'properties.bootstrap.servers' = 'localhost:9092',
>>          'format' = 'json',
>>          'json.fail-on-missing-field' = 'false',
>>          'json.ignore-parse-errors' = 'true'
>>     );
>> 
>> 
>> Then insert into TableSink with data from TableSource:
>>     INSERT INTO TableSink SELECT id, content FROM TableSource;
>> 
>> Then I use "kafka-console-producer.sh" to produce data below into topic "topic_source" (TableSource):
>>     {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"name\":\"Jone\",\"age\":20}"}}
>> 
>> 
>> Then I listen on topic_sink (TableSink) by kafka-console-consumer.sh, the output is:
>>     {"id":"10000","content":"{\"name\":\"Jone\",\"age\":20}"}
>> 
>> But what I want here is {"id":"10000","content": {"name":"Jone","age":20}}
>> I want the the value of "content" is json object, not json string.
>> 
>> And what's more, the format of "content" in TableSource is not fixed, it can be any json formated(or json array format) string, such as:
>> {"schema": "schema_infos", "payload": {"id": "10000", "content": "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
>> 
>> 
>> So my question is, how can I transform json format string(like "{\"name\":\"Jone\",\"age\":20}")  from TableSource to json object (like{"name":"Jone","age":20} ).
>> 
>> 
>> Thanks && Regards,
>> Hunk
>> 
>> 
>> 
>> 
>>  

Re: Could you please give me a hand about json object in flink sql

Posted by Qingsheng Ren <re...@gmail.com>.
Hi, 

I’m afraid you have to use a UDTF to parse the content and construct the final JSON string manually. The key problem is that the field “content” is actually a STRING, although it looks like a JSON object. Currently, the json format provided by Flink cannot treat a field defined as STRING as a JSON object. Also, since the schema of this “content” field is not fixed across records, Flink SQL can’t use one DDL to consume / produce records with a changing schema.
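
One possible reading of "construct the final json string manually", given as a minimal sketch rather than a tested recipe: declare a second sink that uses the 'raw' format with a single STRING column, and assemble the record with the built-in CONCAT function. The table name TableSinkRaw and the column name record are made up for illustration. The sketch assumes that payload.id never needs JSON escaping and that payload.content always holds valid JSON, because nothing here validates it.

```sql
-- Hypothetical sink: a single STRING column written with the 'raw' format,
-- so the assembled text is emitted as-is instead of being escaped again
-- the way the 'json' format escapes a STRING field.
CREATE TABLE TableSinkRaw (
    `record` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic_sink',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'raw'
);

-- Splice the content string into the output unquoted, so it appears
-- as a nested JSON value. Assumes id needs no escaping and content is valid JSON.
INSERT INTO TableSinkRaw
SELECT CONCAT('{"id":"', payload.id, '","content":', payload.content, '}')
FROM TableSource;
```

For the sample record, this would be expected to write {"id":"10000","content":{"name":"Jone","age":20}} to topic_sink. CONCAT returns NULL if any argument is NULL, so rows with a missing id or content need separate handling, and a UDF remains the safer choice when the content has to be validated or escaped.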

Cheers,

Qingsheng



How can I set job parameter in flink sql

Posted by wang <24...@163.com>.
Hi dear engineer,


I want to override the function open() in my UDF, like:




In the open() function, I want to fetch the configured value "black_list_path" and simply print it out. I set this value in the ./sql-client.sh console:


    SET black_list_path = /root/list.properties


Then I ran this UDF, but what got printed is /config/list.properties (the default value I passed in context.getJobParameter("black_list_path", "/config/list/properties")), not the /root/list.properties that I set in the ./sql-client.sh console.


So could you please show me the correct way to set black_list_path in SQL? Thanks so much!
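
A possible direction, given as a hedged sketch and not confirmed anywhere in this thread: getJobParameter reads the global job parameters, which a bare SET of an arbitrary key may not populate. The sketch assumes the SQL client in this version passes the pipeline.global-job-parameters option, a map written as key:value pairs, through to the job.

```sql
-- Assumption: the SQL client forwards this option to the job's global parameters.
-- The map value is written as key:value, with multiple entries separated by commas.
SET pipeline.global-job-parameters=black_list_path:/root/list.properties;
```

If that assumption holds, context.getJobParameter("black_list_path", ...) in open() would be expected to return /root/list.properties instead of the default. Newer SQL client versions use the quoted form, SET 'pipeline.global-job-parameters' = 'black_list_path:/root/list.properties';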




Thanks && Regards,
Hunk






Re: Could you please give me a hand about json object in flink sql

Posted by Mang Zhang <zh...@163.com>.
Because the format of "content" in TableSource is not fixed, handling it with a UDF may be a good approach.

--

Best regards,
Mang Zhang




