Posted to dev@flink.apache.org by Qingsheng Ren <re...@gmail.com> on 2022/04/13 08:20:01 UTC

Re: [DISCUSS] The abstraction of cache lookupFunction and cache metric

Hi Yuan,

Sorry for keeping this thread pending for so long. I think adding a unified abstraction and metrics for caches is quite important for users and developers who want to optimize and improve their jobs with lookup joins. We also have our own internal cache abstraction and implementation, so I took a closer look, and here are some thoughts of mine.

1. Metrics

I think users would be interested in these 3 aspects when debugging or benchmarking their jobs:

(1) Hit / miss rate
- hitCount, Counter type, to track the number of cache hits
- missCount, Counter type, to track the number of cache misses
Here we report the raw counts instead of a rate to the external metric system, since it’s easier and more flexible to aggregate and calculate rates in metric systems like Prometheus.
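To make that concrete, the rate a metric system would derive from the two raw counters is simply hitCount / (hitCount + missCount). A tiny illustrative Java sketch of that derivation (not part of the proposed API):

```java
public class HitRateExample {
    /** Derives the hit rate from the raw hitCount and missCount counters. */
    static double hitRate(long hitCount, long missCount) {
        long total = hitCount + missCount;
        // Avoid division by zero before any lookups have happened.
        return total == 0 ? 0.0 : (double) hitCount / total;
    }

    public static void main(String[] args) {
        // 90 hits and 10 misses give a 0.9 hit rate.
        System.out.println(hitRate(90, 10));
    }
}
```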

(2) Loading throughput and latency
- numBytesLoadedTotal, Counter type, to track the total number of bytes loaded by the cache
- numRecordsLoadedTotal, Counter type, to track the total number of records loaded
These two can be used for tracking the throughput of loading.

- latestLoadTime, Gauge type, to track the time spent on the latest load operation
A histogram would actually be better for tracking latency, but maintaining a histogram is quite expensive. Personally I think a gauge is good enough to reflect the latency.

- numLoadFailures, Counter type, to track the number of failed loads.

(3) Current usage
- numCachedRecords, Gauge type, to track the number of entries in the cache
- numCachedBytes, Gauge type, to track the number of bytes used by the cache

Most of the metrics above are similar to your original proposal; here are the differences:
(1) I still think it’s weird to report the identifier and type as metrics. It’s quite handy to get the actual cache type through the code path, and besides, some metric systems (like Prometheus) don’t support string-typed metrics.
(2) numRecords is renamed to numCachedRecords.
(3) loadSuccessCount can be derived as missCount - numLoadFailures. I think users would be interested in how many loads happened (missCount) and how many of them failed (numLoadFailures).
(4) totalLoadTime is replaced by latestLoadTime. I don’t think it’s very meaningful for users to see a long-running job report a totalLoadTime of hours or even days.

2. APIs

(1) CacheMetricGroup: 

public interface CacheMetricGroup {
    Counter getHitCounter();

    Counter getMissCounter();

    Counter getNumRecordsLoadedTotalCounter();

    Counter getNumBytesLoadedTotalCounter();

    Gauge<Long> getLatestLoadTimeGauge();

    Counter getNumLoadFailureCounter();

    void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}

Note that most metrics are provided as getters since they are quite straightforward; the exceptions are numCachedRecords/Bytes, which are left for cache implementers to register.
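As a rough illustration of that split, here is a self-contained sketch: counters are handed out as getters, while the size gauge is registered by the cache implementation, since only the implementation knows how to measure its own footprint. The Counter/Gauge types below are simplified stand-ins, not the actual Flink metric interfaces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-ins for the metric types; not the real Flink interfaces.
interface Gauge<T> { T getValue(); }

class SimpleCounter {
    private final AtomicLong count = new AtomicLong();
    void inc() { count.incrementAndGet(); }
    long getCount() { return count.get(); }
}

/** Counters are pre-created getters; the records gauge is set by the cache. */
class DemoCacheMetricGroup {
    private final SimpleCounter hitCounter = new SimpleCounter();
    private Gauge<Long> numCachedRecordsGauge;

    SimpleCounter getHitCounter() { return hitCounter; }

    void setNumCachedRecordsGauge(Gauge<Long> gauge) {
        this.numCachedRecordsGauge = gauge;
    }

    Gauge<Long> numCachedRecordsGauge() { return numCachedRecordsGauge; }
}

public class MetricWiringDemo {
    public static void main(String[] args) {
        DemoCacheMetricGroup group = new DemoCacheMetricGroup();
        Map<String, String> store = new HashMap<>();

        // In open(), the cache registers a gauge backed by its own state.
        group.setNumCachedRecordsGauge(() -> (long) store.size());

        group.getHitCounter().inc();
        store.put("k", "v");

        System.out.println(group.numCachedRecordsGauge().getValue()); // 1
        System.out.println(group.getHitCounter().getCount());         // 1
    }
}
```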

(2) Cache

public interface Cache<K, V> extends AutoCloseable {
    void open(CacheMetricGroup cacheMetricGroup);

    V get(K key, Callable<? extends V> loader) throws Exception;

    void put(K key, V value);

    void putAll(Map<? extends K, ? extends V> m);

    void clean();

    long size();
}

Compared to your proposal:
a. `getIdentifier()` is removed. I can’t see any use for this method, since we are not dynamically loading cache implementations via SPI or a factory.
b. `init()` and `initMetric()` are merged into `open(CacheMetricGroup)`.
c. The interface extends `AutoCloseable`, symmetric to open, for releasing resources claimed by the cache.
d. `getMetricGroup()` is removed. Metric groups should be exposed to cache implementations, not to users.
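To show the intended get-with-loader contract, here is a minimal, non-thread-safe sketch backed by a HashMap, with the metrics wiring omitted for brevity. This is purely an illustration of the proposed semantics, not the actual Flink implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

/**
 * Minimal sketch of the proposed cache contract: a hit returns the cached
 * value; a miss invokes the loader and caches its result.
 */
public class MapCache<K, V> implements AutoCloseable {
    private final Map<K, V> store = new HashMap<>();

    public V get(K key, Callable<? extends V> loader) throws Exception {
        V value = store.get(key);
        if (value == null) {        // miss: load from the external system
            value = loader.call();
            store.put(key, value);
        }
        return value;               // hit: served from the cache
    }

    public void put(K key, V value) { store.put(key, value); }

    public void putAll(Map<? extends K, ? extends V> m) { store.putAll(m); }

    public void clean() { store.clear(); }

    public long size() { return store.size(); }

    @Override
    public void close() { clean(); }  // symmetric to open: release resources
}
```

On the first get for a key the loader runs (a miss); subsequent gets for the same key return the cached value without touching the loader.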

3. Other topics
Another point to note: if you check the usage of caches in the JDBC and Hive lookup tables, the value type is List<RowData>, since it’s common for a join key to map to multiple rows. We could add another layer of abstraction under the Cache interface, for example:

OneToManyCache<K, V> extends Cache<K, List<V>>

And add methods like `appendToKey(List<V>)` to it. What do you think?
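For concreteness, one possible shape for that layer is sketched below. The base interface is reproduced in simplified form so the sketch stands alone, and the key parameter on appendToKey is my guess at the full signature.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

// Simplified copy of the proposed base interface from section 2, so this
// sketch compiles standalone (the metric group parameter is elided).
interface Cache<K, V> extends AutoCloseable {
    void open(Object cacheMetricGroup);
    V get(K key, Callable<? extends V> loader) throws Exception;
    void put(K key, V value);
    void putAll(Map<? extends K, ? extends V> m);
    void clean();
    long size();
}

/**
 * One-to-many layer: the value type is pinned to List<V>, and appendToKey
 * adds rows to the list cached under an existing key.
 */
interface OneToManyCache<K, V> extends Cache<K, List<V>> {
    /** Appends the given rows to the list cached under the key. */
    void appendToKey(K key, List<V> values);
}
```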

Cheers, 

Qingsheng

> On Mar 7, 2022, at 16:00, zstraw@163.com wrote:
> 
> Hi devs,
> 
> 
> I would like to start a discussion about an abstraction of a cache LookupFunction, with metrics for the cache in connectors, to make caching work out of the box for connector developers. There are multiple LookupFunction implementations in individual connectors [1][2][3][4] so far.
> At the same time, with uniform cache metrics added, users can monitor the cache in a LookupFunction to optimize tasks or troubleshoot.
> 
> 
> I have posted an issue about this, see <https://issues.apache.org/jira/browse/FLINK-25409>, and made a brief design <https://docs.google.com/document/d/1L2eo7VABZBdRxoRP_wPvVwuvTZOV9qrN9gEQxjhSJOc/edit?usp=sharing>.
> 
> 
> Looking forward to your feedback, thanks.
> 
> 
> Best regards,
> Yuan
> 
> 
> 
> 
> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
> [2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
> [4] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java


Re: [DISCUSS] The abstraction of cache lookupFunction and cache metric

Posted by Qingsheng Ren <re...@gmail.com>.
Hi Yuan, 

I’m fine with skipping OneToManyCache, since it’s just a one-line implementation on top of the cache abstraction. Thanks for the review and advice.

Qingsheng

> On Apr 14, 2022, at 19:23, zstraw@163.com wrote:
> 
> Hi Qingsheng,
> 
> 
> 
> 
> Thanks very much for your detailed advice. The first two points are very clear and make sense. I have only one question, about the third point.
> 
> 
>> Another point to note is that if you check the usage of cache in JDBC and Hive lookup table, the value type is List<RowData>, since it’s common that a joining key could mapped to multiple rows. We could add another layer of abstraction under Cache interface, for example: 
>> 
>> OneToManyCache<K, V> extends Cache<K, List<V>>
> IMHO, so far I haven’t found `OneToManyCache` to be necessary. The method `appendToKey(List<V>)` can be replaced by put(K, V). What do you think?
> 
> 
> 
> 
> Best regards,
> Yuan
> 


Re:Re: [DISCUSS] The abstraction of cache lookupFunction and cache metric

Posted by "zstraw@163.com" <zs...@163.com>.
Hi Qingsheng,




Thanks very much for your detailed advice. The first two points are very clear and make sense. I have only one question, about the third point.


>Another point to note is that if you check the usage of cache in JDBC and Hive lookup table, the value type is List<RowData>, since it’s common that a joining key could mapped to multiple rows. We could add another layer of abstraction under Cache interface, for example: 
>
>OneToManyCache<K, V> extends Cache<K, List<V>>
IMHO, so far I haven't found `OneToManyCache` to be necessary. The method `appendToKey(List<V>)` can be replaced by put(K, V). What do you think?
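Concretely, appending with plain put amounts to reading the current list, extending it, and putting the whole list back. A HashMap-backed sketch, purely for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Appending rows to a Cache<K, List<V>> entry using only get + put. */
public class AppendViaPut {
    public static void main(String[] args) {
        Map<String, List<Integer>> cache = new HashMap<>();

        // "appendToKey" expressed with get + put:
        List<Integer> rows = new ArrayList<>(cache.getOrDefault("key", List.of()));
        rows.addAll(List.of(1, 2));
        cache.put("key", rows);

        // Appending again extends the existing entry the same way.
        rows = new ArrayList<>(cache.getOrDefault("key", List.of()));
        rows.add(3);
        cache.put("key", rows);

        System.out.println(cache.get("key")); // [1, 2, 3]
    }
}
```

The trade-off is that each append copies the existing list, which appendToKey could avoid, but for the typical lookup-join pattern of writing a key's rows once, put alone suffices.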




Best regards,
Yuan
