You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by re...@apache.org on 2022/09/23 14:29:51 UTC

[flink] branch release-1.16 updated: [FLINK-29093][table] Fix InternalCompilerException in LookupJoinITCase + reset resource counter before each test

This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new c10a7279906 [FLINK-29093][table] Fix InternalCompilerException in LookupJoinITCase + reset resource counter before each test
c10a7279906 is described below

commit c10a727990668b1a0d706f16e4f5220780422ed9
Author: Smirnov Alexander <sm...@gmail.com>
AuthorDate: Mon Sep 5 16:07:57 2022 +0700

    [FLINK-29093][table] Fix InternalCompilerException in LookupJoinITCase + reset resource counter before each test
---
 .../runtime/batch/sql/join/LookupJoinITCase.scala  |  8 +++
 .../runtime/stream/sql/AsyncLookupJoinITCase.scala |  5 ++
 .../runtime/stream/sql/LookupJoinITCase.scala      |  8 +++
 .../table/lookup/fullcache/CacheLoader.java        | 18 ++++-
 .../table/lookup/fullcache/LookupFullCache.java    |  2 +-
 .../inputformat/InputFormatCacheLoader.java        | 50 ++++++-------
 .../inputformat/InputSplitCacheLoadTask.java       |  9 +--
 .../keyselector/GenericRowDataKeySelector.java     | 10 ++-
 .../table/fullcache/LookupFullCacheTest.java       | 12 ++--
 .../functions/table/fullcache/TestCacheLoader.java | 11 +--
 .../inputformat/FullCacheTestInputFormat.java      |  6 +-
 .../inputformat/InputFormatCacheLoaderTest.java    | 81 +++++++++++++++++++---
 12 files changed, 158 insertions(+), 62 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
index 219c6bbef12..9e7587fde38 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.GenericRowData
 import org.apache.flink.table.data.binary.BinaryStringData
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, InMemoryLookupableTableSource}
+import org.apache.flink.table.runtime.functions.table.fullcache.inputformat.FullCacheTestInputFormat
 import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager
 import org.apache.flink.types.Row
 
@@ -67,6 +68,12 @@ class LookupJoinITCase(legacyTableSource: Boolean, isAsyncMode: Boolean, cacheTy
   @Before
   override def before() {
     super.before()
+    if (legacyTableSource) {
+      InMemoryLookupableTableSource.RESOURCE_COUNTER.set(0)
+    } else {
+      TestValuesTableFactory.RESOURCE_COUNTER.set(0)
+      FullCacheTestInputFormat.OPEN_CLOSED_COUNTER.set(0)
+    }
     createScanTable("T", data)
     createScanTable("nullableT", dataWithNull)
 
@@ -84,6 +91,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, isAsyncMode: Boolean, cacheTy
       assertEquals(0, InMemoryLookupableTableSource.RESOURCE_COUNTER.get())
     } else {
       assertEquals(0, TestValuesTableFactory.RESOURCE_COUNTER.get())
+      assertEquals(0, FullCacheTestInputFormat.OPEN_CLOSED_COUNTER.get())
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
index ecf8aab94fe..864c01527bd 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
@@ -66,6 +66,11 @@ class AsyncLookupJoinITCase(
   @Before
   override def before(): Unit = {
     super.before()
+    if (legacyTableSource) {
+      InMemoryLookupableTableSource.RESOURCE_COUNTER.set(0)
+    } else {
+      TestValuesTableFactory.RESOURCE_COUNTER.set(0)
+    }
     if (objectReuse) {
       env.getConfig.enableObjectReuse()
     } else {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
index edea5d48915..39efec4a146 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.data.binary.BinaryStringData
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.{InMemoryLookupableTableSource, StreamingTestBase, TestingAppendSink, TestingRetractSink}
 import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.TestAddWithOpen
+import org.apache.flink.table.runtime.functions.table.fullcache.inputformat.FullCacheTestInputFormat
 import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager
 import org.apache.flink.types.Row
 
@@ -77,6 +78,12 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
   @Before
   override def before(): Unit = {
     super.before()
+    if (legacyTableSource) {
+      InMemoryLookupableTableSource.RESOURCE_COUNTER.set(0)
+    } else {
+      TestValuesTableFactory.RESOURCE_COUNTER.set(0)
+      FullCacheTestInputFormat.OPEN_CLOSED_COUNTER.set(0)
+    }
     createScanTable("src", data)
     createScanTable("nullable_src", dataWithNull)
     createLookupTable("user_table", userData)
@@ -94,6 +101,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
       assertEquals(0, InMemoryLookupableTableSource.RESOURCE_COUNTER.get())
     } else {
       assertEquals(0, TestValuesTableFactory.RESOURCE_COUNTER.get())
+      assertEquals(0, FullCacheTestInputFormat.OPEN_CLOSED_COUNTER.get())
     }
   }
 
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java
index 6f6468c5a83..5b29a772bde 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java
@@ -57,6 +57,8 @@ public abstract class CacheLoader extends AbstractRichFunction implements Runnab
     private transient Counter loadFailuresCounter;
     private transient volatile long latestLoadTimeMs = UNINITIALIZED;
 
+    protected volatile boolean isStopped;
+
     protected abstract void reloadCache() throws Exception;
 
     @Override
@@ -92,6 +94,9 @@ public abstract class CacheLoader extends AbstractRichFunction implements Runnab
 
     @Override
     public void run() {
+        if (isStopped) {
+            return;
+        }
         // 2 reloads can't be executed simultaneously
         reloadLock.lock();
         try {
@@ -112,6 +117,7 @@ public abstract class CacheLoader extends AbstractRichFunction implements Runnab
             }
         } catch (Exception e) {
             loadFailuresCounter.inc();
+            isStopped = true;
             throw new RuntimeException("Failed to reload lookup 'FULL' cache.", e);
         } finally {
             reloadLock.unlock();
@@ -121,8 +127,16 @@ public abstract class CacheLoader extends AbstractRichFunction implements Runnab
 
     @Override
     public void close() throws Exception {
-        if (cache != null) {
-            cache.clear();
+        isStopped = true;
+        // if reload is in progress, we will wait until it is over
+        // current reload should already be interrupted, so block won't take much time
+        reloadLock.lock();
+        try {
+            if (cache != null) {
+                cache.clear();
+            }
+        } finally {
+            reloadLock.unlock();
         }
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java
index d3eac31cc4c..20d317d4bcd 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java
@@ -109,7 +109,7 @@ public class LookupFullCache implements LookupCache {
 
     @Override
     public void close() throws Exception {
+        reloadTrigger.close(); // firstly try to interrupt reload thread
         cacheLoader.close();
-        reloadTrigger.close();
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java
index f7b95dbd694..872929111f2 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java
@@ -32,7 +32,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -54,8 +53,6 @@ public class InputFormatCacheLoader extends CacheLoader {
     private transient volatile List<InputSplitCacheLoadTask> cacheLoadTasks;
     private transient Configuration parameters;
 
-    private volatile boolean isStopped;
-
     public InputFormatCacheLoader(
             InputFormat<RowData, ?> initialInputFormat,
             GenericRowDataKeySelector keySelector,
@@ -86,39 +83,44 @@ public class InputFormatCacheLoader extends CacheLoader {
                 Arrays.stream(inputSplits)
                         .map(split -> createCacheLoadTask(split, newCache))
                         .collect(Collectors.toList());
-
-        // run first task and start numTasks - 1 threads to run remaining tasks
+        if (isStopped) {
+            // check for cases when #close was called during reload before creating cacheLoadTasks
+            return;
+        }
+        // run first task or create numSplits threads to run all tasks
         ExecutorService cacheLoadTaskService = null;
-        List<Future<?>> futures = null;
-        if (numSplits > 1) {
-            futures = new ArrayList<>();
-            int numThreads = getConcurrencyLevel(numSplits) - 1;
-            cacheLoadTaskService = Executors.newFixedThreadPool(numThreads);
-            for (int i = 1; i < numSplits; i++) {
-                Future<?> future = cacheLoadTaskService.submit(cacheLoadTasks.get(i));
-                futures.add(future);
+        try {
+            if (numSplits > 1) {
+                int numThreads = getConcurrencyLevel(numSplits);
+                cacheLoadTaskService = Executors.newFixedThreadPool(numThreads);
+                ExecutorService finalCacheLoadTaskService = cacheLoadTaskService;
+                List<Future<?>> futures =
+                        cacheLoadTasks.stream()
+                                .map(finalCacheLoadTaskService::submit)
+                                .collect(Collectors.toList());
+                for (Future<?> future : futures) {
+                    future.get(); // if any exception occurs it will be thrown here
+                }
+            } else {
+                cacheLoadTasks.get(0).run();
             }
-        }
-        cacheLoadTasks.get(0).run();
-        if (cacheLoadTaskService != null) {
-            for (Future<?> future : futures) {
-                future.get(); // if any exception occurs it will be thrown here
+        } catch (InterruptedException ignored) { // we use interrupt to close reload thread
+        } finally {
+            if (cacheLoadTaskService != null) {
+                cacheLoadTaskService.shutdownNow();
             }
-            cacheLoadTaskService.shutdownNow();
-        }
-        if (!isStopped) {
-            // reassigning cache field is safe, because it's volatile
-            cache = newCache;
         }
+        cache = newCache; // reassigning cache field is safe, because it's volatile
     }
 
     @Override
     public void close() throws Exception {
-        super.close();
+        // firstly stop current reload in case when custom reloadTrigger didn't interrupt it
         isStopped = true;
         if (cacheLoadTasks != null) {
             cacheLoadTasks.forEach(InputSplitCacheLoadTask::stopRunning);
         }
+        super.close();
     }
 
     private InputSplitCacheLoadTask createCacheLoadTask(
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java
index 8610b206fff..3d28a844446 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.types.RowKind;
 
@@ -39,7 +39,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 public class InputSplitCacheLoadTask implements Runnable {
 
     private final ConcurrentHashMap<RowData, Collection<RowData>> cache;
-    private final RowDataKeySelector keySelector;
+    private final GenericRowDataKeySelector keySelector;
     private final RowDataSerializer cacheEntriesSerializer;
     private final InputFormat<RowData, InputSplit> inputFormat;
     private final InputSplit inputSplit;
@@ -48,7 +48,7 @@ public class InputSplitCacheLoadTask implements Runnable {
 
     public InputSplitCacheLoadTask(
             ConcurrentHashMap<RowData, Collection<RowData>> cache,
-            RowDataKeySelector keySelector,
+            GenericRowDataKeySelector keySelector,
             RowDataSerializer cacheEntriesSerializer,
             InputFormat<RowData, InputSplit> inputFormat,
             InputSplit inputSplit) {
@@ -57,6 +57,7 @@ public class InputSplitCacheLoadTask implements Runnable {
         this.inputFormat = inputFormat;
         this.cacheEntriesSerializer = cacheEntriesSerializer;
         this.inputSplit = inputSplit;
+        keySelector.open();
     }
 
     @Override
@@ -67,7 +68,7 @@ public class InputSplitCacheLoadTask implements Runnable {
             }
             inputFormat.open(inputSplit);
             RowData nextElement = new BinaryRowData(cacheEntriesSerializer.getArity());
-            while (isRunning && !inputFormat.reachedEnd()) {
+            while (isRunning && !inputFormat.reachedEnd() && !Thread.interrupted()) {
                 nextElement = inputFormat.nextRecord(nextElement);
                 if (nextElement != null) {
                     if (nextElement.getRowKind() != RowKind.INSERT) {
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/keyselector/GenericRowDataKeySelector.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/keyselector/GenericRowDataKeySelector.java
index 13f01fd7d76..50dfa071c17 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/keyselector/GenericRowDataKeySelector.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/keyselector/GenericRowDataKeySelector.java
@@ -44,12 +44,16 @@ public class GenericRowDataKeySelector implements RowDataKeySelector {
         this.keySerializer = keySerializer;
     }
 
+    public void open() {
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        //noinspection unchecked
+        projection = generatedProjection.newInstance(cl);
+    }
+
     @Override
     public RowData getKey(RowData value) throws Exception {
         if (projection == null) {
-            ClassLoader cl = Thread.currentThread().getContextClassLoader();
-            //noinspection unchecked
-            projection = generatedProjection.newInstance(cl);
+            open();
         }
         return keySerializer.copy(projection.apply(value));
     }
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/LookupFullCacheTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/LookupFullCacheTest.java
index 4830383c2d1..8fa1a2d747a 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/LookupFullCacheTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/LookupFullCacheTest.java
@@ -62,7 +62,7 @@ public class LookupFullCacheTest {
             assertThat(result).isEqualTo(newResult);
         }
         assertThat(reloadTrigger.isClosed()).isTrue();
-        assertThat(cacheLoader.isClosed()).isTrue();
+        assertThat(cacheLoader.isStopped()).isTrue();
     }
 
     @Test
@@ -79,7 +79,7 @@ public class LookupFullCacheTest {
             assertThat(result).isEqualTo(newResult);
         }
         assertThat(reloadTrigger.isClosed()).isTrue();
-        assertThat(cacheLoader.isClosed()).isTrue();
+        assertThat(cacheLoader.isStopped()).isTrue();
     }
 
     @Test
@@ -96,7 +96,7 @@ public class LookupFullCacheTest {
             assertThat(result.size()).isEqualTo(0);
         }
         assertThat(reloadTrigger.isClosed()).isTrue();
-        assertThat(cacheLoader.isClosed()).isTrue();
+        assertThat(cacheLoader.isStopped()).isTrue();
     }
 
     @Test
@@ -109,10 +109,14 @@ public class LookupFullCacheTest {
                         });
         try (LookupFullCache fullCache = createAndLoadCache(cacheLoader)) {
             reloadTrigger.trigger();
+            assertThat(cacheLoader.isStopped()).isTrue();
+            assertThat(cacheLoader.getNumLoads()).isEqualTo(2);
             assertThatThrownBy(() -> fullCache.getIfPresent(row(1))).hasRootCause(exception);
+            reloadTrigger.trigger();
+            assertThat(cacheLoader.getNumLoads()).isEqualTo(2); // no reload after fail
         }
         assertThat(reloadTrigger.isClosed()).isTrue();
-        assertThat(cacheLoader.isClosed()).isTrue();
+        assertThat(cacheLoader.isStopped()).isTrue();
     }
 
     @Test
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestCacheLoader.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestCacheLoader.java
index 859c13e59c3..53077937467 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestCacheLoader.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestCacheLoader.java
@@ -44,7 +44,6 @@ public class TestCacheLoader extends CacheLoader {
 
     private final Consumer<Map<RowData, Collection<RowData>>> secondLoadDataChange;
     private int numLoads;
-    private boolean isClosed;
     private boolean isAwaitTriggered;
 
     public TestCacheLoader(Consumer<Map<RowData, Collection<RowData>>> secondLoadDataChange) {
@@ -55,8 +54,8 @@ public class TestCacheLoader extends CacheLoader {
         return numLoads;
     }
 
-    public boolean isClosed() {
-        return isClosed;
+    public boolean isStopped() {
+        return isStopped;
     }
 
     public boolean isAwaitTriggered() {
@@ -76,10 +75,4 @@ public class TestCacheLoader extends CacheLoader {
             secondLoadDataChange.accept(cache);
         }
     }
-
-    @Override
-    public void close() throws Exception {
-        super.close();
-        isClosed = true;
-    }
 }
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java
index 821f3b92cc1..fef9eac4123 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java
@@ -45,8 +45,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class FullCacheTestInputFormat
         extends RichInputFormat<RowData, FullCacheTestInputFormat.QueueInputSplit> {
 
+    public static final AtomicInteger OPEN_CLOSED_COUNTER = new AtomicInteger(0);
     private static final int DEFAULT_NUM_SPLITS = 2;
-    private static final AtomicInteger OPEN_CLOSED_COUNTER = new AtomicInteger(0);
 
     // RowData is not serializable, so we store Rows
     private final Collection<Row> dataRows;
@@ -164,10 +164,6 @@ public class FullCacheTestInputFormat
         OPEN_CLOSED_COUNTER.decrementAndGet();
     }
 
-    public boolean isClosed() {
-        return OPEN_CLOSED_COUNTER.get() == 0;
-    }
-
     /** {@link InputSplit} that provides queue to {@link InputFormat}. */
     public static class QueueInputSplit implements InputSplit {
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java
index a9271d3ddca..dcb73ae0346 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java
@@ -35,7 +35,10 @@ import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.function.ThrowingRunnable;
 
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -45,21 +48,36 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup.UNINITIALIZED;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Unit test for {@link InputFormatCacheLoader}. */
 class InputFormatCacheLoaderTest {
 
+    @BeforeEach
+    void resetCounter() {
+        FullCacheTestInputFormat.OPEN_CLOSED_COUNTER.set(0);
+    }
+
+    @AfterEach
+    void checkCounter() {
+        assertThat(FullCacheTestInputFormat.OPEN_CLOSED_COUNTER).hasValue(0);
+    }
+
     @ParameterizedTest
     @MethodSource("deltaNumSplits")
     void testReadWithDifferentSplits(int deltaNumSplits) throws Exception {
-        InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits, null);
+        InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits);
         cacheLoader.open(UnregisteredMetricsGroup.createCacheMetricGroup());
         cacheLoader.run();
         ConcurrentHashMap<RowData, Collection<RowData>> cache = cacheLoader.getCache();
@@ -72,7 +90,7 @@ class InputFormatCacheLoaderTest {
 
     @Test
     void testCacheMetrics() throws Exception {
-        InputFormatCacheLoader cacheLoader = createCacheLoader(0, null);
+        InputFormatCacheLoader cacheLoader = createCacheLoader(0);
         InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
         cacheLoader.open(metricGroup);
         // These metrics are registered
@@ -99,15 +117,54 @@ class InputFormatCacheLoaderTest {
     }
 
     @Test
-    void testExceptionHandling() throws Exception {
+    void testExceptionDuringReload() throws Exception {
         RuntimeException exception = new RuntimeException("Load failed.");
-        InputFormatCacheLoader cacheLoader = createCacheLoader(0, exception);
+        Runnable reloadAction =
+                () -> {
+                    throw exception;
+                };
+        InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction);
         InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
         cacheLoader.open(metricGroup);
         assertThatThrownBy(cacheLoader::run).hasRootCause(exception);
         assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
     }
 
+    @Test
+    void testCloseAndInterruptDuringReload() throws Exception {
+        AtomicInteger sleepCounter = new AtomicInteger(0);
+        int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to number of all rows
+        Runnable reloadAction =
+                ThrowingRunnable.unchecked(
+                        () -> {
+                            sleepCounter.incrementAndGet();
+                            Thread.sleep(1000);
+                        });
+        InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction);
+        InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
+        cacheLoader.open(metricGroup);
+
+        // check interruption
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        Future<?> future = executorService.submit(cacheLoader);
+        executorService.shutdownNow(); // internally interrupts a thread
+        assertThatNoException().isThrownBy(future::get); // wait for the end
+        // check that we didn't process all elements, but reacted on interruption
+        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
+        assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+
+        sleepCounter.set(0);
+
+        // check closing
+        executorService = Executors.newSingleThreadExecutor();
+        future = executorService.submit(cacheLoader);
+        cacheLoader.close();
+        assertThatNoException().isThrownBy(future::get); // wait for the end
+        // check that we didn't process all elements, but reacted on closing
+        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
+        assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+    }
+
     static Stream<Arguments> deltaNumSplits() {
         return Stream.of(Arguments.of(-1), Arguments.of(0), Arguments.of(1));
     }
@@ -121,8 +178,12 @@ class InputFormatCacheLoaderTest {
                         assertThat(rows).containsExactlyInAnyOrderElementsOf(actual.get(key)));
     }
 
-    private InputFormatCacheLoader createCacheLoader(
-            int deltaNumSplits, RuntimeException testException) throws Exception {
+    private InputFormatCacheLoader createCacheLoader(int deltaNumSplits) throws Exception {
+        return createCacheLoader(deltaNumSplits, () -> {});
+    }
+
+    private InputFormatCacheLoader createCacheLoader(int deltaNumSplits, Runnable reloadAction)
+            throws Exception {
         DataType rightRowDataType =
                 DataTypes.ROW(
                         DataTypes.FIELD("f0", DataTypes.INT()),
@@ -150,10 +211,10 @@ class InputFormatCacheLoaderTest {
                 new GeneratedProjection("", "", new Object[0]) {
                     @Override
                     public Projection newInstance(ClassLoader classLoader) {
-                        if (testException != null) {
-                            throw testException;
-                        }
-                        return row -> row(row.getInt(0));
+                        return row -> {
+                            reloadAction.run();
+                            return row(row.getInt(0));
+                        };
                     }
                 };
         GenericRowDataKeySelector keySelector =