Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/02 13:30:02 UTC

[GitHub] [flink] XComp commented on a diff in pull request #20919: [FLINK-29405] Fix unstable test InputFormatCacheLoaderTest

XComp commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1038080578


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##########
@@ -107,7 +109,11 @@ protected void reloadCache() throws Exception {
         } catch (InterruptedException ignored) { // we use interrupt to close reload thread
         } finally {
             if (cacheLoadTaskService != null) {
+                // if main cache reload thread encountered an exception,
+                // it interrupts underlying InputSplitCacheLoadTasks threads
                 cacheLoadTaskService.shutdownNow();

Review Comment:
   I just noticed that there's also `ExecutorUtils.gracefulShutdown`. Maybe it's worth using it here, as it would also include proper logging in case of an error.
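For comparison, here is a plain `java.util.concurrent` sketch of the shutdown-then-interrupt pattern that such a helper usually wraps. `GracefulShutdownSketch` is a hypothetical class written for illustration. It is not Flink's `ExecutorUtils`, and its timeout handling and log messages are assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/** Hypothetical helper, not Flink's ExecutorUtils: orderly shutdown with a bounded wait. */
final class GracefulShutdownSketch {
    private static final Logger LOG = Logger.getLogger(GracefulShutdownSketch.class.getName());

    private GracefulShutdownSketch() {}

    /** Returns true if the executor terminated, either gracefully or after being interrupted. */
    static boolean gracefulShutdown(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new tasks; already submitted ones keep running
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            LOG.warning("Executor did not terminate in time; interrupting remaining tasks.");
            executor.shutdownNow(); // interrupts running tasks and drops queued ones
            boolean terminated = executor.awaitTermination(timeout, unit);
            if (!terminated) {
                LOG.severe("Executor did not terminate even after interruption.");
            }
            return terminated;
        } catch (InterruptedException e) {
            LOG.warning("Interrupted while waiting for executor termination.");
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }
}
```

Compared with a bare `shutdownNow()`, this pattern gives running tasks a chance to finish. It also leaves a log trail when they do not finish in time.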



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -149,19 +155,33 @@ void testCloseAndInterruptDuringReload() throws Exception {
         Future<?> future = executorService.submit(cacheLoader);
         executorService.shutdownNow(); // internally interrupts a thread
         assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on interruption
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
         assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+    }
 
-        sleepCounter.set(0);
+    @Test
+    void testCloseDuringReload() throws Exception {
+        AtomicInteger recordsCounter = new AtomicInteger(0);
+        int totalRecords = TestCacheLoader.DATA.values().stream().mapToInt(Collection::size).sum();
+        CountDownLatch latch = new CountDownLatch(1);
+        Runnable reloadAction =
+                ThrowingRunnable.unchecked(
+                        () -> {
+                            recordsCounter.incrementAndGet();
+                            latch.await();

Review Comment:
   Doing that leads to a deadlock. Even just adding `Thread.sleep` shortly before closing the cache loader makes the test run into a deadlock. This happens because [CacheLoader:101](https://github.com/apache/flink/blob/340b100f2de5e0d90ba475aa8a00e359a61442ce/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/CacheLoader.java#L101) acquires `CacheLoader.reloadLock` and reloads the cache synchronously while holding the lock. The data reload blocks due to the `CountDownLatch` we use in the test. The subsequent call of close triggers the shutdown of the `InputSplitCacheLoadTask` instances through `InputSplitCacheLoadTask#stopRunning`. However, those tasks never finish shutting down, because the data loading is still blocked and the tasks are stuck in the [while loop](https://github.com/apache/flink/blob/340b100f2de5e0d90ba475aa8a00e359a61442ce/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputSplitCacheLoadTask.java#L71). [InputFormatCacheLoader#close](https://github.com/apache/flink/blob/a024a366f73f822bb4fd35db737ac2b8177f6b25/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java#L129) then continues by calling `CacheLoader#close`. That call tries to acquire the lock that is still held by the thread executing `InputFormatCacheLoader`, and we end up in a deadlock.
   
   My suspicion is that the test, as currently pushed, passes because we call close before the data reload is initiated. AFAIU, that's not what we want to test, is it? Please correct me if I'm wrong here.
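The lock interaction can be reproduced in isolation with a heavily simplified model. `ReloadLockSketch` is hypothetical and is not Flink code. In it, one thread holds a `ReentrantLock` while blocked on a latch, and a `close()`-like call needs the same lock. The sketch uses `tryLock` with a timeout so that it reports the contention instead of hanging. It models only the lock, not the shutdown of the `InputSplitCacheLoadTask` instances.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified model: a reload holding the lock while blocked vs. a close(). */
final class ReloadLockSketch {
    private final ReentrantLock reloadLock = new ReentrantLock();

    /** Runs the "reload" synchronously while holding the lock. */
    void reload(CountDownLatch reloadStarted, CountDownLatch releaseReload)
            throws InterruptedException {
        reloadLock.lock();
        try {
            reloadStarted.countDown(); // the lock is held and "loading" has begun
            releaseReload.await(); // stands in for data loading blocked on the test's latch
        } finally {
            reloadLock.unlock();
        }
    }

    /** Models close(); tryLock with a timeout lets the demo report instead of hanging. */
    boolean tryClose(long timeoutMillis) throws InterruptedException {
        if (!reloadLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false; // a plain lock() here would block until the reload finishes
        }
        reloadLock.unlock();
        return true;
    }

    /** Returns {closeSucceededWhileReloadBlocked, closeSucceededAfterRelease}. */
    static boolean[] demo() throws InterruptedException {
        ReloadLockSketch loader = new ReloadLockSketch();
        CountDownLatch reloadStarted = new CountDownLatch(1);
        CountDownLatch releaseReload = new CountDownLatch(1);
        Thread reloadThread =
                new Thread(
                        () -> {
                            try {
                                loader.reload(reloadStarted, releaseReload);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        });
        reloadThread.start();
        reloadStarted.await(); // ensure the reload thread really holds the lock
        boolean whileBlocked = loader.tryClose(200);
        releaseReload.countDown(); // in the test, the latch is only released after close()
        reloadThread.join();
        boolean afterRelease = loader.tryClose(200);
        return new boolean[] {whileBlocked, afterRelease};
    }
}
```

`demo()` returns `{false, true}`. While the reload is blocked, the close path cannot get the lock. It succeeds only once the latch is released, which in the test never happens because `latch.countDown()` runs after `cacheLoader.close()`.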



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -131,15 +133,19 @@ void testExceptionDuringReload() throws Exception {
     }
 
     @Test
-    void testCloseAndInterruptDuringReload() throws Exception {
-        AtomicInteger sleepCounter = new AtomicInteger(0);
-        int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to number of all rows
+    void testInterruptDuringReload() throws Exception {
+        CountDownLatch recordsProcessingLatch = new CountDownLatch(1);
         Runnable reloadAction =
-                ThrowingRunnable.unchecked(
-                        () -> {
-                            sleepCounter.incrementAndGet();
-                            Thread.sleep(1000);
-                        });
+                () -> {
+                    try {
+                        // wait should be interrupted if everything works ok
+                        if (!recordsProcessingLatch.await(5, TimeUnit.SECONDS)) {
+                            throw new RuntimeException("timeout");
+                        }

Review Comment:
   ```suggestion
                       assertThatThrownBy(recordsProcessingLatch::await)
                               .as("wait should be interrupted if everything works ok")
                               .isInstanceOf(InterruptedException.class);
                       Thread.currentThread().interrupt(); // restore interrupted status
   ```
   I guess we could get rid of the 5 seconds here. Waiting forever enables us to generate the thread dump at the end, which gives more insight into what went wrong during the test execution.
   
   nit: I played around with the AssertJ API a bit more and used the comment as an assertion message. WDYT?
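Here is a plain-Java sketch of what the suggestion relies on, without AssertJ. The worker waits on a latch without a timeout, `shutdownNow()` interrupts it, and the worker restores its interrupt status. `InterruptibleWaitSketch` is hypothetical and only mirrors the shape of the test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: an untimed await that only ends through interruption. */
final class InterruptibleWaitSketch {
    private InterruptibleWaitSketch() {}

    /** Returns true if the worker's untimed wait was interrupted and it restored the flag. */
    static boolean untimedWaitIsInterrupted() throws InterruptedException, ExecutionException {
        CountDownLatch recordsProcessingLatch = new CountDownLatch(1); // never counted down
        CountDownLatch workerStarted = new CountDownLatch(1);
        AtomicBoolean interruptRestored = new AtomicBoolean(false);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<?> future =
                executorService.submit(
                        () -> {
                            workerStarted.countDown();
                            try {
                                recordsProcessingLatch.await(); // only an interrupt ends this
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt(); // restore interrupted status
                                interruptRestored.set(Thread.currentThread().isInterrupted());
                            }
                        });
        workerStarted.await();
        executorService.shutdownNow(); // interrupts the worker thread
        future.get(); // returns normally because the worker handled the interrupt
        return interruptRestored.get();
    }
}
```

If the interrupt never arrives, this shape hangs instead of failing after a fixed timeout. That is the trade-off described above: the hang surfaces as a thread dump showing where the worker is stuck.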



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -149,19 +155,33 @@ void testCloseAndInterruptDuringReload() throws Exception {
         Future<?> future = executorService.submit(cacheLoader);
         executorService.shutdownNow(); // internally interrupts a thread
         assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on interruption
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
         assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+    }
 
-        sleepCounter.set(0);
+    @Test
+    void testCloseDuringReload() throws Exception {
+        AtomicInteger recordsCounter = new AtomicInteger(0);
+        int totalRecords = TestCacheLoader.DATA.values().stream().mapToInt(Collection::size).sum();
+        CountDownLatch latch = new CountDownLatch(1);
+        Runnable reloadAction =
+                ThrowingRunnable.unchecked(
+                        () -> {
+                            recordsCounter.incrementAndGet();
+                            latch.await();

Review Comment:
   ```suggestion
                               reloadActionReachedTrigger.trigger();
                               latch.await();
   ```
   Don't we have to add something like `OneShotLatch#trigger` here and wait for it before calling `cacheLoader.close()` later in the test? That way, we make sure that we're actually at the stage where the reload tasks are instantiated before closing the loader.
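The ordering the suggestion aims for can be sketched as follows. A `CountDownLatch(1)` stands in for `OneShotLatch`, so the example needs no Flink test utilities. `ReachedTriggerSketch` is hypothetical, and a comment marks where the real `cacheLoader.close()` call would go.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: wait until the reload action is running before "closing". */
final class ReachedTriggerSketch {
    private ReachedTriggerSketch() {}

    /** Returns how many records the reload action had seen when close would be called. */
    static int recordsSeenBeforeClose() throws InterruptedException {
        AtomicInteger recordsCounter = new AtomicInteger(0);
        CountDownLatch reloadActionReachedTrigger = new CountDownLatch(1); // OneShotLatch stand-in
        CountDownLatch latch = new CountDownLatch(1);
        Runnable reloadAction =
                () -> {
                    recordsCounter.incrementAndGet();
                    reloadActionReachedTrigger.countDown(); // like OneShotLatch#trigger
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                };
        Thread reloadThread = new Thread(reloadAction);
        reloadThread.start();
        reloadActionReachedTrigger.await(); // the reload is guaranteed to be in progress now
        int seen = recordsCounter.get();
        // cacheLoader.close() would be called at this point in the real test
        latch.countDown();
        reloadThread.join();
        return seen;
    }
}
```

Because the counter is incremented before the trigger fires, `recordsSeenBeforeClose()` always returns 1. A close issued at the marked point is therefore guaranteed to overlap with a running reload.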



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.java:
##########
@@ -107,7 +109,11 @@ protected void reloadCache() throws Exception {
         } catch (InterruptedException ignored) { // we use interrupt to close reload thread
         } finally {
             if (cacheLoadTaskService != null) {
+                // if main cache reload thread encountered an exception,
+                // it interrupts underlying InputSplitCacheLoadTasks threads
                 cacheLoadTaskService.shutdownNow();

Review Comment:
   Yeah, I was just curious about your opinion on using the common pool. It felt like an asynchronous implementation would have worked here. I just struggled to think of a better way to spawn loading threads based on the number of `InputSplits` to optimize resource utilization per cache loading cycle.
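One way to size threads per cache loading cycle is to create a dedicated pool per reload, bounded by the number of splits, and always tear it down in a `finally` block. This is a hypothetical sketch, not the PR's implementation. `PerSplitLoadingSketch` is illustrative, splits are modeled as plain record lists, and capping the pool at `availableProcessors()` is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: one pool per reload cycle, sized by the number of splits. */
final class PerSplitLoadingSketch {
    private PerSplitLoadingSketch() {}

    static int loadAll(List<List<String>> splits) throws InterruptedException, ExecutionException {
        int numThreads =
                Math.max(1, Math.min(splits.size(), Runtime.getRuntime().availableProcessors()));
        ExecutorService cacheLoadTaskService = Executors.newFixedThreadPool(numThreads);
        AtomicInteger loadedRecords = new AtomicInteger(0);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (List<String> split : splits) {
                // one task per split; the block lambda makes this a Runnable
                futures.add(
                        cacheLoadTaskService.submit(
                                () -> {
                                    loadedRecords.addAndGet(split.size());
                                }));
            }
            for (Future<?> future : futures) {
                future.get(); // rethrows the first task failure as ExecutionException
            }
            return loadedRecords.get();
        } finally {
            // as in the finally block above: interrupt still-running tasks on failure
            cacheLoadTaskService.shutdownNow();
        }
    }
}
```

For example, `loadAll(List.of(List.of("a", "b"), List.of("c")))` returns 3. Using `ForkJoinPool.commonPool()` (for instance through `CompletableFuture.runAsync`) would avoid creating threads per cycle. However, the common pool is shared JVM-wide, so a single reload could not control how many of its threads it occupies.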



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -149,19 +155,33 @@ void testCloseAndInterruptDuringReload() throws Exception {
         Future<?> future = executorService.submit(cacheLoader);
         executorService.shutdownNow(); // internally interrupts a thread
         assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on interruption
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
         assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0);
+    }
 
-        sleepCounter.set(0);
+    @Test
+    void testCloseDuringReload() throws Exception {
+        AtomicInteger recordsCounter = new AtomicInteger(0);
+        int totalRecords = TestCacheLoader.DATA.size() + 1; // 1 key with 2 records
+        CountDownLatch latch = new CountDownLatch(1);
+        Runnable reloadAction =
+                ThrowingRunnable.unchecked(
+                        () -> {
+                            recordsCounter.incrementAndGet();
+                            latch.await();
+                        });
+        InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction);
+        InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
+        cacheLoader.open(metricGroup);
 
         // check closing
-        executorService = Executors.newSingleThreadExecutor();
-        future = executorService.submit(cacheLoader);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        Future<?> future = executorService.submit(cacheLoader);
         cacheLoader.close();
-        assertThatNoException().isThrownBy(future::get); // wait for the end
+        latch.countDown();
+        future.get(); // wait for the end
+        executorService.shutdown();

Review Comment:
   Probably you're right. I checked how we dealt with threads in other cases. Since the future itself has already completed, it should be fine. My initial concern was that the thread might not terminate properly before the test passes and could linger while other tests are executed. But it looks like that's not an issue.
   
   What we could do, though, is add a try/finally block. That would make sure shutdown is also triggered if something else throws an exception, so the thread is shut down properly in that case as well.
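Here is a minimal sketch of the try/finally variant, assuming a test body shaped like the one above. `TryFinallyShutdownSketch` and its `created` parameter are hypothetical; the parameter exists only so the demo can check the executor's state afterwards. The sketch uses `shutdownNow()` in the `finally` block, on the assumption that a failing test may leave the worker blocked on the latch. Plain `shutdown()` would not interrupt such a worker.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of a test body that always shuts its executor down. */
final class TryFinallyShutdownSketch {
    private TryFinallyShutdownSketch() {}

    /** Simulates a test body; 'created' exposes the executor so callers can inspect it. */
    static void runTestBody(boolean failAssertion, AtomicReference<ExecutorService> created)
            throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        created.set(executorService);
        try {
            Future<?> future = executorService.submit(() -> {});
            future.get(); // wait for the end
            if (failAssertion) {
                throw new AssertionError("simulated assertion failure");
            }
        } finally {
            // runs on success and on failure; shutdownNow() also interrupts a worker
            // that is still blocked, e.g. on a latch that was never counted down
            executorService.shutdownNow();
        }
    }
}
```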



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org