You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this text conversion.
Posted to commits@flink.apache.org by re...@apache.org on 2022/09/23 14:29:51 UTC
[flink] branch release-1.16 updated: [FLINK-29093][table] Fix InternalCompilerException in LookupJoinITCase + reset resource counter before each test
This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new c10a7279906 [FLINK-29093][table] Fix InternalCompilerException in LookupJoinITCase + reset resource counter before each test
c10a7279906 is described below
commit c10a727990668b1a0d706f16e4f5220780422ed9
Author: Smirnov Alexander <sm...@gmail.com>
AuthorDate: Mon Sep 5 16:07:57 2022 +0700
[FLINK-29093][table] Fix InternalCompilerException in LookupJoinITCase + reset resource counter before each test
---
.../runtime/batch/sql/join/LookupJoinITCase.scala | 8 +++
.../runtime/stream/sql/AsyncLookupJoinITCase.scala | 5 ++
.../runtime/stream/sql/LookupJoinITCase.scala | 8 +++
.../table/lookup/fullcache/CacheLoader.java | 18 ++++-
.../table/lookup/fullcache/LookupFullCache.java | 2 +-
.../inputformat/InputFormatCacheLoader.java | 50 ++++++-------
.../inputformat/InputSplitCacheLoadTask.java | 9 +--
.../keyselector/GenericRowDataKeySelector.java | 10 ++-
.../table/fullcache/LookupFullCacheTest.java | 12 ++--
.../functions/table/fullcache/TestCacheLoader.java | 11 +--
.../inputformat/FullCacheTestInputFormat.java | 6 +-
.../inputformat/InputFormatCacheLoaderTest.java | 81 +++++++++++++++++++---
12 files changed, 158 insertions(+), 62 deletions(-)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
index 219c6bbef12..9e7587fde38 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.GenericRowData
import org.apache.flink.table.data.binary.BinaryStringData
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils.{BatchTestBase, InMemoryLookupableTableSource}
+import org.apache.flink.table.runtime.functions.table.fullcache.inputformat.FullCacheTestInputFormat
import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager
import org.apache.flink.types.Row
@@ -67,6 +68,12 @@ class LookupJoinITCase(legacyTableSource: Boolean, isAsyncMode: Boolean, cacheTy
@Before
override def before() {
super.before()
+ if (legacyTableSource) {
+ InMemoryLookupableTableSource.RESOURCE_COUNTER.set(0)
+ } else {
+ TestValuesTableFactory.RESOURCE_COUNTER.set(0)
+ FullCacheTestInputFormat.OPEN_CLOSED_COUNTER.set(0)
+ }
createScanTable("T", data)
createScanTable("nullableT", dataWithNull)
@@ -84,6 +91,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, isAsyncMode: Boolean, cacheTy
assertEquals(0, InMemoryLookupableTableSource.RESOURCE_COUNTER.get())
} else {
assertEquals(0, TestValuesTableFactory.RESOURCE_COUNTER.get())
+ assertEquals(0, FullCacheTestInputFormat.OPEN_CLOSED_COUNTER.get())
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
index ecf8aab94fe..864c01527bd 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
@@ -66,6 +66,11 @@ class AsyncLookupJoinITCase(
@Before
override def before(): Unit = {
super.before()
+ if (legacyTableSource) {
+ InMemoryLookupableTableSource.RESOURCE_COUNTER.set(0)
+ } else {
+ TestValuesTableFactory.RESOURCE_COUNTER.set(0)
+ }
if (objectReuse) {
env.getConfig.enableObjectReuse()
} else {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
index edea5d48915..39efec4a146 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.data.binary.BinaryStringData
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils.{InMemoryLookupableTableSource, StreamingTestBase, TestingAppendSink, TestingRetractSink}
import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.TestAddWithOpen
+import org.apache.flink.table.runtime.functions.table.fullcache.inputformat.FullCacheTestInputFormat
import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager
import org.apache.flink.types.Row
@@ -77,6 +78,12 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
@Before
override def before(): Unit = {
super.before()
+ if (legacyTableSource) {
+ InMemoryLookupableTableSource.RESOURCE_COUNTER.set(0)
+ } else {
+ TestValuesTableFactory.RESOURCE_COUNTER.set(0)
+ FullCacheTestInputFormat.OPEN_CLOSED_COUNTER.set(0)
+ }
createScanTable("src", data)
createScanTable("nullable_src", dataWithNull)
createLookupTable("user_table", userData)
@@ -94,6 +101,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
assertEquals(0, InMemoryLookupableTableSource.RESOURCE_COUNTER.get())
} else {
assertEquals(0, TestValuesTableFactory.RESOURCE_COUNTER.get())
+ assertEquals(0, FullCacheTestInputFormat.OPEN_CLOSED_COUNTER.get())
}
}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java
index 6f6468c5a83..5b29a772bde 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java
@@ -57,6 +57,8 @@ public abstract class CacheLoader extends AbstractRichFunction implements Runnab
private transient Counter loadFailuresCounter;
private transient volatile long latestLoadTimeMs = UNINITIALIZED;
+ protected volatile boolean isStopped;
+
protected abstract void reloadCache() throws Exception;
@Override
@@ -92,6 +94,9 @@ public abstract class CacheLoader extends AbstractRichFunction implements Runnab
@Override
public void run() {
+ if (isStopped) {
+ return;
+ }
// 2 reloads can't be executed simultaneously
reloadLock.lock();
try {
@@ -112,6 +117,7 @@ public abstract class CacheLoader extends AbstractRichFunction implements Runnab
}
} catch (Exception e) {
loadFailuresCounter.inc();
+ isStopped = true;
throw new RuntimeException("Failed to reload lookup 'FULL' cache.", e);
} finally {
reloadLock.unlock();
@@ -121,8 +127,16 @@ public abstract class CacheLoader extends AbstractRichFunction implements Runnab
@Override
public void close() throws Exception {
- if (cache != null) {
- cache.clear();
+ isStopped = true;
+ // if reload is in progress, we will wait until it is over
+ // current reload should already be interrupted, so block won't take much time
+ reloadLock.lock();
+ try {
+ if (cache != null) {
+ cache.clear();
+ }
+ } finally {
+ reloadLock.unlock();
}
}
}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java
index d3eac31cc4c..20d317d4bcd 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/LookupFullCache.java
@@ -109,7 +109,7 @@ public class LookupFullCache implements LookupCache {
@Override
public void close() throws Exception {
+ reloadTrigger.close(); // firstly try to interrupt reload thread
cacheLoader.close();
- reloadTrigger.close();
}
}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java
index f7b95dbd694..872929111f2 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java
@@ -32,7 +32,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -54,8 +53,6 @@ public class InputFormatCacheLoader extends CacheLoader {
private transient volatile List<InputSplitCacheLoadTask> cacheLoadTasks;
private transient Configuration parameters;
- private volatile boolean isStopped;
-
public InputFormatCacheLoader(
InputFormat<RowData, ?> initialInputFormat,
GenericRowDataKeySelector keySelector,
@@ -86,39 +83,44 @@ public class InputFormatCacheLoader extends CacheLoader {
Arrays.stream(inputSplits)
.map(split -> createCacheLoadTask(split, newCache))
.collect(Collectors.toList());
-
- // run first task and start numTasks - 1 threads to run remaining tasks
+ if (isStopped) {
+ // check for cases when #close was called during reload before creating cacheLoadTasks
+ return;
+ }
+ // run first task or create numSplits threads to run all tasks
ExecutorService cacheLoadTaskService = null;
- List<Future<?>> futures = null;
- if (numSplits > 1) {
- futures = new ArrayList<>();
- int numThreads = getConcurrencyLevel(numSplits) - 1;
- cacheLoadTaskService = Executors.newFixedThreadPool(numThreads);
- for (int i = 1; i < numSplits; i++) {
- Future<?> future = cacheLoadTaskService.submit(cacheLoadTasks.get(i));
- futures.add(future);
+ try {
+ if (numSplits > 1) {
+ int numThreads = getConcurrencyLevel(numSplits);
+ cacheLoadTaskService = Executors.newFixedThreadPool(numThreads);
+ ExecutorService finalCacheLoadTaskService = cacheLoadTaskService;
+ List<Future<?>> futures =
+ cacheLoadTasks.stream()
+ .map(finalCacheLoadTaskService::submit)
+ .collect(Collectors.toList());
+ for (Future<?> future : futures) {
+ future.get(); // if any exception occurs it will be thrown here
+ }
+ } else {
+ cacheLoadTasks.get(0).run();
}
- }
- cacheLoadTasks.get(0).run();
- if (cacheLoadTaskService != null) {
- for (Future<?> future : futures) {
- future.get(); // if any exception occurs it will be thrown here
+ } catch (InterruptedException ignored) { // we use interrupt to close reload thread
+ } finally {
+ if (cacheLoadTaskService != null) {
+ cacheLoadTaskService.shutdownNow();
}
- cacheLoadTaskService.shutdownNow();
- }
- if (!isStopped) {
- // reassigning cache field is safe, because it's volatile
- cache = newCache;
}
+ cache = newCache; // reassigning cache field is safe, because it's volatile
}
@Override
public void close() throws Exception {
- super.close();
+ // firstly stop current reload in case when custom reloadTrigger didn't interrupt it
isStopped = true;
if (cacheLoadTasks != null) {
cacheLoadTasks.forEach(InputSplitCacheLoadTask::stopRunning);
}
+ super.close();
}
private InputSplitCacheLoadTask createCacheLoadTask(
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java
index 8610b206fff..3d28a844446 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.types.RowKind;
@@ -39,7 +39,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
public class InputSplitCacheLoadTask implements Runnable {
private final ConcurrentHashMap<RowData, Collection<RowData>> cache;
- private final RowDataKeySelector keySelector;
+ private final GenericRowDataKeySelector keySelector;
private final RowDataSerializer cacheEntriesSerializer;
private final InputFormat<RowData, InputSplit> inputFormat;
private final InputSplit inputSplit;
@@ -48,7 +48,7 @@ public class InputSplitCacheLoadTask implements Runnable {
public InputSplitCacheLoadTask(
ConcurrentHashMap<RowData, Collection<RowData>> cache,
- RowDataKeySelector keySelector,
+ GenericRowDataKeySelector keySelector,
RowDataSerializer cacheEntriesSerializer,
InputFormat<RowData, InputSplit> inputFormat,
InputSplit inputSplit) {
@@ -57,6 +57,7 @@ public class InputSplitCacheLoadTask implements Runnable {
this.inputFormat = inputFormat;
this.cacheEntriesSerializer = cacheEntriesSerializer;
this.inputSplit = inputSplit;
+ keySelector.open();
}
@Override
@@ -67,7 +68,7 @@ public class InputSplitCacheLoadTask implements Runnable {
}
inputFormat.open(inputSplit);
RowData nextElement = new BinaryRowData(cacheEntriesSerializer.getArity());
- while (isRunning && !inputFormat.reachedEnd()) {
+ while (isRunning && !inputFormat.reachedEnd() && !Thread.interrupted()) {
nextElement = inputFormat.nextRecord(nextElement);
if (nextElement != null) {
if (nextElement.getRowKind() != RowKind.INSERT) {
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/keyselector/GenericRowDataKeySelector.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/keyselector/GenericRowDataKeySelector.java
index 13f01fd7d76..50dfa071c17 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/keyselector/GenericRowDataKeySelector.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/keyselector/GenericRowDataKeySelector.java
@@ -44,12 +44,16 @@ public class GenericRowDataKeySelector implements RowDataKeySelector {
this.keySerializer = keySerializer;
}
+ public void open() {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ //noinspection unchecked
+ projection = generatedProjection.newInstance(cl);
+ }
+
@Override
public RowData getKey(RowData value) throws Exception {
if (projection == null) {
- ClassLoader cl = Thread.currentThread().getContextClassLoader();
- //noinspection unchecked
- projection = generatedProjection.newInstance(cl);
+ open();
}
return keySerializer.copy(projection.apply(value));
}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/LookupFullCacheTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/LookupFullCacheTest.java
index 4830383c2d1..8fa1a2d747a 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/LookupFullCacheTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/LookupFullCacheTest.java
@@ -62,7 +62,7 @@ public class LookupFullCacheTest {
assertThat(result).isEqualTo(newResult);
}
assertThat(reloadTrigger.isClosed()).isTrue();
- assertThat(cacheLoader.isClosed()).isTrue();
+ assertThat(cacheLoader.isStopped()).isTrue();
}
@Test
@@ -79,7 +79,7 @@ public class LookupFullCacheTest {
assertThat(result).isEqualTo(newResult);
}
assertThat(reloadTrigger.isClosed()).isTrue();
- assertThat(cacheLoader.isClosed()).isTrue();
+ assertThat(cacheLoader.isStopped()).isTrue();
}
@Test
@@ -96,7 +96,7 @@ public class LookupFullCacheTest {
assertThat(result.size()).isEqualTo(0);
}
assertThat(reloadTrigger.isClosed()).isTrue();
- assertThat(cacheLoader.isClosed()).isTrue();
+ assertThat(cacheLoader.isStopped()).isTrue();
}
@Test
@@ -109,10 +109,14 @@ public class LookupFullCacheTest {
});
try (LookupFullCache fullCache = createAndLoadCache(cacheLoader)) {
reloadTrigger.trigger();
+ assertThat(cacheLoader.isStopped()).isTrue();
+ assertThat(cacheLoader.getNumLoads()).isEqualTo(2);
assertThatThrownBy(() -> fullCache.getIfPresent(row(1))).hasRootCause(exception);
+ reloadTrigger.trigger();
+ assertThat(cacheLoader.getNumLoads()).isEqualTo(2); // no reload after fail
}
assertThat(reloadTrigger.isClosed()).isTrue();
- assertThat(cacheLoader.isClosed()).isTrue();
+ assertThat(cacheLoader.isStopped()).isTrue();
}
@Test
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestCacheLoader.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestCacheLoader.java
index 859c13e59c3..53077937467 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestCacheLoader.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/TestCacheLoader.java
@@ -44,7 +44,6 @@ public class TestCacheLoader extends CacheLoader {
private final Consumer<Map<RowData, Collection<RowData>>> secondLoadDataChange;
private int numLoads;
- private boolean isClosed;
private boolean isAwaitTriggered;
public TestCacheLoader(Consumer<Map<RowData, Collection<RowData>>> secondLoadDataChange) {
@@ -55,8 +54,8 @@ public class TestCacheLoader extends CacheLoader {
return numLoads;
}
- public boolean isClosed() {
- return isClosed;
+ public boolean isStopped() {
+ return isStopped;
}
public boolean isAwaitTriggered() {
@@ -76,10 +75,4 @@ public class TestCacheLoader extends CacheLoader {
secondLoadDataChange.accept(cache);
}
}
-
- @Override
- public void close() throws Exception {
- super.close();
- isClosed = true;
- }
}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java
index 821f3b92cc1..fef9eac4123 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/FullCacheTestInputFormat.java
@@ -45,8 +45,8 @@ import static org.assertj.core.api.Assertions.assertThat;
public class FullCacheTestInputFormat
extends RichInputFormat<RowData, FullCacheTestInputFormat.QueueInputSplit> {
+ public static final AtomicInteger OPEN_CLOSED_COUNTER = new AtomicInteger(0);
private static final int DEFAULT_NUM_SPLITS = 2;
- private static final AtomicInteger OPEN_CLOSED_COUNTER = new AtomicInteger(0);
// RowData is not serializable, so we store Rows
private final Collection<Row> dataRows;
@@ -164,10 +164,6 @@ public class FullCacheTestInputFormat
OPEN_CLOSED_COUNTER.decrementAndGet();
}
- public boolean isClosed() {
- return OPEN_CLOSED_COUNTER.get() == 0;
- }
-
/** {@link InputSplit} that provides queue to {@link InputFormat}. */
public static class QueueInputSplit implements InputSplit {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java
index a9271d3ddca..dcb73ae0346 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java
@@ -35,7 +35,10 @@ import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -45,21 +48,36 @@ import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup.UNINITIALIZED;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Unit test for {@link InputFormatCacheLoader}. */
class InputFormatCacheLoaderTest {
+ @BeforeEach
+ void resetCounter() {
+ FullCacheTestInputFormat.OPEN_CLOSED_COUNTER.set(0);
+ }
+
+ @AfterEach
+ void checkCounter() {
+ assertThat(FullCacheTestInputFormat.OPEN_CLOSED_COUNTER).hasValue(0);
+ }
+
@ParameterizedTest
@MethodSource("deltaNumSplits")
void testReadWithDifferentSplits(int deltaNumSplits) throws Exception {
- InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits, null);
+ InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits);
cacheLoader.open(UnregisteredMetricsGroup.createCacheMetricGroup());
cacheLoader.run();
ConcurrentHashMap<RowData, Collection<RowData>> cache = cacheLoader.getCache();
@@ -72,7 +90,7 @@ class InputFormatCacheLoaderTest {
@Test
void testCacheMetrics() throws Exception {
- InputFormatCacheLoader cacheLoader = createCacheLoader(0, null);
+ InputFormatCacheLoader cacheLoader = createCacheLoader(0);
InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
cacheLoader.open(metricGroup);
// These metrics are registered
@@ -99,15 +117,54 @@ class InputFormatCacheLoaderTest {
}
@Test
- void testExceptionHandling() throws Exception {
+ void testExceptionDuringReload() throws Exception {
RuntimeException exception = new RuntimeException("Load failed.");
- InputFormatCacheLoader cacheLoader = createCacheLoader(0, exception);
+ Runnable reloadAction =
+ () -> {
+ throw exception;
+ };
+ InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction);
InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
cacheLoader.open(metricGroup);
assertThatThrownBy(cacheLoader::run).hasRootCause(exception);
assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
}
+ @Test
+ void testCloseAndInterruptDuringReload() throws Exception {
+ AtomicInteger sleepCounter = new AtomicInteger(0);
+ int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to number of all rows
+ Runnable reloadAction =
+ ThrowingRunnable.unchecked(
+ () -> {
+ sleepCounter.incrementAndGet();
+ Thread.sleep(1000);
+ });
+ InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction);
+ InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
+ cacheLoader.open(metricGroup);
+
+ // check interruption
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ Future<?> future = executorService.submit(cacheLoader);
+ executorService.shutdownNow(); // internally interrupts a thread
+ assertThatNoException().isThrownBy(future::get); // wait for the end
+ // check that we didn't process all elements, but reacted on interruption
+ assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
+ assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+
+ sleepCounter.set(0);
+
+ // check closing
+ executorService = Executors.newSingleThreadExecutor();
+ future = executorService.submit(cacheLoader);
+ cacheLoader.close();
+ assertThatNoException().isThrownBy(future::get); // wait for the end
+ // check that we didn't process all elements, but reacted on closing
+ assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
+ assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+ }
+
static Stream<Arguments> deltaNumSplits() {
return Stream.of(Arguments.of(-1), Arguments.of(0), Arguments.of(1));
}
@@ -121,8 +178,12 @@ class InputFormatCacheLoaderTest {
assertThat(rows).containsExactlyInAnyOrderElementsOf(actual.get(key)));
}
- private InputFormatCacheLoader createCacheLoader(
- int deltaNumSplits, RuntimeException testException) throws Exception {
+ private InputFormatCacheLoader createCacheLoader(int deltaNumSplits) throws Exception {
+ return createCacheLoader(deltaNumSplits, () -> {});
+ }
+
+ private InputFormatCacheLoader createCacheLoader(int deltaNumSplits, Runnable reloadAction)
+ throws Exception {
DataType rightRowDataType =
DataTypes.ROW(
DataTypes.FIELD("f0", DataTypes.INT()),
@@ -150,10 +211,10 @@ class InputFormatCacheLoaderTest {
new GeneratedProjection("", "", new Object[0]) {
@Override
public Projection newInstance(ClassLoader classLoader) {
- if (testException != null) {
- throw testException;
- }
- return row -> row(row.getInt(0));
+ return row -> {
+ reloadAction.run();
+ return row(row.getInt(0));
+ };
}
};
GenericRowDataKeySelector keySelector =