You are viewing a plain-text version of this content. (The canonical link was not preserved in this copy.)
Posted to commits@flink.apache.org by sh...@apache.org on 2022/07/04 02:13:33 UTC

[flink] branch master updated: [FLINK-28238][sql-gateway] Fix unstable testCancelOperationAndFetchResultInParallel (#20063)

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7974e81ec51 [FLINK-28238][sql-gateway] Fix unstable testCancelOperationAndFetchResultInParallel (#20063)
7974e81ec51 is described below

commit 7974e81ec51a2071b9658f768f651ffc371b15b0
Author: Shengkai <33...@users.noreply.github.com>
AuthorDate: Mon Jul 4 10:13:25 2022 +0800

    [FLINK-28238][sql-gateway] Fix unstable testCancelOperationAndFetchResultInParallel (#20063)
---
 .../gateway/service/SqlGatewayServiceITCase.java   | 62 +++++++++++++++++-----
 .../gateway/service/result/ResultFetcherTest.java  |  9 +++-
 .../service/utils/IgnoreExceptionHandler.java      | 36 +++++++++++++
 3 files changed, 92 insertions(+), 15 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 52d74fb882e..d39c797eb63 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -37,12 +37,15 @@ import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.service.operation.Operation;
 import org.apache.flink.table.gateway.service.operation.OperationManager;
 import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.function.RunnableWithException;
 
 import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Condition;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -56,8 +59,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatChainOfCauses;
 import static org.apache.flink.types.RowKind.DELETE;
 import static org.apache.flink.types.RowKind.INSERT;
 import static org.apache.flink.types.RowKind.UPDATE_AFTER;
@@ -81,6 +86,9 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
             SessionEnvironment.newBuilder()
                     .setSessionEndpointVersion(MockedEndpointVersion.V1)
                     .build();
+    private final ThreadFactory threadFactory =
+            new ExecutorThreadFactory(
+                    "SqlGatewayService Test Pool", IgnoreExceptionHandler.INSTANCE);
 
     @BeforeAll
     public static void setUp() {
@@ -244,7 +252,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
     // --------------------------------------------------------------------------------------------
 
     @Test
-    public void testCancelOperationAndFetchResultInParallel() throws Exception {
+    public void testCancelOperationAndFetchResultInParallel() {
         SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
         CountDownLatch latch = new CountDownLatch(1);
         // Make sure cancel the Operation before finish.
@@ -253,9 +261,13 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 sessionHandle,
                 operationHandle,
                 () -> service.cancelOperation(sessionHandle, operationHandle),
-                String.format(
-                        "Can not fetch results from the %s in %s status.",
-                        operationHandle, OperationStatus.CANCELED));
+                new Condition<>(
+                        msg ->
+                                msg.contains(
+                                        String.format(
+                                                "Can not fetch results from the %s in %s status.",
+                                                operationHandle, OperationStatus.CANCELED)),
+                        "Fetch results with expected error message."));
         latch.countDown();
     }
 
@@ -273,9 +285,19 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                 sessionHandle,
                 operationHandle,
                 () -> service.closeOperation(sessionHandle, operationHandle),
-                String.format(
-                        "Can not find the submitted operation in the OperationManager with the %s.",
-                        operationHandle));
+                // The fetcher may either fetch results from an already-closed operation or fail
+                // to find the operation at all.
+                new Condition<>(
+                        msg ->
+                                msg.contains(
+                                                String.format(
+                                                        "Can not find the submitted operation in the OperationManager with the %s.",
+                                                        operationHandle))
+                                        || msg.contains(
+                                                String.format(
+                                                        "Can not fetch results from the %s in %s status.",
+                                                        operationHandle, OperationStatus.CLOSED)),
+                        "Fetch results with expected error message."));
     }
 
     @Test
@@ -300,8 +322,12 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                     service.getSession(sessionHandle)
                             .getOperationManager()
                             .getOperation(operationHandle));
-            new Thread(() -> service.cancelOperation(sessionHandle, operationHandle)).start();
-            new Thread(() -> service.closeOperation(sessionHandle, operationHandle)).start();
+            threadFactory
+                    .newThread(() -> service.cancelOperation(sessionHandle, operationHandle))
+                    .start();
+            threadFactory
+                    .newThread(() -> service.closeOperation(sessionHandle, operationHandle))
+                    .start();
         }
 
         CommonTestUtils.waitUtil(
@@ -323,7 +349,8 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
         int submitThreadsNum = 100;
         CountDownLatch latch = new CountDownLatch(submitThreadsNum);
         for (int i = 0; i < submitThreadsNum; i++) {
-            new Thread(
+            threadFactory
+                    .newThread(
                             () -> {
                                 try {
                                     submitDefaultOperation(sessionHandle, () -> {});
@@ -419,10 +446,11 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
             SessionHandle sessionHandle,
             OperationHandle operationHandle,
             RunnableWithException cancelOrClose,
-            String errorMsg) {
+            Condition<String> condition) {
 
         List<RowData> actual = new ArrayList<>();
-        new Thread(
+        threadFactory
+                .newThread(
                         () -> {
                             try {
                                 cancelOrClose.run();
@@ -442,13 +470,19 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                                                 operationHandle,
                                                 token,
                                                 Integer.MAX_VALUE);
-                                token = resultSet.getNextToken();
+                                // Keep fetching from the Operation until an exception is thrown.
+                                if (resultSet.getNextToken() != null) {
+                                    token = resultSet.getNextToken();
+                                }
                                 if (resultSet.getResultType() == ResultSet.ResultType.PAYLOAD) {
                                     actual.addAll(resultSet.getData());
                                 }
                             }
                         })
-                .satisfies(anyCauseMatches(errorMsg));
+                .satisfies(
+                        t ->
+                                assertThatChainOfCauses(t)
+                                        .anySatisfy(t1 -> condition.matches(t1.getMessage())));
 
         assertTrue(new HashSet<>(getDefaultResultSet().getData()).containsAll(actual));
     }
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
index 2acaa03cd81..dd079ced141 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
@@ -28,9 +28,11 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.apache.commons.collections.iterators.IteratorChain;
 import org.junit.jupiter.api.BeforeAll;
@@ -45,6 +47,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -58,6 +61,9 @@ public class ResultFetcherTest extends TestLogger {
     private static ResolvedSchema schema;
     private static List<RowData> data;
 
+    private final ThreadFactory threadFactory =
+            new ExecutorThreadFactory("Result Fetcher Test Pool", IgnoreExceptionHandler.INSTANCE);
+
     @BeforeAll
     public static void setUp() {
         schema =
@@ -180,7 +186,8 @@ public class ResultFetcherTest extends TestLogger {
                 "Failed to wait the buffer has data.");
         List<RowData> firstFetch = fetcher.fetchResults(0, Integer.MAX_VALUE).getData();
         for (int i = 0; i < fetchThreadNum; i++) {
-            new Thread(
+            threadFactory
+                    .newThread(
                             () -> {
                                 ResultSet resultSet = fetcher.fetchResults(0, Integer.MAX_VALUE);
 
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/IgnoreExceptionHandler.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/IgnoreExceptionHandler.java
new file mode 100644
index 00000000000..aebfca881a8
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/IgnoreExceptionHandler.java
@@ -0,0 +1,36 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Uncaught exception handler that only logs the exception and otherwise ignores it. */
+public class IgnoreExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+    public static final IgnoreExceptionHandler INSTANCE = new IgnoreExceptionHandler();
+
+    private static final Logger LOG = LoggerFactory.getLogger(IgnoreExceptionHandler.class);
+
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+        // Deliberately swallow the error: log it and do nothing else.
+        LOG.error("Thread '{}' produced an uncaught exception. Ignore...", t.getName(), e);
+    }
+}