You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/07/04 02:13:33 UTC
[flink] branch master updated: [FLINK-28238][sql-gateway] Fix unstable testCancelOperationAndFetchResultInParallel (#20063)
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7974e81ec51 [FLINK-28238][sql-gateway] Fix unstable testCancelOperationAndFetchResultInParallel (#20063)
7974e81ec51 is described below
commit 7974e81ec51a2071b9658f768f651ffc371b15b0
Author: Shengkai <33...@users.noreply.github.com>
AuthorDate: Mon Jul 4 10:13:25 2022 +0800
[FLINK-28238][sql-gateway] Fix unstable testCancelOperationAndFetchResultInParallel (#20063)
---
.../gateway/service/SqlGatewayServiceITCase.java | 62 +++++++++++++++++-----
.../gateway/service/result/ResultFetcherTest.java | 9 +++-
.../service/utils/IgnoreExceptionHandler.java | 36 +++++++++++++
3 files changed, 92 insertions(+), 15 deletions(-)
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 52d74fb882e..d39c797eb63 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -37,12 +37,15 @@ import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.operation.Operation;
import org.apache.flink.table.gateway.service.operation.OperationManager;
import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.RunnableWithException;
import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Condition;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -56,8 +59,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatChainOfCauses;
import static org.apache.flink.types.RowKind.DELETE;
import static org.apache.flink.types.RowKind.INSERT;
import static org.apache.flink.types.RowKind.UPDATE_AFTER;
@@ -81,6 +86,9 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
SessionEnvironment.newBuilder()
.setSessionEndpointVersion(MockedEndpointVersion.V1)
.build();
+ private final ThreadFactory threadFactory =
+ new ExecutorThreadFactory(
+ "SqlGatewayService Test Pool", IgnoreExceptionHandler.INSTANCE);
@BeforeAll
public static void setUp() {
@@ -244,7 +252,7 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
// --------------------------------------------------------------------------------------------
@Test
- public void testCancelOperationAndFetchResultInParallel() throws Exception {
+ public void testCancelOperationAndFetchResultInParallel() {
SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
CountDownLatch latch = new CountDownLatch(1);
// Make sure cancel the Operation before finish.
@@ -253,9 +261,13 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
sessionHandle,
operationHandle,
() -> service.cancelOperation(sessionHandle, operationHandle),
- String.format(
- "Can not fetch results from the %s in %s status.",
- operationHandle, OperationStatus.CANCELED));
+ new Condition<>(
+ msg ->
+ msg.contains(
+ String.format(
+ "Can not fetch results from the %s in %s status.",
+ operationHandle, OperationStatus.CANCELED)),
+ "Fetch results with expected error message."));
latch.countDown();
}
@@ -273,9 +285,19 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
sessionHandle,
operationHandle,
() -> service.closeOperation(sessionHandle, operationHandle),
- String.format(
- "Can not find the submitted operation in the OperationManager with the %s.",
- operationHandle));
+ // Depending on timing, the fetcher either sees the operation in CLOSED status or
+ // can no longer find it in the OperationManager.
+ new Condition<>(
+ msg ->
+ msg.contains(
+ String.format(
+ "Can not find the submitted operation in the OperationManager with the %s.",
+ operationHandle))
+ || msg.contains(
+ String.format(
+ "Can not fetch results from the %s in %s status.",
+ operationHandle, OperationStatus.CLOSED)),
+ "Fetch results with expected error message."));
}
@Test
@@ -300,8 +322,12 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
service.getSession(sessionHandle)
.getOperationManager()
.getOperation(operationHandle));
- new Thread(() -> service.cancelOperation(sessionHandle, operationHandle)).start();
- new Thread(() -> service.closeOperation(sessionHandle, operationHandle)).start();
+ threadFactory
+ .newThread(() -> service.cancelOperation(sessionHandle, operationHandle))
+ .start();
+ threadFactory
+ .newThread(() -> service.closeOperation(sessionHandle, operationHandle))
+ .start();
}
CommonTestUtils.waitUtil(
@@ -323,7 +349,8 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
int submitThreadsNum = 100;
CountDownLatch latch = new CountDownLatch(submitThreadsNum);
for (int i = 0; i < submitThreadsNum; i++) {
- new Thread(
+ threadFactory
+ .newThread(
() -> {
try {
submitDefaultOperation(sessionHandle, () -> {});
@@ -419,10 +446,11 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
SessionHandle sessionHandle,
OperationHandle operationHandle,
RunnableWithException cancelOrClose,
- String errorMsg) {
+ Condition<String> condition) {
List<RowData> actual = new ArrayList<>();
- new Thread(
+ threadFactory
+ .newThread(
() -> {
try {
cancelOrClose.run();
@@ -442,13 +470,19 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
operationHandle,
token,
Integer.MAX_VALUE);
- token = resultSet.getNextToken();
+ // Keep fetching from the Operation until an exception is thrown.
+ if (resultSet.getNextToken() != null) {
+ token = resultSet.getNextToken();
+ }
if (resultSet.getResultType() == ResultSet.ResultType.PAYLOAD) {
actual.addAll(resultSet.getData());
}
}
})
- .satisfies(anyCauseMatches(errorMsg));
+ .satisfies(
+ t ->
+ assertThatChainOfCauses(t)
+ .anySatisfy(t1 -> condition.matches(t1.getMessage())));
assertTrue(new HashSet<>(getDefaultResultSet().getData()).containsAll(actual));
}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
index 2acaa03cd81..dd079ced141 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
@@ -28,9 +28,11 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.commons.collections.iterators.IteratorChain;
import org.junit.jupiter.api.BeforeAll;
@@ -45,6 +47,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -58,6 +61,9 @@ public class ResultFetcherTest extends TestLogger {
private static ResolvedSchema schema;
private static List<RowData> data;
+ private final ThreadFactory threadFactory =
+ new ExecutorThreadFactory("Result Fetcher Test Pool", IgnoreExceptionHandler.INSTANCE);
+
@BeforeAll
public static void setUp() {
schema =
@@ -180,7 +186,8 @@ public class ResultFetcherTest extends TestLogger {
"Failed to wait the buffer has data.");
List<RowData> firstFetch = fetcher.fetchResults(0, Integer.MAX_VALUE).getData();
for (int i = 0; i < fetchThreadNum; i++) {
- new Thread(
+ threadFactory
+ .newThread(
() -> {
ResultSet resultSet = fetcher.fetchResults(0, Integer.MAX_VALUE);
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/IgnoreExceptionHandler.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/IgnoreExceptionHandler.java
new file mode 100644
index 00000000000..aebfca881a8
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/IgnoreExceptionHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Uncaught exception handler that logs the exception and then ignores it. */
+public class IgnoreExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+ public static final IgnoreExceptionHandler INSTANCE = new IgnoreExceptionHandler();
+
+ private static final Logger LOG = LoggerFactory.getLogger(IgnoreExceptionHandler.class);
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ // Only log; the exception is intentionally not rethrown or propagated.
+ LOG.error("Thread '{}' produced an uncaught exception. Ignore...", t.getName(), e);
+ }
+}