You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "zstraw@163.com" <zs...@163.com> on 2022/03/07 08:00:10 UTC

[DISCUSS] The abstraction of cache lookupFunction and cache metric

Hi devs,


I would like to propose a discussion thread about abstraction of Cache LookupFunction with metrics for cache in connectors to make cache out of box for connector developers. There are multiple LookupFunction implementations in individual connectors [1][2][3][4] so far. 
At the same time, users can monitor cache in LookupFunction by adding uniform cache metrics to optimize tasks or troubleshoot.


I have posted an issue about this, see <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.


Looking forward to your feedback, thanks.


Best regards,
Yuan




[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
[2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
[3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
[4] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java

Re:Re: [DISCUSS] The abstraction of cache lookupFunction and cache metric

Posted by "zstraw@163.com" <zs...@163.com>.
Hi Qingsheng,



> If I understand correctly these are specified in DDL table options by users.




It's inconvenient for user to checkout the options when they in front of a running task. And they don't know the real underlying options in effect if there are some bugs or other incorrect configurations lead to invalid.




> I don’t think there's a rule that all metric names should be in MetricNames class, but it would be great to aggregate these constants into a unified place. 


It's a good choice to aggregate the constants together.


Best regards,
Yuan


At 2022-03-08 09:57:30, "Qingsheng Ren" <re...@gmail.com> wrote: >Hi Yuan, > >> how can we tell the real “identifier” and “type” options in effect to users? > >If I understand correctly these are specified in DDL table options by users. For example: > >CREATE TABLE DimTable (…) WITH ( > ... > “cache.identifier” = “guava”, > “cache.type” = “LRU” >); > >> Does MetricNames.java contain all metric names? > > >I don’t think there's a rule that all metric names should be in MetricNames class, but it would be great to aggregate these constants into a unified place. > >Cheers, > >Qingsheng > > >> On Mar 8, 2022, at 10:22, zstraw@163.com wrote: >> >> Hi Qingsheng Ren, >> Thanks for your feedback. >> >> >>> 1. It looks like “identifier” and “type” are options of cache instead of metrics. I think they are finalized once the cache is created so maybe it’s not quite helpful to report them to the metric system. >> >> >> Maybe it's not quite appropriate to report them to the metric system, but how can we tell the real “identifier” and “type” options in effect to users? >> >> >> >> >>> 2. Some metrics can be aggregated simply in metric systems, like loadCount = loadSuccessCount + loadExceptionCount, so maybe we can just keep fundamental metrics (like loadSuccessCount and loadExceptionCount) to avoid redundancy. >> >> >> I agree with you. I have removed these redundant metrics. >> >> >>> 3. About the interface of CacheMetricGroup I think it would be easier for cache implementers to use if we expose wrapped function instead of let users provide gauges directly. >> >> >> Thanks for your advice, they are helpful and I have adjusted it. I have a question about it. Does MetricNames.java <https://github.com/apache/flink/blob/master/flink-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fflink%2Fruntime%2Fmetrics%2FMetricNames.java> contain all metric names? Should I put the cache metric names here? >> >> >> >> >> >> >> Best regards, >> Yuan >> >> >> >> >> >> At 2022-03-07 16:55:18, "Qingsheng Ren" <re...@gmail.com> wrote: >>> Hi Yuan, >>> >>> Thanks for raising this discussion! I believe this will be very helpful for lookup table developers, and standardizing metrics would be essential for users to tuning their systems. >>> >>> Here’s some thoughts in my mind: >>> >>> 1. It looks like “identifier” and “type” are options of cache instead of metrics. I think they are finalized once the cache is created so maybe it’s not quite helpful to report them to the metric system. >>> >>> 2. Some metrics can be aggregated simply in metric systems, like loadCount = loadSuccessCount + loadExceptionCount, so maybe we can just keep fundamental metrics (like loadSuccessCount and loadExceptionCount) to avoid redundancy. >>> >>> 3. About the interface of CacheMetricGroup I think it would be easier for cache implementers to use if we expose wrapped function instead of let users provide gauges directly. For example: >>> >>> public interface CacheMetricGroup extends MetricGroup { >>> // Mark a cache hit >>> public void markCacheHit(); >>> // Mark a cache miss >>> public void recordCacheMiss(); >>> ... >>> } >>> >>> You can check SourceReaderMetricGroup[1] and its implementation[2] as a reference. >>> >>> Hope these would be helpful! >>> >>> Best regards, >>> >>> Qingsheng Ren >>> >>> [1] https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SourceReaderMetricGroup.java >>> [2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java >>> >>> >>>> On Mar 7, 2022, at 16:00, zstraw@163.com wrote: >>>> >>>> Hi devs, >>>> >>>> >>>> I would like to propose a discussion thread about abstraction of Cache LookupFunction with metrics for cache in connectors to make cache out of box for connector developers. There are multiple LookupFunction implementations in individual connectors [1][2][3][4] so far. >>>> At the same time, users can monitor cache in LookupFunction by adding uniform cache metrics to optimize tasks or troubleshoot. >>>> >>>> >>>> I have posted an issue about this, see <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>. >>>> >>>> >>>> Looking forward to your feedback, thanks. >>>> >>>> >>>> Best regards, >>>> Yuan >>>> >>>> >>>> >>>> >>>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java >>>> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java >>>> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java >>>> [4] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java

Re:Re: [DISCUSS] The abstraction of cache lookupFunction and cache metric

Posted by "zstraw@163.com" <zs...@163.com>.
Hi Qingsheng,
Sorry for my wrong format.

> If I understand correctly these are specified in DDL table options by users.




It's inconvenient for user to checkout the options when they in front of a running task. And they don't know the real underlying options in effect if there are some bugs or other incorrect configurations lead to invalid.




> I don’t think there's a rule that all metric names should be in MetricNames class, but it would be great to aggregate these constants into a unified place. 


It's a good choice to aggregate the constants together.


Best regards,
Yuan

At 2022-03-08 09:57:30, "Qingsheng Ren" <re...@gmail.com> wrote:
>Hi Yuan, 
>
>> how can we tell the real  “identifier” and “type” options in effect to users?
>
>If I understand correctly these are specified in DDL table options by users. For example: 
>
>CREATE TABLE DimTable (…) WITH (
>    ...
>    “cache.identifier” = “guava”, 
>    “cache.type” = “LRU”
>);
>
>> Does MetricNames.java contain all metric names?
>
>
>I don’t think there's a rule that all metric names should be in MetricNames class, but it would be great to aggregate these constants into a unified place. 
>
>Cheers, 
>
>Qingsheng
>
>
>> On Mar 8, 2022, at 10:22, zstraw@163.com wrote:
>> 
>> Hi Qingsheng Ren,
>> Thanks for your feedback.
>> 
>> 
>>> 1. It looks like “identifier” and “type” are options of cache instead of metrics. I think they are finalized once the cache is created so maybe it’s not quite helpful to report them to the metric system.
>> 
>> 
>> Maybe it's not quite appropriate to report them to the metric system, but how can we tell the real  “identifier” and “type” options in effect to users?
>> 
>> 
>> 
>> 
>>> 2. Some metrics can be aggregated simply in metric systems, like loadCount = loadSuccessCount + loadExceptionCount, so maybe we can just keep fundamental metrics (like loadSuccessCount and loadExceptionCount) to avoid redundancy.
>> 
>> 
>> I agree with you. I have removed these redundant metrics.
>> 
>> 
>>> 3. About the interface of CacheMetricGroup I think it would be easier for cache implementers to use if we expose wrapped function instead of let users provide gauges directly. 
>> 
>> 
>> Thanks for your advice, they are helpful and I have adjusted it. I have a question about it. Does MetricNames.java <https://github.com/apache/flink/blob/master/flink-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fflink%2Fruntime%2Fmetrics%2FMetricNames.java> contain all metric names? Should I put the cache metric names here?
>> 
>> 
>> 
>> 
>> 
>> 
>> Best regards,
>> Yuan
>> 
>> 
>> 
>> 
>> 
>> At 2022-03-07 16:55:18, "Qingsheng Ren" <re...@gmail.com> wrote:
>>> Hi Yuan, 
>>> 
>>> Thanks for raising this discussion! I believe this will be very helpful for lookup table developers, and standardizing metrics would be essential  for users to tuning their systems. 
>>> 
>>> Here’s some thoughts in my mind:
>>> 
>>> 1. It looks like “identifier” and “type” are options of cache instead of metrics. I think they are finalized once the cache is created so maybe it’s not quite helpful to report them to the metric system.
>>> 
>>> 2. Some metrics can be aggregated simply in metric systems, like loadCount = loadSuccessCount + loadExceptionCount, so maybe we can just keep fundamental metrics (like loadSuccessCount and loadExceptionCount) to avoid redundancy.
>>> 
>>> 3. About the interface of CacheMetricGroup I think it would be easier for cache implementers to use if we expose wrapped function instead of let users provide gauges directly. For example:
>>> 
>>> public interface CacheMetricGroup extends MetricGroup {
>>>   // Mark a cache hit
>>>   public void markCacheHit();
>>>   // Mark a cache miss
>>>   public void recordCacheMiss();
>>>   ...
>>> } 
>>> 
>>> You can check SourceReaderMetricGroup[1] and its implementation[2] as a reference.
>>> 
>>> Hope these would be helpful!
>>> 
>>> Best regards, 
>>> 
>>> Qingsheng Ren
>>> 
>>> [1] https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SourceReaderMetricGroup.java
>>> [2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java
>>> 
>>> 
>>>> On Mar 7, 2022, at 16:00, zstraw@163.com wrote:
>>>> 
>>>> Hi devs,
>>>> 
>>>> 
>>>> I would like to propose a discussion thread about abstraction of Cache LookupFunction with metrics for cache in connectors to make cache out of box for connector developers. There are multiple LookupFunction implementations in individual connectors [1][2][3][4] so far. 
>>>> At the same time, users can monitor cache in LookupFunction by adding uniform cache metrics to optimize tasks or troubleshoot.
>>>> 
>>>> 
>>>> I have posted an issue about this, see <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.
>>>> 
>>>> 
>>>> Looking forward to your feedback, thanks.
>>>> 
>>>> 
>>>> Best regards,
>>>> Yuan
>>>> 
>>>> 
>>>> 
>>>> 
>>>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
>>>> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
>>>> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
>>>> [4] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java

Re: [DISCUSS] The abstraction of cache lookupFunction and cache metric

Posted by Qingsheng Ren <re...@gmail.com>.
Hi Yuan, 

> how can we tell the real  “identifier” and “type” options in effect to users?

If I understand correctly these are specified in DDL table options by users. For example: 

CREATE TABLE DimTable (…) WITH (
    ...
    “cache.identifier” = “guava”, 
    “cache.type” = “LRU”
);

> Does MetricNames.java contain all metric names?


I don’t think there's a rule that all metric names should be in MetricNames class, but it would be great to aggregate these constants into a unified place. 

Cheers, 

Qingsheng


> On Mar 8, 2022, at 10:22, zstraw@163.com wrote:
> 
> Hi Qingsheng Ren,
> Thanks for your feedback.
> 
> 
>> 1. It looks like “identifier” and “type” are options of cache instead of metrics. I think they are finalized once the cache is created so maybe it’s not quite helpful to report them to the metric system.
> 
> 
> Maybe it's not quite appropriate to report them to the metric system, but how can we tell the real  “identifier” and “type” options in effect to users?
> 
> 
> 
> 
>> 2. Some metrics can be aggregated simply in metric systems, like loadCount = loadSuccessCount + loadExceptionCount, so maybe we can just keep fundamental metrics (like loadSuccessCount and loadExceptionCount) to avoid redundancy.
> 
> 
> I agree with you. I have removed these redundant metrics.
> 
> 
>> 3. About the interface of CacheMetricGroup I think it would be easier for cache implementers to use if we expose wrapped function instead of let users provide gauges directly. 
> 
> 
> Thanks for your advice, they are helpful and I have adjusted it. I have a question about it. Does MetricNames.java <https://github.com/apache/flink/blob/master/flink-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fflink%2Fruntime%2Fmetrics%2FMetricNames.java> contain all metric names? Should I put the cache metric names here?
> 
> 
> 
> 
> 
> 
> Best regards,
> Yuan
> 
> 
> 
> 
> 
> At 2022-03-07 16:55:18, "Qingsheng Ren" <re...@gmail.com> wrote:
>> Hi Yuan, 
>> 
>> Thanks for raising this discussion! I believe this will be very helpful for lookup table developers, and standardizing metrics would be essential  for users to tuning their systems. 
>> 
>> Here’s some thoughts in my mind:
>> 
>> 1. It looks like “identifier” and “type” are options of cache instead of metrics. I think they are finalized once the cache is created so maybe it’s not quite helpful to report them to the metric system.
>> 
>> 2. Some metrics can be aggregated simply in metric systems, like loadCount = loadSuccessCount + loadExceptionCount, so maybe we can just keep fundamental metrics (like loadSuccessCount and loadExceptionCount) to avoid redundancy.
>> 
>> 3. About the interface of CacheMetricGroup I think it would be easier for cache implementers to use if we expose wrapped function instead of let users provide gauges directly. For example:
>> 
>> public interface CacheMetricGroup extends MetricGroup {
>>   // Mark a cache hit
>>   public void markCacheHit();
>>   // Mark a cache miss
>>   public void recordCacheMiss();
>>   ...
>> } 
>> 
>> You can check SourceReaderMetricGroup[1] and its implementation[2] as a reference.
>> 
>> Hope these would be helpful!
>> 
>> Best regards, 
>> 
>> Qingsheng Ren
>> 
>> [1] https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SourceReaderMetricGroup.java
>> [2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java
>> 
>> 
>>> On Mar 7, 2022, at 16:00, zstraw@163.com wrote:
>>> 
>>> Hi devs,
>>> 
>>> 
>>> I would like to propose a discussion thread about abstraction of Cache LookupFunction with metrics for cache in connectors to make cache out of box for connector developers. There are multiple LookupFunction implementations in individual connectors [1][2][3][4] so far. 
>>> At the same time, users can monitor cache in LookupFunction by adding uniform cache metrics to optimize tasks or troubleshoot.
>>> 
>>> 
>>> I have posted an issue about this, see <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.
>>> 
>>> 
>>> Looking forward to your feedback, thanks.
>>> 
>>> 
>>> Best regards,
>>> Yuan
>>> 
>>> 
>>> 
>>> 
>>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
>>> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
>>> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
>>> [4] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java


Re:Re: [DISCUSS] The abstraction of cache lookupFunction and cache metric

Posted by "zstraw@163.com" <zs...@163.com>.
Hi Qingsheng Ren,
Thanks for your feedback.


> 1. It looks like “identifier” and “type” are options of cache instead of metrics. I think they are finalized once the cache is created so maybe it’s not quite helpful to report them to the metric system.


Maybe it's not quite appropriate to report them to the metric system, but how can we tell the real  “identifier” and “type” options in effect to users?




> 2. Some metrics can be aggregated simply in metric systems, like loadCount = loadSuccessCount + loadExceptionCount, so maybe we can just keep fundamental metrics (like loadSuccessCount and loadExceptionCount) to avoid redundancy.


I agree with you. I have removed these redundant metrics.


> 3. About the interface of CacheMetricGroup I think it would be easier for cache implementers to use if we expose wrapped function instead of let users provide gauges directly. 


Thanks for your advice, they are helpful and I have adjusted it. I have a question about it. Does MetricNames.java <https://github.com/apache/flink/blob/master/flink-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fflink%2Fruntime%2Fmetrics%2FMetricNames.java> contain all metric names? Should I put the cache metric names here?






Best regards,
Yuan





At 2022-03-07 16:55:18, "Qingsheng Ren" <re...@gmail.com> wrote:
>Hi Yuan, 
>
>Thanks for raising this discussion! I believe this will be very helpful for lookup table developers, and standardizing metrics would be essential  for users to tuning their systems. 
>
>Here’s some thoughts in my mind:
>
>1. It looks like “identifier” and “type” are options of cache instead of metrics. I think they are finalized once the cache is created so maybe it’s not quite helpful to report them to the metric system.
>
>2. Some metrics can be aggregated simply in metric systems, like loadCount = loadSuccessCount + loadExceptionCount, so maybe we can just keep fundamental metrics (like loadSuccessCount and loadExceptionCount) to avoid redundancy.
>
>3. About the interface of CacheMetricGroup I think it would be easier for cache implementers to use if we expose wrapped function instead of let users provide gauges directly. For example:
>
>public interface CacheMetricGroup extends MetricGroup {
>    // Mark a cache hit
>    public void markCacheHit();
>    // Mark a cache miss
>    public void recordCacheMiss();
>    ...
>} 
>
>You can check SourceReaderMetricGroup[1] and its implementation[2] as a reference.
>
>Hope these would be helpful!
>
>Best regards, 
>
>Qingsheng Ren
>
>[1] https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SourceReaderMetricGroup.java
>[2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java
>
>
>> On Mar 7, 2022, at 16:00, zstraw@163.com wrote:
>> 
>> Hi devs,
>> 
>> 
>> I would like to propose a discussion thread about abstraction of Cache LookupFunction with metrics for cache in connectors to make cache out of box for connector developers. There are multiple LookupFunction implementations in individual connectors [1][2][3][4] so far. 
>> At the same time, users can monitor cache in LookupFunction by adding uniform cache metrics to optimize tasks or troubleshoot.
>> 
>> 
>> I have posted an issue about this, see <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.
>> 
>> 
>> Looking forward to your feedback, thanks.
>> 
>> 
>> Best regards,
>> Yuan
>> 
>> 
>> 
>> 
>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
>> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
>> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
>> [4] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java

Re: [DISCUSS] The abstraction of cache lookupFunction and cache metric

Posted by Qingsheng Ren <re...@gmail.com>.
Hi Yuan, 

Thanks for raising this discussion! I believe this will be very helpful for lookup table developers, and standardizing metrics would be essential  for users to tuning their systems. 

Here’s some thoughts in my mind:

1. It looks like “identifier” and “type” are options of cache instead of metrics. I think they are finalized once the cache is created so maybe it’s not quite helpful to report them to the metric system.

2. Some metrics can be aggregated simply in metric systems, like loadCount = loadSuccessCount + loadExceptionCount, so maybe we can just keep fundamental metrics (like loadSuccessCount and loadExceptionCount) to avoid redundancy.

3. About the interface of CacheMetricGroup I think it would be easier for cache implementers to use if we expose wrapped function instead of let users provide gauges directly. For example:

public interface CacheMetricGroup extends MetricGroup {
    // Mark a cache hit
    public void markCacheHit();
    // Mark a cache miss
    public void recordCacheMiss();
    ...
} 

You can check SourceReaderMetricGroup[1] and its implementation[2] as a reference.

Hope these would be helpful!

Best regards, 

Qingsheng Ren

[1] https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SourceReaderMetricGroup.java
[2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java


> On Mar 7, 2022, at 16:00, zstraw@163.com wrote:
> 
> Hi devs,
> 
> 
> I would like to propose a discussion thread about abstraction of Cache LookupFunction with metrics for cache in connectors to make cache out of box for connector developers. There are multiple LookupFunction implementations in individual connectors [1][2][3][4] so far. 
> At the same time, users can monitor cache in LookupFunction by adding uniform cache metrics to optimize tasks or troubleshoot.
> 
> 
> I have posted an issue about this, see <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.
> 
> 
> Looking forward to your feedback, thanks.
> 
> 
> Best regards,
> Yuan
> 
> 
> 
> 
> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
> [4] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java


Re: [DISCUSS] The abstraction of cache lookupFunction and cache metric

Posted by Qingsheng Ren <re...@gmail.com>.
Hi Yuan, 

I’m fine to skip the OneToManyCache since it’s just a one-line implementation under the abstraction of cache. Thanks for the review and advice. 

Qingsheng

> On Apr 14, 2022, at 19:23, zstraw@163.com wrote:
> 
> Hi Qingsheng,
> 
> 
> 
> 
> Thanks very much for your detail advice. The first two points are very clear and make sense.  I have only one question about the third point.
> 
> 
>> Another point to note is that if you check the usage of cache in JDBC and Hive lookup table, the value type is List<RowData>, since it’s common that a joining key could mapped to multiple rows. We could add another layer of abstraction under Cache interface, for example: 
>> 
>> OneToManyCache<K, V> extends Cache<K, List<V>>
> IMHO, up to now, I haven't found `OneToManyCache` is necessary. The method `appendToKey(List<V>)` can be replaced by put(K, V). What do you think?
> 
> 
> 
> 
> Best regards,
> Yuan
> 
> At 2022-04-13 15:20:01, "Qingsheng Ren" <re...@gmail.com> wrote:
>> Hi Yuan,
>> 
>> Sorry for pending such long time on this thread. I think adding unified abstraction and metrics for cache is quite important for users and developers to optimize and improve their jobs with lookup join. We also have our inner cache abstraction and implementation, so I took a deeper observation and here’re some thoughts of mine. 
>> 
>> 1. Metrics
>> 
>> I think users would be interested to these 3 aspects when debugging or benchmarking their jobs: 
>> 
>> (1) Hit / Miss rate
>> - hitCount, Counter type, to track number of cache hit
>> - missCount, Counter type, to track number of cache miss
>> Here we just report the raw count instead of rate to external metric system, since it’s easier and more flexible to make aggregations and calculate rate in metric systems like Prometheus.
>> 
>> (2) Loading throughput and latency
>> - numBytesLoadedTotal, Counter type, to track number of bytes totally loaded by cache
>> - numRecordsLoadedTotal, Counter type, to track number of records totally loaded
>> These two can be used for tracking the throughput of loading
>> 
>> - latestLoadTime, Gauge type, to track the time spent for the latest load operation
>> Actually it’s better to use histogram for tracking latency, but it’s quite expensive to manage a histogram. Personally I think a gauge would be good enough to reflect the latency.
>> 
>> - numLoadFailures, Counter type, to track number of failed loads.
>> 
>> (3) Current usage
>> - numCachedRecords, Gauge type, to track number of entries in cache
>> - numCachedBytes, Gauge type, to track number of bytes used by cache
>> 
>> Most of the metrics above are similar to your original proposal, and here’s the difference: 
>> (1) I still think it’s weird to report identifier and type as metrics. It’s quite handy to get the actual cache type through the code path, nevertheless some metric systems don't support string-type metrics (like Prometheus). 
>> (2) numRecords is renamed to numCachedRecords
>> (3) loadSuccessCount is deduced by missCount - numLoadFailures. I think users would be interested to know how many times it loads (missCount), and how many failures (numLoadFailures)
>> (4) totalLoadTime is replaced by latestLoadTime. I think it’s not quite meaningful for users to see a long run job reporting totalLoadTime with hours even days as value.
>> 
>> 2. APIs
>> 
>> (1) CacheMetricGroup: 
>> 
>> public interface CacheMetricGroup {
>>   Counter getHitCounter();
>> 
>>   Counter getMissCounter();
>> 
>>   Counter getNumRecordsLoadedTotalCounter();
>> 
>>   Counter getNumBytesLoadedTotalCounter();
>> 
>>   Gauge<Long> getLatestLoadTimeGauge();
>> 
>>   Counter getNumLoadFailureCounter();
>> 
>>   void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);
>> 
>>   void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge)
>> }
>> 
>> Note that some metrics are provided as getters since they are quite straight forward, except numCacheRecords/Bytes, which should be left for cache implementers. 
>> 
>> (2) Cache
>> 
>> public interface Cache<K, V> extends AutoClosable {
>>   void open(CacheMetricGroup cacheMetricGroup);
>> 
>>   V get(K key, Callable<? extends V> loader) throws Exception;
>> 
>>   void put(K key, V value);
>> 
>>   void putAll(Map<? extends K, ? extends V> m);
>> 
>>   void clean();
>> 
>>   long size();
>> }
>> 
>> Compared to your proposal: 
>> a. `getIdentifier()` is removed. I can’t see any usage of this function, since we are not dynamically loading cache implementations via SPI or factory style.
>> b. `init()` and `initMetric()` are merged to `open(CacheMetricGroup)`.
>> c. Extends `AutoClosable` to be symmetric to open, for cleaning resources claimed by cache
>> d. `getMetricGroup()` is removed. Metric groups should be exposed to cache implementations instead of users. 
>> 
>> 3. Other topics
>> Another point to note is that if you check the usage of cache in JDBC and Hive lookup table, the value type is List<RowData>, since it’s common that a joining key could mapped to multiple rows. We could add another layer of abstraction under Cache interface, for example: 
>> 
>> OneToManyCache<K, V> extends Cache<K, List<V>>
>> 
>> And add interfaces like `appendToKey(List<V>)` to it. What do you think?
>> 
>> Cheers, 
>> 
>> Qingsheng
>> 
>>> On Mar 7, 2022, at 16:00, zstraw@163.com wrote:
>>> 
>>> Hi devs,
>>> 
>>> 
>>> I would like to propose a discussion thread about abstraction of Cache LookupFunction with metrics for cache in connectors to make cache out of box for connector developers. There are multiple LookupFunction implementations in individual connectors [1][2][3][4] so far. 
>>> At the same time, users can monitor cache in LookupFunction by adding uniform cache metrics to optimize tasks or troubleshoot.
>>> 
>>> 
>>> I have posted an issue about this, see <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.
>>> 
>>> 
>>> Looking forward to your feedback, thanks.
>>> 
>>> 
>>> Best regards,
>>> Yuan
>>> 
>>> 
>>> 
>>> 
>>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
>>> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
>>> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
>>> [4] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java


Re:Re: [DISCUSS] The abstraction of cache lookupFunction and cache metric

Posted by "zstraw@163.com" <zs...@163.com>.
Hi Qingsheng,




Thanks very much for your detail advice. The first two points are very clear and make sense.  I have only one question about the third point.


>Another point to note is that if you check the usage of cache in JDBC and Hive lookup table, the value type is List<RowData>, since it’s common that a joining key could mapped to multiple rows. We could add another layer of abstraction under Cache interface, for example: 
>
>OneToManyCache<K, V> extends Cache<K, List<V>>
IMHO, up to now, I haven't found `OneToManyCache` is necessary. The method `appendToKey(List<V>)` can be replaced by put(K, V). What do you think?




 Best regards,
 Yuan

At 2022-04-13 15:20:01, "Qingsheng Ren" <re...@gmail.com> wrote:
>Hi Yuan,
>
>Sorry for pending such long time on this thread. I think adding unified abstraction and metrics for cache is quite important for users and developers to optimize and improve their jobs with lookup join. We also have our inner cache abstraction and implementation, so I took a deeper observation and here’re some thoughts of mine. 
>
>1. Metrics
>
>I think users would be interested to these 3 aspects when debugging or benchmarking their jobs: 
>
>(1) Hit / Miss rate
>- hitCount, Counter type, to track number of cache hit
>- missCount, Counter type, to track number of cache miss
>Here we just report the raw count instead of rate to external metric system, since it’s easier and more flexible to make aggregations and calculate rate in metric systems like Prometheus.
>
>(2) Loading throughput and latency
>- numBytesLoadedTotal, Counter type, to track number of bytes totally loaded by cache
>- numRecordsLoadedTotal, Counter type, to track number of records totally loaded
>These two can be used for tracking the throughput of loading
>
>- latestLoadTime, Gauge type, to track the time spent for the latest load operation
>Actually it’s better to use histogram for tracking latency, but it’s quite expensive to manage a histogram. Personally I think a gauge would be good enough to reflect the latency.
>
>- numLoadFailures, Counter type, to track number of failed loads.
>
>(3) Current usage
>- numCachedRecords, Gauge type, to track number of entries in cache
>- numCachedBytes, Gauge type, to track number of bytes used by cache
>
>Most of the metrics above are similar to your original proposal, and here’s the difference: 
>(1) I still think it’s weird to report identifier and type as metrics. It’s quite handy to get the actual cache type through the code path, nevertheless some metric systems don't support string-type metrics (like Prometheus). 
>(2) numRecords is renamed to numCachedRecords
>(3) loadSuccessCount is deduced by missCount - numLoadFailures. I think users would be interested to know how many times it loads (missCount), and how many failures (numLoadFailures)
>(4) totalLoadTime is replaced by latestLoadTime. I think it’s not quite meaningful for users to see a long run job reporting totalLoadTime with hours even days as value.
>
>2. APIs
>
>(1) CacheMetricGroup: 
>
>public interface CacheMetricGroup {
>    Counter getHitCounter();
>
>    Counter getMissCounter();
>
>    Counter getNumRecordsLoadedTotalCounter();
>
>    Counter getNumBytesLoadedTotalCounter();
>
>    Gauge<Long> getLatestLoadTimeGauge();
>
>    Counter getNumLoadFailureCounter();
>
>    void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);
>
>    void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge)
>}
>
>Note that some metrics are provided as getters since they are quite straight forward, except numCacheRecords/Bytes, which should be left for cache implementers. 
>
>(2) Cache
>
>public interface Cache<K, V> extends AutoClosable {
>    void open(CacheMetricGroup cacheMetricGroup);
>
>    V get(K key, Callable<? extends V> loader) throws Exception;
>
>    void put(K key, V value);
>
>    void putAll(Map<? extends K, ? extends V> m);
>
>    void clean();
>
>    long size();
>}
>
>Compared to your proposal: 
>a. `getIdentifier()` is removed. I can’t see any usage of this function, since we are not dynamically loading cache implementations via SPI or factory style.
>b. `init()` and `initMetric()` are merged to `open(CacheMetricGroup)`.
>c. Extends `AutoClosable` to be symmetric to open, for cleaning resources claimed by cache
>d. `getMetricGroup()` is removed. Metric groups should be exposed to cache implementations instead of users. 
>
>3. Other topics
>Another point to note is that if you check the usage of cache in JDBC and Hive lookup table, the value type is List<RowData>, since it’s common that a joining key could mapped to multiple rows. We could add another layer of abstraction under Cache interface, for example: 
>
>OneToManyCache<K, V> extends Cache<K, List<V>>
>
>And add interfaces like `appendToKey(List<V>)` to it. What do you think?
>
>Cheers, 
>
>Qingsheng
>
>> On Mar 7, 2022, at 16:00, zstraw@163.com wrote:
>> 
>> Hi devs,
>> 
>> 
>> I would like to propose a discussion thread about abstraction of Cache LookupFunction with metrics for cache in connectors to make cache out of box for connector developers. There are multiple LookupFunction implementations in individual connectors [1][2][3][4] so far. 
>> At the same time, users can monitor cache in LookupFunction by adding uniform cache metrics to optimize tasks or troubleshoot.
>> 
>> 
>> I have posted an issue about this, see <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.
>> 
>> 
>> Looking forward to your feedback, thanks.
>> 
>> 
>> Best regards,
>> Yuan
>> 
>> 
>> 
>> 
>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
>> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
>> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
>> [4] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java

Re: [DISCUSS] The abstraction of cache lookupFunction and cache metric

Posted by Qingsheng Ren <re...@gmail.com>.
Hi Yuan,

Sorry for pending such long time on this thread. I think adding unified abstraction and metrics for cache is quite important for users and developers to optimize and improve their jobs with lookup join. We also have our inner cache abstraction and implementation, so I took a deeper observation and here’re some thoughts of mine. 

1. Metrics

I think users would be interested to these 3 aspects when debugging or benchmarking their jobs: 

(1) Hit / Miss rate
- hitCount, Counter type, to track number of cache hit
- missCount, Counter type, to track number of cache miss
Here we just report the raw count instead of rate to external metric system, since it’s easier and more flexible to make aggregations and calculate rate in metric systems like Prometheus.

(2) Loading throughput and latency
- numBytesLoadedTotal, Counter type, to track number of bytes totally loaded by cache
- numRecordsLoadedTotal, Counter type, to track number of records totally loaded
These two can be used for tracking the throughput of loading

- latestLoadTime, Gauge type, to track the time spent for the latest load operation
Actually it’s better to use histogram for tracking latency, but it’s quite expensive to manage a histogram. Personally I think a gauge would be good enough to reflect the latency.

- numLoadFailures, Counter type, to track number of failed loads.

(3) Current usage
- numCachedRecords, Gauge type, to track number of entries in cache
- numCachedBytes, Gauge type, to track number of bytes used by cache

Most of the metrics above are similar to your original proposal, and here’s the difference: 
(1) I still think it’s weird to report identifier and type as metrics. It’s quite handy to get the actual cache type through the code path, nevertheless some metric systems don't support string-type metrics (like Prometheus). 
(2) numRecords is renamed to numCachedRecords
(3) loadSuccessCount is deduced by missCount - numLoadFailures. I think users would be interested to know how many times it loads (missCount), and how many failures (numLoadFailures)
(4) totalLoadTime is replaced by latestLoadTime. I think it’s not quite meaningful for users to see a long run job reporting totalLoadTime with hours even days as value.

2. APIs

(1) CacheMetricGroup: 

public interface CacheMetricGroup {
    Counter getHitCounter();

    Counter getMissCounter();

    Counter getNumRecordsLoadedTotalCounter();

    Counter getNumBytesLoadedTotalCounter();

    Gauge<Long> getLatestLoadTimeGauge();

    Counter getNumLoadFailureCounter();

    void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge)
}

Note that some metrics are provided as getters since they are quite straight forward, except numCacheRecords/Bytes, which should be left for cache implementers. 

(2) Cache

public interface Cache<K, V> extends AutoClosable {
    void open(CacheMetricGroup cacheMetricGroup);

    V get(K key, Callable<? extends V> loader) throws Exception;

    void put(K key, V value);

    void putAll(Map<? extends K, ? extends V> m);

    void clean();

    long size();
}

Compared to your proposal: 
a. `getIdentifier()` is removed. I can’t see any usage of this function, since we are not dynamically loading cache implementations via SPI or factory style.
b. `init()` and `initMetric()` are merged to `open(CacheMetricGroup)`.
c. Extends `AutoClosable` to be symmetric to open, for cleaning resources claimed by cache
d. `getMetricGroup()` is removed. Metric groups should be exposed to cache implementations instead of users. 

3. Other topics
Another point to note is that if you check the usage of cache in JDBC and Hive lookup table, the value type is List<RowData>, since it’s common that a joining key could mapped to multiple rows. We could add another layer of abstraction under Cache interface, for example: 

OneToManyCache<K, V> extends Cache<K, List<V>>

And add interfaces like `appendToKey(List<V>)` to it. What do you think?

Cheers, 

Qingsheng

> On Mar 7, 2022, at 16:00, zstraw@163.com wrote:
> 
> Hi devs,
> 
> 
> I would like to propose a discussion thread about abstraction of Cache LookupFunction with metrics for cache in connectors to make cache out of box for connector developers. There are multiple LookupFunction implementations in individual connectors [1][2][3][4] so far. 
> At the same time, users can monitor cache in LookupFunction by adding uniform cache metrics to optimize tasks or troubleshoot.
> 
> 
> I have posted an issue about this, see <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.
> 
> 
> Looking forward to your feedback, thanks.
> 
> 
> Best regards,
> Yuan
> 
> 
> 
> 
> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
> [4] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java