Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/27 12:03:50 UTC

[GitHub] [flink] SmirAlex opened a new pull request, #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers

SmirAlex opened a new pull request, #20382:
URL: https://github.com/apache/flink/pull/20382

   
   ## What is the purpose of the change
   
   This pull request introduces new interfaces that allow a full cache to be used as a `LookupFunctionProvider`. Interface list:
   
   - `FullCachingLookupProvider`
   - `CacheReloadTrigger`
   
   `CacheReloadTrigger` provides a customizable way to decide when a cache reload is triggered. There are two built-in triggers:
   
   - `PeriodicCacheReloadTrigger` - the cache is reloaded at a fixed interval, with no initial delay
   - `TimedCacheReloadTrigger` - the cache is reloaded at a specified time of day and then repeatedly at an interval that is a whole number of days
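The initial delay of the timed trigger can be illustrated with a small, self-contained sketch. It mirrors the logic visible in the `TimedCacheReloadTrigger` diff further down (`Duration.between(now, reloadTime)`, plus one day if the result is negative), but it only handles `LocalTime`, not `OffsetTime`. The class `InitialDelayExample` and its method are hypothetical and not part of Flink.

```java
import java.time.Duration;
import java.time.LocalTime;

/** Hypothetical helper illustrating how a timed trigger could compute its first delay. */
public class InitialDelayExample {

    /**
     * Returns how long to wait from {@code now} until the next occurrence of {@code reloadTime}.
     * If the reload time has already passed today, the next occurrence is tomorrow.
     */
    static Duration initialDelay(LocalTime now, LocalTime reloadTime) {
        Duration delay = Duration.between(now, reloadTime);
        if (delay.isNegative()) {
            // reloadTime is earlier in the day than now: shift to the same time tomorrow
            delay = delay.plusDays(1);
        }
        return delay;
    }

    public static void main(String[] args) {
        LocalTime reloadTime = LocalTime.of(2, 0);
        System.out.println(initialDelay(LocalTime.of(1, 30), reloadTime)); // PT30M
        System.out.println(initialDelay(LocalTime.of(3, 0), reloadTime)); // PT23H
    }
}
```

After this first delay, a timed trigger would repeat every `reloadIntervalInDays` days, for example via `scheduleAtFixedRate` with a period of `Duration.ofDays(n).toMillis()`.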
   
   The pull request also introduces common options for connectors that will support full caching lookup joins.
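To make the trigger contract concrete, here is a minimal JDK-only sketch under stated assumptions. `ReloadContext` and `ReloadTrigger` are hypothetical, simplified stand-ins that only mimic `CacheReloadTrigger#open(Context)` and `Context#triggerReload()` (assumed here to return a `CompletableFuture<Void>`, as the `.get()` call in the diff suggests). `FixedRateTrigger` is a hypothetical approximation of the fixed-rate mode of `PeriodicCacheReloadTrigger`, not the PR's implementation.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TriggerContractSketch {

    /** Simplified stand-in for CacheReloadTrigger.Context: exposes only the reload hook. */
    interface ReloadContext {
        CompletableFuture<Void> triggerReload();
    }

    /** Simplified stand-in for CacheReloadTrigger (not Serializable, no Flink annotations). */
    interface ReloadTrigger extends AutoCloseable {
        void open(ReloadContext context) throws Exception;
    }

    /** Reloads at a fixed rate, starting immediately (no initial delay). */
    static final class FixedRateTrigger implements ReloadTrigger {
        private final Duration interval;
        private ScheduledExecutorService executor;

        FixedRateTrigger(Duration interval) {
            if (interval.isNegative() || interval.isZero()) {
                throw new IllegalArgumentException("Reload interval must be positive: " + interval);
            }
            this.interval = interval;
        }

        @Override
        public void open(ReloadContext context) {
            executor = Executors.newSingleThreadScheduledExecutor();
            // initial delay 0: the first reload runs as soon as the trigger is opened
            executor.scheduleAtFixedRate(
                    context::triggerReload, 0, interval.toMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public void close() {
            // the executor only exists after open() has been called
            if (executor != null) {
                executor.shutdownNow();
            }
        }
    }
}
```

A trigger that should wait for each reload to finish would instead use `scheduleWithFixedDelay` and block on the returned future, which is what the `FIXED_DELAY` branch in the `PeriodicCacheReloadTrigger` diff below does.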
   
   
   ## Brief change log
   
   - Add interfaces FullCachingLookupProvider and CacheReloadTrigger
   - Implement reload triggers PeriodicCacheReloadTrigger and TimedCacheReloadTrigger
   - Add common SQL options for Full caching lookup join
   
   
   ## Verifying this change
   
   This change adds unit tests for the newly added `PeriodicCacheReloadTrigger` and `TimedCacheReloadTrigger`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (FLIP docs + JavaDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] PatrickRen commented on a diff in pull request #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on code in PR #20382:
URL: https://github.com/apache/flink/pull/20382#discussion_r931858162


##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/lookup/cache/trigger/PeriodicCacheReloadTriggerTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache.trigger;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.ScheduledTask;
+import org.apache.flink.table.connector.source.lookup.cache.trigger.PeriodicCacheReloadTrigger.ScheduleMode;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.Mockito;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.FULL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.PARTIAL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.PERIODIC;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.TIMED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit test for {@link PeriodicCacheReloadTrigger}. */
+class PeriodicCacheReloadTriggerTest {
+
+    private final ScheduleStrategyExecutorService scheduledExecutor =
+            new ScheduleStrategyExecutorService();
+
+    private final Runnable reloadTask = Mockito.mock(Runnable.class);

Review Comment:
   Unfortunately we are not allowed to use Mockito according to the [code style guide](https://flink.apache.org/contributing/code-style-and-quality-common.html#avoid-mockito---use-reusable-test-implementations). What about creating a fake Runnable here?
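Following the reviewer's suggestion, a reusable fake could replace the Mockito mock. This is a minimal sketch, assuming the test only needs to know how many times the reload task ran; `CountingRunnable` and `getInvocations()` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical test fake replacing Mockito.mock(Runnable.class): counts how often it runs. */
public class CountingRunnable implements Runnable {

    // AtomicInteger because run() may be called from the trigger's executor thread
    private final AtomicInteger invocations = new AtomicInteger();

    @Override
    public void run() {
        invocations.incrementAndGet();
    }

    /** Number of times run() has been called so far; safe to read from the test thread. */
    public int getInvocations() {
        return invocations.get();
    }
}
```

In the test, the field could then become `private final CountingRunnable reloadTask = new CountingRunnable();`, with assertions on `reloadTask.getInvocations()` instead of Mockito `verify` calls.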



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/trigger/CacheReloadTrigger.java:
##########
@@ -0,0 +1,51 @@
+package org.apache.flink.table.connector.source.lookup.cache.trigger;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.concurrent.CompletableFuture;
+
+/** Customized trigger for reloading lookup table entries. */
+@PublicEvolving
+public interface CacheReloadTrigger extends AutoCloseable, Serializable {
+
+    /** Open the trigger. */
+    void open(Context context) throws Exception;
+
+    /**
+     * Context of {@link CacheReloadTrigger} for getting information about times and triggering
+     * reload.
+     */
+    interface Context {

Review Comment:
   Add `@PublicEvolving` here.




[GitHub] [flink] PatrickRen commented on a diff in pull request #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on code in PR #20382:
URL: https://github.com/apache/flink/pull/20382#discussion_r937426239


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/FullCachingLookupProvider.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.flink.table.connector.source.lookup;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
+import org.apache.flink.table.functions.LookupFunction;
+
+/**
+ * A {@link LookupFunctionProvider} that never lookup in external system on cache miss and provides
+ * a cache for holding all entries in the external system. The cache will be fully reloaded from the
+ * external system by the {@link ScanTableSource.ScanRuntimeProvider} and reload operations will be
+ * triggered by the {@link CacheReloadTrigger}.
+ */
+@PublicEvolving
+public interface FullCachingLookupProvider extends LookupFunctionProvider {
+    static FullCachingLookupProvider of(

Review Comment:
   Missing JavaDoc for this public API method



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/trigger/PeriodicCacheReloadTrigger.java:
##########
@@ -0,0 +1,135 @@
+package org.apache.flink.table.connector.source.lookup.cache.trigger;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.FULL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.PERIODIC;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A trigger that reloads cache entries periodically with specified interval and {@link
+ * ScheduleMode}. If {@code reloadInterval} is zero, cache loading will happen just one time.

Review Comment:
   I think this JavaDoc should be updated as the argument check in the constructor has disabled the possibility of using zero as the reload interval.




[GitHub] [flink] PatrickRen commented on a diff in pull request #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on code in PR #20382:
URL: https://github.com/apache/flink/pull/20382#discussion_r936217253


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/trigger/PeriodicCacheReloadTrigger.java:
##########
@@ -0,0 +1,131 @@
+package org.apache.flink.table.connector.source.lookup.cache.trigger;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.FULL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.PERIODIC;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A trigger that reloads cache entries periodically with specified interval and {@link
+ * ScheduleMode}. If {@code reloadInterval} is zero, cache loading will happen just one time.
+ */
+public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {
+    private static final long serialVersionUID = 3828732577291369913L;

Review Comment:
   The serial version UID should start from 1 for new classes



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/trigger/PeriodicCacheReloadTrigger.java:
##########
@@ -0,0 +1,131 @@
+package org.apache.flink.table.connector.source.lookup.cache.trigger;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.FULL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.PERIODIC;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A trigger that reloads cache entries periodically with specified interval and {@link
+ * ScheduleMode}. If {@code reloadInterval} is zero, cache loading will happen just one time.
+ */
+public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {
+    private static final long serialVersionUID = 3828732577291369913L;
+
+    private final Duration reloadInterval;
+    private final ScheduleMode scheduleMode;
+
+    private transient ScheduledExecutorService scheduledExecutor;
+
+    public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
+        checkArgument(!reloadInterval.isNegative(), "Reload interval can't be negative.");
+        this.reloadInterval = reloadInterval;
+        this.scheduleMode = scheduleMode;
+    }
+
+    @VisibleForTesting
+    PeriodicCacheReloadTrigger(
+            Duration reloadInterval,
+            ScheduleMode scheduleMode,
+            ScheduledExecutorService scheduledExecutor) {
+        this(reloadInterval, scheduleMode);
+        this.scheduledExecutor = scheduledExecutor;
+    }
+
+    @Override
+    public void open(CacheReloadTrigger.Context context) {
+        if (scheduledExecutor == null) {
+            scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+        }
+        if (reloadInterval.isZero()) {
+            scheduledExecutor.execute(context::triggerReload);
+            return;
+        }
+        switch (scheduleMode) {
+            case FIXED_RATE:
+                scheduledExecutor.scheduleAtFixedRate(
+                        context::triggerReload,
+                        0,
+                        reloadInterval.toMillis(),
+                        TimeUnit.MILLISECONDS);
+                break;
+            case FIXED_DELAY:
+                scheduledExecutor.scheduleWithFixedDelay(
+                        () -> {
+                            try {
+                                context.triggerReload().get();
+                            } catch (Exception e) {
+                                throw new RuntimeException(
+                                        "Uncaught exception during the reload", e);
+                            }
+                        },
+                        0,
+                        reloadInterval.toMillis(),
+                        TimeUnit.MILLISECONDS);
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unrecognized schedule mode \"%s\"", scheduleMode));
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        scheduledExecutor.shutdownNow();

Review Comment:
   What about wrapping this call in an `if (scheduledExecutor != null)` check?
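The suggested guard matters because, in the diff above, the executor is a `transient` field that is only created in `open()` (or injected by the test-only constructor), so `close()` can run while it is still `null`, for example when a trigger is closed without ever having been opened. A minimal JDK-only sketch of the pattern; `NullSafeCloseExample` and `isShutdown()` are hypothetical and not the PR's code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/** Hypothetical trigger skeleton whose close() is safe even if open() was never called. */
public class NullSafeCloseExample implements AutoCloseable {

    private ScheduledExecutorService scheduledExecutor;

    public void open() {
        // lazily create the executor, as the triggers in the PR do
        if (scheduledExecutor == null) {
            scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        }
    }

    @Override
    public void close() {
        // open() may never have run, so the executor can still be null here
        if (scheduledExecutor != null) {
            scheduledExecutor.shutdownNow();
        }
    }

    /** Test helper: true once an executor exists and has been shut down. */
    boolean isShutdown() {
        return scheduledExecutor != null && scheduledExecutor.isShutdown();
    }
}
```

The same guard applies to `TimedCacheReloadTrigger#close()`, which has the identical unguarded `shutdownNow()` call.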



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/trigger/TimedCacheReloadTrigger.java:
##########
@@ -0,0 +1,141 @@
+package org.apache.flink.table.connector.source.lookup.cache.trigger;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.LocalTime;
+import java.time.OffsetTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.Temporal;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.FULL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.TIMED;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A trigger that reloads at a specific time and repeats for the given interval in days. */
+public class TimedCacheReloadTrigger implements CacheReloadTrigger {
+    private static final long serialVersionUID = -8565574480311775185L;

Review Comment:
   The serial version UID should start from 1 for new classes



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/trigger/TimedCacheReloadTrigger.java:
##########
@@ -0,0 +1,141 @@
+package org.apache.flink.table.connector.source.lookup.cache.trigger;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.LocalTime;
+import java.time.OffsetTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.Temporal;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.FULL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.TIMED;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A trigger that reloads at a specific time and repeats for the given interval in days. */
+public class TimedCacheReloadTrigger implements CacheReloadTrigger {
+    private static final long serialVersionUID = -8565574480311775185L;
+
+    private final Temporal reloadTime;
+    private final int reloadIntervalInDays;
+
+    private transient ScheduledExecutorService scheduledExecutor;
+    private transient Clock clock; // clock for testing purposes
+
+    public TimedCacheReloadTrigger(OffsetTime reloadTime, int reloadIntervalInDays) {
+        this((Temporal) reloadTime, reloadIntervalInDays);
+    }
+
+    public TimedCacheReloadTrigger(LocalTime reloadTime, int reloadIntervalInDays) {
+        this((Temporal) reloadTime, reloadIntervalInDays);
+    }
+
+    private TimedCacheReloadTrigger(Temporal reloadTime, int reloadIntervalInDays) {
+        checkArgument(
+                reloadIntervalInDays > 0,
+                "Reload interval for Timed cache reload trigger must be at least 1 day.");
+        this.reloadTime = reloadTime;
+        this.reloadIntervalInDays = reloadIntervalInDays;
+    }
+
+    @VisibleForTesting
+    TimedCacheReloadTrigger(
+            Temporal reloadTime,
+            int reloadIntervalInDays,
+            ScheduledExecutorService scheduledExecutor,
+            Clock clock) {
+        this(reloadTime, reloadIntervalInDays);
+        this.scheduledExecutor = scheduledExecutor;
+        this.clock = clock;
+    }
+
+    @Override
+    public void open(Context context) {
+        if (scheduledExecutor == null) {
+            scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+        }
+        if (clock == null) {
+            clock =
+                    reloadTime instanceof LocalTime
+                            ? Clock.systemDefaultZone()
+                            : Clock.system(((OffsetTime) reloadTime).getOffset());
+        }
+        Temporal now =
+                reloadTime instanceof LocalTime ? LocalTime.now(clock) : OffsetTime.now(clock);
+
+        Duration initialDelayDuration = Duration.between(now, reloadTime);
+        if (initialDelayDuration.isNegative()) {
+            // in case when reloadTime is less than current time, reload will happen next day
+            initialDelayDuration = initialDelayDuration.plus(1, ChronoUnit.DAYS);
+        }
+        scheduledExecutor.execute(context::triggerReload); // trigger first load operation
+        scheduledExecutor.scheduleAtFixedRate(
+                context::triggerReload,
+                initialDelayDuration.toMillis(),
+                Duration.ofDays(reloadIntervalInDays).toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() throws Exception {
+        scheduledExecutor.shutdownNow();

Review Comment:
   What about wrapping this call in an `if (scheduledExecutor != null)` check?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/LookupOptions.java:
##########
@@ -0,0 +1,93 @@
+package org.apache.flink.table.connector.source.lookup;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
+import org.apache.flink.table.connector.source.lookup.cache.trigger.PeriodicCacheReloadTrigger;
+
+import java.time.Duration;
+
+/** Predefined options for lookup join. */
+public class LookupOptions {
+    public static final ConfigOption<LookupCacheType> CACHE_TYPE =
+            ConfigOptions.key("lookup.cache")
+                    .enumType(LookupCacheType.class)
+                    .defaultValue(LookupCacheType.NONE)
+                    .withDescription("The caching strategy for this lookup table");
+
+    public static final ConfigOption<ReloadStrategy> FULL_CACHE_RELOAD_STRATEGY =
+            ConfigOptions.key("lookup.full-cache.reload-strategy")
+                    .enumType(ReloadStrategy.class)
+                    .defaultValue(ReloadStrategy.PERIODIC)
+                    .withDescription(
+                            "Defines which strategy to use to reload full cache: "
+                                    + "PERIODIC - cache is reloaded with fixed intervals without initial delay; "
+                                    + "TIMED - cache is reloaded at specified time with fixed intervals multiples of one day.");
+
+    public static final ConfigOption<Duration> FULL_CACHE_PERIODIC_RELOAD_INTERVAL =
+            ConfigOptions.key("lookup.full-cache.periodic-reload.interval")
+                    .durationType()
+                    .defaultValue(Duration.ZERO)

Review Comment:
   I'm not sure it is a good idea to have a default value of 0 here. If users intend to reload the cache periodically, the interval should be set explicitly in the configuration to fully express the reloading strategy. Maybe we could leave it without a default value and add a check in `PeriodicCacheReloadTrigger`. WDYT?
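
A minimal sketch of the check proposed above, assuming the option is declared without a default (in Flink, via `.durationType().noDefaultValue()`) so that an unset interval arrives as an empty `Optional`. `ReloadIntervalValidator` is a hypothetical helper, not the code that was merged. Note that `ScheduledExecutorService#scheduleAtFixedRate` rejects a period of zero with an `IllegalArgumentException`, which is one concrete reason a `Duration.ZERO` default is problematic.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical validation for a periodic reload interval that has no default value. */
final class ReloadIntervalValidator {
    private ReloadIntervalValidator() {}

    static Duration requirePositiveInterval(Optional<Duration> configured) {
        // Absent: the user chose PERIODIC reloading without saying how often.
        Duration interval =
                configured.orElseThrow(
                        () ->
                                new IllegalArgumentException(
                                        "Missing 'lookup.full-cache.periodic-reload.interval' "
                                                + "for the PERIODIC reload strategy"));
        // Zero or negative periods would fail later inside scheduleAtFixedRate;
        // failing here produces a clearer configuration error.
        if (interval.isZero() || interval.isNegative()) {
            throw new IllegalArgumentException(
                    "'lookup.full-cache.periodic-reload.interval' must be positive, but was "
                            + interval);
        }
        return interval;
    }
}
```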



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] PatrickRen commented on pull request #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on PR #20382:
URL: https://github.com/apache/flink/pull/20382#issuecomment-1205370351

   Merged on master 5405239dec0884dff746129c73955c90f455c465


[GitHub] [flink] SmirAlex commented on pull request #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers

Posted by GitBox <gi...@apache.org>.
SmirAlex commented on PR #20382:
URL: https://github.com/apache/flink/pull/20382#issuecomment-1199279288

   @PatrickRen Thanks for the review! I've made fixes according to your comments. Please take a look at the latest commit when you have time.


[GitHub] [flink] SmirAlex commented on a diff in pull request #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers

Posted by GitBox <gi...@apache.org>.
SmirAlex commented on code in PR #20382:
URL: https://github.com/apache/flink/pull/20382#discussion_r933252511


##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/lookup/cache/trigger/PeriodicCacheReloadTriggerTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache.trigger;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.ScheduledTask;
+import org.apache.flink.table.connector.source.lookup.cache.trigger.PeriodicCacheReloadTrigger.ScheduleMode;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.Mockito;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.FULL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.PARTIAL;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.PERIODIC;
+import static org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.TIMED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit test for {@link PeriodicCacheReloadTrigger}. */
+class PeriodicCacheReloadTriggerTest {
+
+    private final ScheduleStrategyExecutorService scheduledExecutor =
+            new ScheduleStrategyExecutorService();
+
+    private final Runnable reloadTask = Mockito.mock(Runnable.class);

Review Comment:
   Yeah, I thought this small mock would be OK, because we don't need any implementation of this Runnable beyond counting calls to one method. Anyway, I've fixed that, no problem.
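
A minimal sketch of a hand-written replacement for the `Mockito.mock(Runnable.class)` field, assuming the tests only need to know how many times the reload task ran. Flink's code style guide generally prefers reusable test implementations over Mockito. `CountingReloadTask` is a hypothetical name and not necessarily what the PR ended up using.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical test double that counts how many times a reload was triggered. */
final class CountingReloadTask implements Runnable {
    // AtomicInteger because the trigger may invoke the task from an executor thread.
    private final AtomicInteger calls = new AtomicInteger();

    @Override
    public void run() {
        calls.incrementAndGet();
    }

    int getCallCount() {
        return calls.get();
    }
}
```

In the test, an assertion such as `assertThat(reloadTask.getCallCount()).isEqualTo(1)` would then replace a Mockito `verify(...)` call.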



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/trigger/CacheReloadTrigger.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache.trigger;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.concurrent.CompletableFuture;
+
+/** Customized trigger for reloading lookup table entries. */
+@PublicEvolving
+public interface CacheReloadTrigger extends AutoCloseable, Serializable {
+
+    /** Open the trigger. */
+    void open(Context context) throws Exception;
+
+    /**
+     * Context of {@link CacheReloadTrigger} for getting information about times and triggering
+     * reload.
+     */
+    interface Context {

Review Comment:
   Fixed
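
The `Context` above is described as providing time information as well as a way to trigger a reload. As an illustration of how a time-based (TIMED) trigger might use such information, here is a minimal sketch that computes the delay until the next occurrence of a wall-clock reload time. The `Context` methods are not shown in this excerpt, so a `java.time.Clock` stands in for them. `TimedReloadDelay` is hypothetical.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.LocalTime;
import java.time.ZonedDateTime;

/** Hypothetical helper: delay from "now" until the next occurrence of a daily reload time. */
final class TimedReloadDelay {
    private TimedReloadDelay() {}

    static Duration untilNext(LocalTime reloadTime, Clock clock) {
        ZonedDateTime now = ZonedDateTime.now(clock);
        // Same date and zone as "now", with the wall-clock time replaced.
        ZonedDateTime next = now.with(reloadTime);
        // If today's reload time has already passed (or is exactly now), use tomorrow's.
        if (!next.isAfter(now)) {
            next = next.plusDays(1);
        }
        return Duration.between(now, next);
    }
}
```

In the trigger from the first diff, a value like this would play the role of `initialDelayDuration`, with `Duration.ofDays(reloadIntervalInDays)` as the period.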



[GitHub] [flink] SmirAlex commented on a diff in pull request #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers

Posted by GitBox <gi...@apache.org>.
SmirAlex commented on code in PR #20382:
URL: https://github.com/apache/flink/pull/20382#discussion_r936269235


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/LookupOptions.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
+import org.apache.flink.table.connector.source.lookup.cache.trigger.PeriodicCacheReloadTrigger;
+
+import java.time.Duration;
+
+/** Predefined options for lookup join. */
+public class LookupOptions {
+    public static final ConfigOption<LookupCacheType> CACHE_TYPE =
+            ConfigOptions.key("lookup.cache")
+                    .enumType(LookupCacheType.class)
+                    .defaultValue(LookupCacheType.NONE)
+                    .withDescription("The caching strategy for this lookup table");
+
+    public static final ConfigOption<ReloadStrategy> FULL_CACHE_RELOAD_STRATEGY =
+            ConfigOptions.key("lookup.full-cache.reload-strategy")
+                    .enumType(ReloadStrategy.class)
+                    .defaultValue(ReloadStrategy.PERIODIC)
+                    .withDescription(
+                            "Defines which strategy to use to reload full cache: "
+                                    + "PERIODIC - cache is reloaded with fixed intervals without initial delay; "
+                                    + "TIMED - cache is reloaded at specified time with fixed intervals multiples of one day.");
+
+    public static final ConfigOption<Duration> FULL_CACHE_PERIODIC_RELOAD_INTERVAL =
+            ConfigOptions.key("lookup.full-cache.periodic-reload.interval")
+                    .durationType()
+                    .defaultValue(Duration.ZERO)

Review Comment:
   Yes, I agree. If a partial cache can't be built without specifying its options, the same should apply to the full cache.



[GitHub] [flink] SmirAlex commented on pull request #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers

Posted by GitBox <gi...@apache.org>.
SmirAlex commented on PR #20382:
URL: https://github.com/apache/flink/pull/20382#issuecomment-1204875994

   @PatrickRen Thanks for the review! I've addressed the last comments about the Javadoc.


Re: [PR] [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers [flink]

Posted by "xiaolan-bit (via GitHub)" <gi...@apache.org>.
xiaolan-bit commented on PR #20382:
URL: https://github.com/apache/flink/pull/20382#issuecomment-1846896369

   I'd like to ask about a scenario where Flink writes into Hudi. Because the SQL pipeline is complex, I want to put the whole process into a single SQL file. However, because of the latency of the insert operation, I need to delay the following steps from within Flink SQL. Is there a statement that implements such a delay, for example, one that executes the next SQL statement only after 5 seconds?


[GitHub] [flink] flinkbot commented on pull request #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20382:
URL: https://github.com/apache/flink/pull/20382#issuecomment-1196645761

   ## CI report:
   
   * 2eb7354fc2e2966fedfc9e5a186329f7a65fd3a0 UNKNOWN
   
   Bot commands:
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build


[GitHub] [flink] PatrickRen commented on pull request #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers

Posted by GitBox <gi...@apache.org>.
PatrickRen commented on PR #20382:
URL: https://github.com/apache/flink/pull/20382#issuecomment-1203535088

   @SmirAlex Yeah, we have to wait for a green CI run. It should be running now, so let's wait a while.


[GitHub] [flink] SmirAlex commented on pull request #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers

Posted by GitBox <gi...@apache.org>.
SmirAlex commented on PR #20382:
URL: https://github.com/apache/flink/pull/20382#issuecomment-1204717241

   @PatrickRen There were multiple failed pipelines, but they failed because of unrelated issues. The latest pipeline was successful: https://dev.azure.com/asmirnov1/Flink/_build/results?buildId=17&view=results


[GitHub] [flink] SmirAlex commented on pull request #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers

Posted by GitBox <gi...@apache.org>.
SmirAlex commented on PR #20382:
URL: https://github.com/apache/flink/pull/20382#issuecomment-1204035787

   @flinkbot run azure


[GitHub] [flink] PatrickRen closed pull request #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers

Posted by GitBox <gi...@apache.org>.
PatrickRen closed pull request #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers
URL: https://github.com/apache/flink/pull/20382


[GitHub] [flink] SmirAlex commented on pull request #20382: [FLINK-28419][table] Add runtime provider interface for full caching lookup + implement Periodic and Timed cache reload triggers

Posted by GitBox <gi...@apache.org>.
SmirAlex commented on PR #20382:
URL: https://github.com/apache/flink/pull/20382#issuecomment-1203527747

   @PatrickRen I resolved your comments and rebased. Additionally, I changed the error messages and config descriptions so that they now match the ones for partial caching. Please have a look. Do I need to run CI again before merging?

