Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/07 07:28:08 UTC

[GitHub] [flink] PatrickRen opened a new pull request, #20196: [FLINK-28417][table] Add interfaces for LookupCache and default implementation

PatrickRen opened a new pull request, #20196:
URL: https://github.com/apache/flink/pull/20196

   ## What is the purpose of the change
   
   This pull request introduces interfaces for `LookupCache` and `CacheMetricGroup`, as well as a default implementation of `LookupCache`.
   
   
   ## Brief change log
   
   - Add new interfaces `LookupCache` and `CacheMetricGroup`
   - Add default implementation for `LookupCache` as `DefaultLookupCache` (see the usage sketch after this list)
   - Add an internal implementation for `CacheMetricGroup` as `InternalCacheMetricGroup`
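
   A rough usage sketch of the new API from a connector's perspective (a sketch only: the builder method names are assumed to mirror the cache configuration fields and may differ from the final API):

   ```java
   import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
   import org.apache.flink.table.connector.source.lookup.cache.LookupCache;

   import java.time.Duration;

   class LookupCacheUsageExample {

       // Builds a partial cache with eviction limits; the runtime calls
       // cache.open(cacheMetricGroup) before get/put are used.
       static LookupCache createCache() {
           return DefaultLookupCache.newBuilder()
                   .expireAfterWrite(Duration.ofMinutes(10))
                   .maximumSize(1000L)
                   .cacheMissingKey(true)
                   .build();
       }
   }
   ```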
   
   
   ## Verifying this change
   
   This change adds unit tests for the newly added `DefaultLookupCache` and `InternalCacheMetricGroup`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)
   




[GitHub] [flink] wuchong commented on a diff in pull request #20196: [FLINK-28417][table] Add interfaces for LookupCache and default implementation

Posted by GitBox <gi...@apache.org>.
wuchong commented on code in PR #20196:
URL: https://github.com/apache/flink/pull/20196#discussion_r935210068


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/DefaultLookupCache.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.clock.Clock;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.PARTIAL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link LookupCache}. */
+@PublicEvolving
+public class DefaultLookupCache implements LookupCache {
+    private static final long serialVersionUID = 1L;
+
+    // Configurations of the cache
+    private final Duration expireAfterAccessDuration;
+    private final Duration expireAfterWriteDuration;
+    private final Long maximumSize;
+    private final boolean cacheMissingKey;
+
+    // The underlying Guava cache implementation
+    private transient Cache<RowData, Collection<RowData>> guavaCache;
+
+    // Guava Ticker for testing expiration
+    private transient Ticker ticker;
+
+    // For tracking cache metrics
+    private final transient Counter hitCounter;
+    private final transient Counter missCounter;
+
+    private DefaultLookupCache(
+            Duration expireAfterAccessDuration,
+            Duration expireAfterWriteDuration,
+            Long maximumSize,
+            boolean cacheMissingKey) {
+        this.expireAfterAccessDuration = expireAfterAccessDuration;
+        this.expireAfterWriteDuration = expireAfterWriteDuration;
+        this.maximumSize = maximumSize;
+        this.cacheMissingKey = cacheMissingKey;
+        sanityCheck();
+        hitCounter = new ThreadSafeSimpleCounter();
+        missCounter = new ThreadSafeSimpleCounter();

Review Comment:
   I think this is problematic. When the cache is shipped to the cluster for execution, `hitCounter` and `missCounter` will be null, so `get` and `put` will, of course, run into an NPE.
   
   I think we can turn `testCacheSerialization` into a test setup (before) method. Every test should run on a deserialized cache object.
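
   A minimal sketch of what such a setup could look like, assuming JUnit 5 and Flink's `InstantiationUtil` for the serialization round trip (the test class name and builder settings below are illustrative, not taken from the PR):

   ```java
   import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
   import org.apache.flink.util.InstantiationUtil;

   import org.junit.jupiter.api.BeforeEach;

   class DefaultLookupCacheTestSketch {

       // Cache under test, re-created from a serialized copy before every test
       private DefaultLookupCache cache;

       @BeforeEach
       void setupDeserializedCache() throws Exception {
           DefaultLookupCache original =
                   DefaultLookupCache.newBuilder().maximumSize(10L).build();
           // Serialize and deserialize so that every test exercises the cache
           // exactly as it would arrive on the cluster
           byte[] bytes = InstantiationUtil.serializeObject(original);
           cache = InstantiationUtil.deserializeObject(bytes, getClass().getClassLoader());
       }
   }
   ```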





[GitHub] [flink] PatrickRen commented on pull request #20196: [FLINK-28417][table] Add interfaces for LookupCache and default implementation

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on PR #20196:
URL: https://github.com/apache/flink/pull/20196#issuecomment-1198946067

   @wuchong Thanks for the review! I addressed your comments in the latest commit. Please take a look when you are available.




[GitHub] [flink] PatrickRen commented on a diff in pull request #20196: [FLINK-28417][table] Add interfaces for LookupCache and default implementation

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on code in PR #20196:
URL: https://github.com/apache/flink/pull/20196#discussion_r935172164


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/LookupOptions.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+
+/** Predefined options for lookup cache. */
+public class LookupOptions {
+    public static final ConfigOption<LookupCacheType> CACHE_TYPE =
+            ConfigOptions.key("lookup.cache")
+                    .enumType(LookupCacheType.class)
+                    .defaultValue(LookupCacheType.NONE)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The caching strategy for this lookup table, including %s, %s and %s",
+                                            code(LookupCacheType.NONE.toString()),
+                                            code(LookupCacheType.PARTIAL.toString()),
+                                            code(LookupCacheType.FULL.toString()))
+                                    .build());
+
+    public static final ConfigOption<Integer> MAX_RETRIES =

Review Comment:
   Well, we do plan to migrate HBase and JDBC to the new framework, so I prefer to keep this option.





[GitHub] [flink] flinkbot commented on pull request #20196: [FLINK-28417][table] Add interfaces for LookupCache and default implementation

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20196:
URL: https://github.com/apache/flink/pull/20196#issuecomment-1177205928

   ## CI report:
   
   * dd5d10df1abeba770807ea291f5c96901ac7fe55 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] PatrickRen commented on pull request #20196: [FLINK-28417][table] Add interfaces for LookupCache and default implementation

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on PR #20196:
URL: https://github.com/apache/flink/pull/20196#issuecomment-1202103877

   @wuchong @SmirAlex @lincoln-lil Thank you all for the review! I updated the PR to address your comments. Please take another look at your convenience. 




[GitHub] [flink] SmirAlex commented on a diff in pull request #20196: [FLINK-28417][table] Add interfaces for LookupCache and default implementation

Posted by GitBox <gi...@apache.org>.
SmirAlex commented on code in PR #20196:
URL: https://github.com/apache/flink/pull/20196#discussion_r933765687


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/DefaultLookupCache.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.clock.Clock;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.PARTIAL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link LookupCache}. */
+@PublicEvolving
+public class DefaultLookupCache implements LookupCache {
+    private static final long serialVersionUID = 6839408984141411172L;
+
+    // Configurations of the cache
+    private final Duration expireAfterAccessDuration;
+    private final Duration expireAfterWriteDuration;
+    private final Long maximumSize;
+    private final boolean cacheMissingKey;
+
+    // The underlying Guava cache implementation
+    private transient Cache<RowData, Collection<RowData>> guavaCache;
+
+    // Guava Ticker for testing expiration
+    private transient Ticker ticker;
+
+    // For tracking cache metrics
+    private transient Counter hitCounter;
+    private transient Counter missCounter;
+
+    private DefaultLookupCache(
+            Duration expireAfterAccessDuration,
+            Duration expireAfterWriteDuration,
+            Long maximumSize,
+            boolean cacheMissingKey) {
+        this.expireAfterAccessDuration = expireAfterAccessDuration;
+        this.expireAfterWriteDuration = expireAfterWriteDuration;
+        this.maximumSize = maximumSize;
+        this.cacheMissingKey = cacheMissingKey;
+    }
+
+    /** Creates a builder for the cache. */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static DefaultLookupCache fromConfig(ReadableConfig config) {
+        checkArgument(
+                config.get(CACHE_TYPE).equals(PARTIAL),
+                "'%s' should be '%s' in order to build a default lookup cache",
+                CACHE_TYPE.key(),
+                PARTIAL);
+        return new DefaultLookupCache(
+                config.get(PARTIAL_CACHE_EXPIRE_AFTER_ACCESS),
+                config.get(PARTIAL_CACHE_EXPIRE_AFTER_WRITE),
+                config.get(PARTIAL_CACHE_MAX_ROWS),
+                config.get(PARTIAL_CACHE_CACHE_MISSING_KEY));
+    }
+
+    @Override
+    public void open(CacheMetricGroup metricGroup) {
+        synchronized (this) {
+            // The cache should already been opened if guava cache is not null
+            if (guavaCache != null) {
+                return;
+            }
+            // Initialize Guava cache
+            CacheBuilder<Object, Object> guavaCacheBuilder = CacheBuilder.newBuilder();
+            if (expireAfterAccessDuration != null) {
+                guavaCacheBuilder.expireAfterAccess(expireAfterAccessDuration);
+            }
+            if (expireAfterWriteDuration != null) {
+                guavaCacheBuilder.expireAfterWrite(expireAfterWriteDuration);
+            }
+            if (maximumSize != null) {
+                guavaCacheBuilder.maximumSize(maximumSize);
+            }
+            if (ticker != null) {
+                guavaCacheBuilder.ticker(ticker);
+            }
+            guavaCache = guavaCacheBuilder.build();
+
+            // Initialize and register metrics
+            // Here we can't reuse Guava cache statistics because guavaCache#getIfPresent is not
+            // counted in the stat
+            hitCounter = new ThreadSafeSimpleCounter();
+            missCounter = new ThreadSafeSimpleCounter();
+            metricGroup.hitCounter(hitCounter);
+            metricGroup.missCounter(missCounter);
+            metricGroup.numCachedRecordsGauge(() -> guavaCache.size());

Review Comment:
   You can simplify this to `guavaCache::size`.
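
   For example, applying it to the line above:

   ```java
   metricGroup.numCachedRecordsGauge(guavaCache::size);
   ```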





[GitHub] [flink] PatrickRen merged pull request #20196: [FLINK-28417][table] Add interfaces for LookupCache and default implementation

Posted by GitBox <gi...@apache.org>.
PatrickRen merged PR #20196:
URL: https://github.com/apache/flink/pull/20196




[GitHub] [flink] wuchong commented on a diff in pull request #20196: [FLINK-28417][table] Add interfaces for LookupCache and default implementation

Posted by GitBox <gi...@apache.org>.
wuchong commented on code in PR #20196:
URL: https://github.com/apache/flink/pull/20196#discussion_r935204581


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/DefaultLookupCache.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.clock.Clock;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.PARTIAL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link LookupCache}. */
+@PublicEvolving
+public class DefaultLookupCache implements LookupCache {
+    private static final long serialVersionUID = 1L;
+
+    // Configurations of the cache
+    private final Duration expireAfterAccessDuration;
+    private final Duration expireAfterWriteDuration;
+    private final Long maximumSize;
+    private final boolean cacheMissingKey;
+
+    // The underlying Guava cache implementation
+    private transient Cache<RowData, Collection<RowData>> guavaCache;
+
+    // Guava Ticker for testing expiration
+    private transient Ticker ticker;
+
+    // For tracking cache metrics
+    private final transient Counter hitCounter;
+    private final transient Counter missCounter;
+
+    private DefaultLookupCache(
+            Duration expireAfterAccessDuration,
+            Duration expireAfterWriteDuration,
+            Long maximumSize,
+            boolean cacheMissingKey) {
+        this.expireAfterAccessDuration = expireAfterAccessDuration;
+        this.expireAfterWriteDuration = expireAfterWriteDuration;
+        this.maximumSize = maximumSize;
+        this.cacheMissingKey = cacheMissingKey;
+        sanityCheck();

Review Comment:
   I'm fine with adding the sanity check in the constructor. But it would be better to add checks in `DefaultLookupCache#fromConfig` as well to indicate to users **which configuration option is missing**. I'm afraid the current exception is not actionable for users; it can't help them fix their pipelines.
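
   A rough sketch of what such a check in `fromConfig` could look like (illustrative only; the exact message wording and which options are considered required are up to the PR):

   ```java
   public static DefaultLookupCache fromConfig(ReadableConfig config) {
       checkArgument(
               config.get(CACHE_TYPE).equals(PARTIAL),
               "'%s' should be '%s' in order to build a default lookup cache",
               CACHE_TYPE.key(),
               PARTIAL);
       // Fail fast and tell the user which option is missing, instead of the
       // generic exception thrown later by the constructor's sanity check
       checkArgument(
               config.getOptional(PARTIAL_CACHE_EXPIRE_AFTER_ACCESS).isPresent()
                       || config.getOptional(PARTIAL_CACHE_EXPIRE_AFTER_WRITE).isPresent()
                       || config.getOptional(PARTIAL_CACHE_MAX_ROWS).isPresent(),
               "Missing '%s', '%s' or '%s'. At least one of them should be "
                       + "configured to avoid an unbounded cache.",
               PARTIAL_CACHE_EXPIRE_AFTER_ACCESS.key(),
               PARTIAL_CACHE_EXPIRE_AFTER_WRITE.key(),
               PARTIAL_CACHE_MAX_ROWS.key());
       return new DefaultLookupCache(
               config.get(PARTIAL_CACHE_EXPIRE_AFTER_ACCESS),
               config.get(PARTIAL_CACHE_EXPIRE_AFTER_WRITE),
               config.get(PARTIAL_CACHE_MAX_ROWS),
               config.get(PARTIAL_CACHE_CACHE_MISSING_KEY));
   }
   ```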



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/DefaultLookupCache.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.clock.Clock;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.PARTIAL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link LookupCache}. */
+@PublicEvolving
+public class DefaultLookupCache implements LookupCache {
+    private static final long serialVersionUID = 1L;
+
+    // Configurations of the cache
+    private final Duration expireAfterAccessDuration;
+    private final Duration expireAfterWriteDuration;
+    private final Long maximumSize;
+    private final boolean cacheMissingKey;
+
+    // The underlying Guava cache implementation
+    private transient Cache<RowData, Collection<RowData>> guavaCache;
+
+    // Guava Ticker for testing expiration
+    private transient Ticker ticker;
+
+    // For tracking cache metrics
+    private final transient Counter hitCounter;
+    private final transient Counter missCounter;
+
+    private DefaultLookupCache(
+            Duration expireAfterAccessDuration,
+            Duration expireAfterWriteDuration,
+            Long maximumSize,
+            boolean cacheMissingKey) {
+        this.expireAfterAccessDuration = expireAfterAccessDuration;
+        this.expireAfterWriteDuration = expireAfterWriteDuration;
+        this.maximumSize = maximumSize;
+        this.cacheMissingKey = cacheMissingKey;
+        sanityCheck();
+        hitCounter = new ThreadSafeSimpleCounter();
+        missCounter = new ThreadSafeSimpleCounter();

Review Comment:
   I think this is problematic. When the cache is shipped to the cluster for execution, `hitCounter` and `missCounter` are null, so `get` and `put` will, of course, run into an NPE.
   
   I think we can turn `testCacheSerialization` into a test setup (before) method. Every test should run on a deserialized cache object.





[GitHub] [flink] wuchong commented on a diff in pull request #20196: [FLINK-28417][table] Add interfaces for LookupCache and default implementation

Posted by GitBox <gi...@apache.org>.
wuchong commented on code in PR #20196:
URL: https://github.com/apache/flink/pull/20196#discussion_r933207081


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/DefaultLookupCache.java:
##########
@@ -97,30 +97,36 @@ public static DefaultLookupCache fromConfig(ReadableConfig config) {
 
     @Override
     public void open(CacheMetricGroup metricGroup) {
-        // Initialize Guava cache
-        CacheBuilder<Object, Object> guavaCacheBuilder = CacheBuilder.newBuilder();
-        if (expireAfterAccessDuration != null) {
-            guavaCacheBuilder.expireAfterAccess(expireAfterAccessDuration);
-        }
-        if (expireAfterWriteDuration != null) {
-            guavaCacheBuilder.expireAfterWrite(expireAfterWriteDuration);
-        }
-        if (maximumSize != null) {
-            guavaCacheBuilder.maximumSize(maximumSize);
-        }
-        if (ticker != null) {
-            guavaCacheBuilder.ticker(ticker);
+        synchronized (this) {
+            // The cache should already been opened if guava cache is not null
+            if (guavaCache != null) {
+                return;

Review Comment:
   We may need to register the metrics with the given `metricGroup` again. Every subtask retrieves the shared cache and registers the cache metrics with its own operator metric group.
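
   A minimal sketch of that idea (a sketch only; `createGuavaCache()` is a hypothetical helper standing in for the existing cache-building logic):

   ```java
   @Override
   public void open(CacheMetricGroup metricGroup) {
       synchronized (this) {
           if (guavaCache == null) {
               // Build the shared Guava cache and counters only once
               guavaCache = createGuavaCache(); // hypothetical helper
               hitCounter = new ThreadSafeSimpleCounter();
               missCounter = new ThreadSafeSimpleCounter();
           }
           // Re-register the shared metrics on every open() call so that each
           // subtask's operator metric group reports the cache metrics
           metricGroup.hitCounter(hitCounter);
           metricGroup.missCounter(missCounter);
           metricGroup.numCachedRecordsGauge(() -> guavaCache.size());
       }
   }
   ```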



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/DefaultLookupCache.java:
##########
@@ -222,11 +248,23 @@ public Builder cacheMissingKey(boolean cacheMissingKey) {
 
         /** Creates the cache. */
         public DefaultLookupCache build() {
+            sanityCheck();
             return new DefaultLookupCache(
                     expireAfterAccessDuration,
                     expireAfterWriteDuration,
                     maximumSize,
                     cacheMissingKey);
         }
+
+        private void sanityCheck() {
+            if (expireAfterWriteDuration == null
+                    && expireAfterAccessDuration == null
+                    && maximumSize == null) {
+                throw new IllegalArgumentException(

Review Comment:
   Add a check for `org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache#fromConfig` as well?





[GitHub] [flink] wuchong commented on a diff in pull request #20196: [FLINK-28417][table] Add interfaces for LookupCache and default implementation

Posted by GitBox <gi...@apache.org>.
wuchong commented on code in PR #20196:
URL: https://github.com/apache/flink/pull/20196#discussion_r932396717


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/DefaultLookupCache.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.clock.Clock;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.PARTIAL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link LookupCache}. */
+@PublicEvolving
+public class DefaultLookupCache implements LookupCache {
+    private static final long serialVersionUID = 6839408984141411172L;
+
+    // Configurations of the cache
+    private final Duration expireAfterAccessDuration;
+    private final Duration expireAfterWriteDuration;
+    private final Long maximumSize;
+    private final boolean cacheMissingKey;
+
+    // The underlying Guava cache implementation
+    private transient Cache<RowData, Collection<RowData>> guavaCache;
+
+    // Guava Ticker for testing expiration
+    private Ticker ticker;
+
+    // For tracking cache metrics
+    private Counter hitCounter;
+    private Counter missCounter;
+
+    private DefaultLookupCache(
+            Duration expireAfterAccessDuration,
+            Duration expireAfterWriteDuration,
+            Long maximumSize,
+            boolean cacheMissingKey) {
+        this.expireAfterAccessDuration = expireAfterAccessDuration;
+        this.expireAfterWriteDuration = expireAfterWriteDuration;
+        this.maximumSize = maximumSize;
+        this.cacheMissingKey = cacheMissingKey;
+    }
+
+    /** Creates a builder for the cache. */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static DefaultLookupCache fromConfig(ReadableConfig config) {
+        checkArgument(
+                config.get(CACHE_TYPE).equals(PARTIAL),
+                "'%s' should be '%s' in order to build a default lookup cache",
+                CACHE_TYPE.key(),
+                PARTIAL);
+        return new DefaultLookupCache(
+                config.get(PARTIAL_CACHE_EXPIRE_AFTER_ACCESS),
+                config.get(PARTIAL_CACHE_EXPIRE_AFTER_WRITE),
+                config.get(PARTIAL_CACHE_MAX_ROWS),
+                config.get(PARTIAL_CACHE_CACHE_MISSING_KEY));
+    }
+
+    @Override
+    public void open(CacheMetricGroup metricGroup) {

Review Comment:
   The `open()` method can be called concurrently and multiple times, so the implementation should be thread-safe and must not initialize the cache again. Please add concurrent tests for `open()` and the other methods.
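
   A sketch of such a concurrency test, assuming JUnit 5; `createTestMetricGroup()` is a hypothetical helper standing in for however the tests obtain a `CacheMetricGroup`:

   ```java
   import org.apache.flink.metrics.groups.CacheMetricGroup;
   import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;

   import org.junit.jupiter.api.Test;

   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CountDownLatch;

   class DefaultLookupCacheConcurrencySketch {

       @Test
       void testConcurrentOpen() throws Exception {
           DefaultLookupCache cache =
                   DefaultLookupCache.newBuilder().maximumSize(10L).build();
           CacheMetricGroup metricGroup = createTestMetricGroup();
           int numThreads = 16;
           CountDownLatch startLatch = new CountDownLatch(1);
           List<Thread> threads = new ArrayList<>();
           for (int i = 0; i < numThreads; i++) {
               Thread thread =
                       new Thread(
                               () -> {
                                   try {
                                       startLatch.await();
                                       // open() must tolerate concurrent and repeated calls
                                       cache.open(metricGroup);
                                   } catch (InterruptedException e) {
                                       Thread.currentThread().interrupt();
                                   }
                               });
               thread.start();
               threads.add(thread);
           }
           startLatch.countDown();
           for (Thread thread : threads) {
               thread.join();
           }
       }

       private CacheMetricGroup createTestMetricGroup() {
           // Hypothetical test fixture, e.g. an InternalCacheMetricGroup built on
           // an unregistered metric group; not shown here
           throw new UnsupportedOperationException("test fixture not shown here");
       }
   }
   ```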



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/DefaultLookupCache.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.clock.Clock;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.PARTIAL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link LookupCache}. */
+@PublicEvolving
+public class DefaultLookupCache implements LookupCache {
+    private static final long serialVersionUID = 6839408984141411172L;
+
+    // Configurations of the cache
+    private final Duration expireAfterAccessDuration;
+    private final Duration expireAfterWriteDuration;
+    private final Long maximumSize;
+    private final boolean cacheMissingKey;
+
+    // The underlying Guava cache implementation
+    private transient Cache<RowData, Collection<RowData>> guavaCache;
+
+    // Guava Ticker for testing expiration
+    private Ticker ticker;
+
+    // For tracking cache metrics
+    private Counter hitCounter;
+    private Counter missCounter;

Review Comment:
   These should be `transient`. Please add a serialization test for this.
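
   A possible shape for that serialization test, assuming JUnit 5, AssertJ and Flink's `InstantiationUtil` (the builder settings and the assertion are illustrative; builder method names are assumed to mirror the configuration fields):

   ```java
   import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
   import org.apache.flink.util.InstantiationUtil;

   import org.junit.jupiter.api.Test;

   import java.time.Duration;

   import static org.assertj.core.api.Assertions.assertThat;

   class DefaultLookupCacheSerializationSketch {

       @Test
       void testCacheSerialization() throws Exception {
           DefaultLookupCache cache =
                   DefaultLookupCache.newBuilder()
                           .expireAfterWrite(Duration.ofMinutes(5))
                           .maximumSize(100L)
                           .build();

           // Round-trip through Java serialization, as happens when the cache
           // is shipped from the client to the task managers
           byte[] serialized = InstantiationUtil.serializeObject(cache);
           DefaultLookupCache deserialized =
                   InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());

           assertThat(deserialized).isNotNull();
       }
   }
   ```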



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/DefaultLookupCache.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.clock.Clock;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.PARTIAL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link LookupCache}. */
+@PublicEvolving
+public class DefaultLookupCache implements LookupCache {
+    private static final long serialVersionUID = 6839408984141411172L;
+
+    // Configurations of the cache
+    private final Duration expireAfterAccessDuration;
+    private final Duration expireAfterWriteDuration;
+    private final Long maximumSize;
+    private final boolean cacheMissingKey;
+
+    // The underlying Guava cache implementation
+    private transient Cache<RowData, Collection<RowData>> guavaCache;
+
+    // Guava Ticker for testing expiration
+    private Ticker ticker;
+
+    // For tracking cache metrics
+    private Counter hitCounter;
+    private Counter missCounter;
+
+    private DefaultLookupCache(
+            Duration expireAfterAccessDuration,
+            Duration expireAfterWriteDuration,
+            Long maximumSize,
+            boolean cacheMissingKey) {
+        this.expireAfterAccessDuration = expireAfterAccessDuration;
+        this.expireAfterWriteDuration = expireAfterWriteDuration;
+        this.maximumSize = maximumSize;
+        this.cacheMissingKey = cacheMissingKey;
+    }
+
+    /** Creates a builder for the cache. */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static DefaultLookupCache fromConfig(ReadableConfig config) {
+        checkArgument(
+                config.get(CACHE_TYPE).equals(PARTIAL),
+                "'%s' should be '%s' in order to build a default lookup cache",
+                CACHE_TYPE.key(),
+                PARTIAL);
+        return new DefaultLookupCache(
+                config.get(PARTIAL_CACHE_EXPIRE_AFTER_ACCESS),
+                config.get(PARTIAL_CACHE_EXPIRE_AFTER_WRITE),
+                config.get(PARTIAL_CACHE_MAX_ROWS),
+                config.get(PARTIAL_CACHE_CACHE_MISSING_KEY));
+    }

Review Comment:
   It can easily lead to OOM if we don't set the TTL or maxRows. I would suggest validating that at least one of them has been set, or providing a default value.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/LookupOptions.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.time.Duration;
+
+/** Predefined options for lookup cache. */
+public class LookupOptions {
+    public static final ConfigOption<LookupCacheType> CACHE_TYPE =
+            ConfigOptions.key("lookup.cache")
+                    .enumType(LookupCacheType.class)
+                    .defaultValue(LookupCacheType.NONE)
+                    .withDescription("The caching strategy for this lookup table");

Review Comment:
   Add a description of the available values.





[GitHub] [flink] SmirAlex commented on a diff in pull request #20196: [FLINK-28417][table] Add interfaces for LookupCache and default implementation

Posted by GitBox <gi...@apache.org>.
SmirAlex commented on code in PR #20196:
URL: https://github.com/apache/flink/pull/20196#discussion_r933430045


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/LookupOptions.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+
+/** Predefined options for lookup cache. */
+public class LookupOptions {
+    public static final ConfigOption<LookupCacheType> CACHE_TYPE =
+            ConfigOptions.key("lookup.cache")
+                    .enumType(LookupCacheType.class)
+                    .defaultValue(LookupCacheType.NONE)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The caching strategy for this lookup table, including %s, %s and %s",
+                                            code(LookupCacheType.NONE.toString()),
+                                            code(LookupCacheType.PARTIAL.toString()),
+                                            code(LookupCacheType.FULL.toString()))
+                                    .build());
+
+    public static final ConfigOption<Integer> MAX_RETRIES =
+            ConfigOptions.key("lookup.max-retries")
+                    .intType()
+                    .defaultValue(0)

Review Comment:
   Currently, the default value in the HBase and JDBC connectors is 3. Maybe keep it as it was?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/LookupOptions.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+
+/** Predefined options for lookup cache. */

Review Comment:
   Maybe change 'lookup cache' to 'lookup table' (the 'max-retries' option is not related to caching)?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/DefaultLookupCache.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.clock.Clock;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.PARTIAL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link LookupCache}. */
+@PublicEvolving
+public class DefaultLookupCache implements LookupCache {
+    private static final long serialVersionUID = 6839408984141411172L;
+
+    // Configurations of the cache
+    private final Duration expireAfterAccessDuration;
+    private final Duration expireAfterWriteDuration;
+    private final Long maximumSize;
+    private final boolean cacheMissingKey;
+
+    // The underlying Guava cache implementation
+    private transient Cache<RowData, Collection<RowData>> guavaCache;
+
+    // Guava Ticker for testing expiration
+    private transient Ticker ticker;
+
+    // For tracking cache metrics
+    private transient Counter hitCounter;
+    private transient Counter missCounter;
+
+    private DefaultLookupCache(
+            Duration expireAfterAccessDuration,
+            Duration expireAfterWriteDuration,
+            Long maximumSize,
+            boolean cacheMissingKey) {
+        this.expireAfterAccessDuration = expireAfterAccessDuration;
+        this.expireAfterWriteDuration = expireAfterWriteDuration;
+        this.maximumSize = maximumSize;
+        this.cacheMissingKey = cacheMissingKey;
+    }
+
+    /** Creates a builder for the cache. */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static DefaultLookupCache fromConfig(ReadableConfig config) {
+        checkArgument(
+                config.get(CACHE_TYPE).equals(PARTIAL),
+                "'%s' should be '%s' in order to build a default lookup cache",
+                CACHE_TYPE.key(),
+                PARTIAL);
+        return new DefaultLookupCache(
+                config.get(PARTIAL_CACHE_EXPIRE_AFTER_ACCESS),
+                config.get(PARTIAL_CACHE_EXPIRE_AFTER_WRITE),
+                config.get(PARTIAL_CACHE_MAX_ROWS),
+                config.get(PARTIAL_CACHE_CACHE_MISSING_KEY));
+    }
+
+    @Override
+    public void open(CacheMetricGroup metricGroup) {
+        synchronized (this) {
+            // The cache should already been opened if guava cache is not null
+            if (guavaCache != null) {
+                return;
+            }
+            // Initialize Guava cache
+            CacheBuilder<Object, Object> guavaCacheBuilder = CacheBuilder.newBuilder();
+            if (expireAfterAccessDuration != null) {
+                guavaCacheBuilder.expireAfterAccess(expireAfterAccessDuration);
+            }
+            if (expireAfterWriteDuration != null) {
+                guavaCacheBuilder.expireAfterWrite(expireAfterWriteDuration);
+            }
+            if (maximumSize != null) {
+                guavaCacheBuilder.maximumSize(maximumSize);
+            }
+            if (ticker != null) {
+                guavaCacheBuilder.ticker(ticker);
+            }
+            guavaCache = guavaCacheBuilder.build();
+
+            // Initialize and register metrics
+            // Here we can't reuse Guava cache statistics because guavaCache#getIfPresent is not
+            // counted in the stat
+            hitCounter = new ThreadSafeSimpleCounter();
+            missCounter = new ThreadSafeSimpleCounter();
+            metricGroup.hitCounter(hitCounter);
+            metricGroup.missCounter(missCounter);
+            metricGroup.numCachedRecordsGauge(() -> guavaCache.size());
+        }
+    }
+
+    @Nullable
+    @Override
+    public Collection<RowData> getIfPresent(RowData key) {
+        Collection<RowData> value = guavaCache.getIfPresent(key);
+        if (value != null) {
+            hitCounter.inc();
+        } else {
+            missCounter.inc();
+        }
+        return value;
+    }
+
+    @Override
+    public Collection<RowData> put(RowData key, Collection<RowData> value) {
+        checkNotNull(key, "Cannot put an entry with null key into the cache");
+        checkNotNull(value, "Cannot put an entry with null value into the cache");
+        if (!value.isEmpty()) {

Review Comment:
   Maybe simplify this to `if (!value.isEmpty() || cacheMissingKey)`?
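
   For reference, a sketch of the whole method with the simplified condition (the part of the method body below the shown checks is not visible in this diff and is assumed to just store into the Guava cache and return the value):

   ```java
   @Override
   public Collection<RowData> put(RowData key, Collection<RowData> value) {
       checkNotNull(key, "Cannot put an entry with null key into the cache");
       checkNotNull(value, "Cannot put an entry with null value into the cache");
       // Non-empty lookup results are always cached; empty results (missing
       // keys) are cached only when cacheMissingKey is enabled
       if (!value.isEmpty() || cacheMissingKey) {
           guavaCache.put(key, value);
       }
       return value;
   }
   ```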





[GitHub] [flink] lincoln-lil commented on a diff in pull request #20196: [FLINK-28417][table] Add interfaces for LookupCache and default implementation

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #20196:
URL: https://github.com/apache/flink/pull/20196#discussion_r934122365


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/DefaultLookupCache.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.ThreadSafeSimpleCounter;
+import org.apache.flink.metrics.groups.CacheMetricGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.clock.Clock;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collection;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.PARTIAL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Default implementation of {@link LookupCache}. */
+@PublicEvolving
+public class DefaultLookupCache implements LookupCache {
+    private static final long serialVersionUID = 6839408984141411172L;

Review Comment:
   The UID for new classes should start at 1.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/LookupOptions.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+
+/** Predefined options for lookup cache. */
+public class LookupOptions {
+    public static final ConfigOption<LookupCacheType> CACHE_TYPE =
+            ConfigOptions.key("lookup.cache")
+                    .enumType(LookupCacheType.class)
+                    .defaultValue(LookupCacheType.NONE)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The caching strategy for this lookup table, including %s, %s and %s",
+                                            code(LookupCacheType.NONE.toString()),
+                                            code(LookupCacheType.PARTIAL.toString()),
+                                            code(LookupCacheType.FULL.toString()))
+                                    .build());
+
+    public static final ConfigOption<Integer> MAX_RETRIES =

Review Comment:
   It's better not to add this new option if we have no migration plan for the existing connectors (e.g., HBase & JDBC), and it is not related to caching.


