You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2023/01/19 07:06:07 UTC
[flink] branch master updated: [FLINK-30677][sql-gateway] Fix unstable SqlGatewayServiceStatementITCase#testFlinkSqlStatements
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4fdb5c40094 [FLINK-30677][sql-gateway] Fix unstable SqlGatewayServiceStatementITCase#testFlinkSqlStatements
4fdb5c40094 is described below
commit 4fdb5c40094cfaa5fb3b6d7ce9ec891dab3ef32a
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Mon Jan 16 21:13:34 2023 +0800
[FLINK-30677][sql-gateway] Fix unstable SqlGatewayServiceStatementITCase#testFlinkSqlStatements
This closes #21700
---
.../service/operation/OperationExecutor.java | 3 +-
.../gateway/service/result/ResultFetcher.java | 42 +++++++++++++---------
.../gateway/AbstractSqlGatewayStatementITCase.java | 9 ++---
.../SqlGatewayRestEndpointStatementITCase.java | 4 +++
4 files changed, 37 insertions(+), 21 deletions(-)
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index cfcb70a059c..3c377eba893 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -409,7 +409,8 @@ public class OperationExecutor {
ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())),
Collections.singletonList(
GenericRowData.of(StringData.fromString(jobID.toString()))),
- jobID);
+ jobID,
+ result.getResultKind());
}
private ResultFetcher callOperation(
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
index b48659e11cd..9fa810c9c8c 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service.result;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.ResolvedSchema;
@@ -31,6 +32,7 @@ import org.apache.flink.table.gateway.api.results.ResultSetImpl;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.utils.print.RowDataToStringConverter;
import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,7 +121,8 @@ public class ResultFetcher {
OperationHandle operationHandle,
ResolvedSchema resultSchema,
List<RowData> rows,
- @Nullable JobID jobID) {
+ @Nullable JobID jobID,
+ ResultKind resultKind) {
this.operationHandle = operationHandle;
this.resultSchema = resultSchema;
this.bufferedResults.addAll(rows);
@@ -127,16 +130,15 @@ public class ResultFetcher {
this.converter = SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
this.isQueryResult = false;
this.jobID = jobID;
- this.resultKind = ResultKind.SUCCESS_WITH_CONTENT;
+ this.resultKind = resultKind;
}
public static ResultFetcher fromTableResult(
OperationHandle operationHandle,
TableResultInternal tableResult,
boolean isQueryResult) {
- JobID jobID = null;
if (isQueryResult) {
- jobID =
+ JobID jobID =
tableResult
.getJobClient()
.orElseThrow(
@@ -146,29 +148,37 @@ public class ResultFetcher {
"Can't get job client for the operation %s.",
operationHandle)))
.getJobID();
+ return new ResultFetcher(
+ operationHandle,
+ tableResult.getResolvedSchema(),
+ tableResult.collectInternal(),
+ tableResult.getRowDataToStringConverter(),
+ true,
+ jobID,
+ tableResult.getResultKind());
+ } else {
+ return new ResultFetcher(
+ operationHandle,
+ tableResult.getResolvedSchema(),
+ CollectionUtil.iteratorToList(tableResult.collectInternal()),
+ tableResult.getJobClient().map(JobClient::getJobID).orElse(null),
+ tableResult.getResultKind());
}
-
- return new ResultFetcher(
- operationHandle,
- tableResult.getResolvedSchema(),
- tableResult.collectInternal(),
- tableResult.getRowDataToStringConverter(),
- isQueryResult,
- jobID,
- tableResult.getResultKind());
}
public static ResultFetcher fromResults(
OperationHandle operationHandle, ResolvedSchema resultSchema, List<RowData> results) {
- return fromResults(operationHandle, resultSchema, results, null);
+ return fromResults(
+ operationHandle, resultSchema, results, null, ResultKind.SUCCESS_WITH_CONTENT);
}
public static ResultFetcher fromResults(
OperationHandle operationHandle,
ResolvedSchema resultSchema,
List<RowData> results,
- @Nullable JobID jobID) {
- return new ResultFetcher(operationHandle, resultSchema, results, jobID);
+ @Nullable JobID jobID,
+ ResultKind resultKind) {
+ return new ResultFetcher(operationHandle, resultSchema, results, jobID, resultKind);
}
public void close() {
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
index f14ea52468c..890e55acc02 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
@@ -320,11 +320,12 @@ public abstract class AbstractSqlGatewayStatementITCase extends AbstractTestBase
Iterator<RowData> iterator)
throws Exception {
if (type.equals(StatementType.EXPLAIN) || type.equals(StatementType.SHOW_CREATE)) {
+ StringBuilder sb = new StringBuilder();
+ while (iterator.hasNext()) {
+ sb.append(iterator.next().getString(0).toString());
+ }
return Tag.OK.addTag(
- replaceStreamNodeId(
- replaceNodeIdInOperator(
- iterator.next().getString(0).toString()))
- + "\n");
+ replaceStreamNodeId(replaceNodeIdInOperator(sb.toString())) + "\n");
} else if (schema.getColumn(0)
.map(col -> col.getName().equals(Constants.JOB_ID))
.orElse(false)) {
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
index f87558f171a..24fed69b90e 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
@@ -47,6 +47,7 @@ import org.apache.flink.table.gateway.rest.util.TestingRestClient;
import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.util.StringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -216,6 +217,9 @@ public class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewaySta
@Override
protected String stringifyException(Throwable t) {
+ if (StringUtils.isNullOrWhitespaceOnly(t.getMessage())) {
+ return t.getClass().getCanonicalName();
+ }
String message = t.getMessage();
String[] splitExceptions = message.split(PATTERN1);
return splitExceptions[splitExceptions.length - 1].split(PATTERN2)[0];