You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2023/01/19 07:06:07 UTC

[flink] branch master updated: [FLINK-30677][sql-gateway] Fix unstable SqlGatewayServiceStatementITCase#testFlinkSqlStatements

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fdb5c40094 [FLINK-30677][sql-gateway] Fix unstable SqlGatewayServiceStatementITCase#testFlinkSqlStatements
4fdb5c40094 is described below

commit 4fdb5c40094cfaa5fb3b6d7ce9ec891dab3ef32a
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Mon Jan 16 21:13:34 2023 +0800

    [FLINK-30677][sql-gateway] Fix unstable SqlGatewayServiceStatementITCase#testFlinkSqlStatements
    
    This closes #21700
---
 .../service/operation/OperationExecutor.java       |  3 +-
 .../gateway/service/result/ResultFetcher.java      | 42 +++++++++++++---------
 .../gateway/AbstractSqlGatewayStatementITCase.java |  9 ++---
 .../SqlGatewayRestEndpointStatementITCase.java     |  4 +++
 4 files changed, 37 insertions(+), 21 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index cfcb70a059c..3c377eba893 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -409,7 +409,8 @@ public class OperationExecutor {
                 ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())),
                 Collections.singletonList(
                         GenericRowData.of(StringData.fromString(jobID.toString()))),
-                jobID);
+                jobID,
+                result.getResultKind());
     }
 
     private ResultFetcher callOperation(
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
index b48659e11cd..9fa810c9c8c 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service.result;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.ResultKind;
 import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -31,6 +32,7 @@ import org.apache.flink.table.gateway.api.results.ResultSetImpl;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.utils.print.RowDataToStringConverter;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -119,7 +121,8 @@ public class ResultFetcher {
             OperationHandle operationHandle,
             ResolvedSchema resultSchema,
             List<RowData> rows,
-            @Nullable JobID jobID) {
+            @Nullable JobID jobID,
+            ResultKind resultKind) {
         this.operationHandle = operationHandle;
         this.resultSchema = resultSchema;
         this.bufferedResults.addAll(rows);
@@ -127,16 +130,15 @@ public class ResultFetcher {
         this.converter = SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
         this.isQueryResult = false;
         this.jobID = jobID;
-        this.resultKind = ResultKind.SUCCESS_WITH_CONTENT;
+        this.resultKind = resultKind;
     }
 
     public static ResultFetcher fromTableResult(
             OperationHandle operationHandle,
             TableResultInternal tableResult,
             boolean isQueryResult) {
-        JobID jobID = null;
         if (isQueryResult) {
-            jobID =
+            JobID jobID =
                     tableResult
                             .getJobClient()
                             .orElseThrow(
@@ -146,29 +148,37 @@ public class ResultFetcher {
                                                             "Can't get job client for the operation %s.",
                                                             operationHandle)))
                             .getJobID();
+            return new ResultFetcher(
+                    operationHandle,
+                    tableResult.getResolvedSchema(),
+                    tableResult.collectInternal(),
+                    tableResult.getRowDataToStringConverter(),
+                    true,
+                    jobID,
+                    tableResult.getResultKind());
+        } else {
+            return new ResultFetcher(
+                    operationHandle,
+                    tableResult.getResolvedSchema(),
+                    CollectionUtil.iteratorToList(tableResult.collectInternal()),
+                    tableResult.getJobClient().map(JobClient::getJobID).orElse(null),
+                    tableResult.getResultKind());
         }
-
-        return new ResultFetcher(
-                operationHandle,
-                tableResult.getResolvedSchema(),
-                tableResult.collectInternal(),
-                tableResult.getRowDataToStringConverter(),
-                isQueryResult,
-                jobID,
-                tableResult.getResultKind());
     }
 
     public static ResultFetcher fromResults(
             OperationHandle operationHandle, ResolvedSchema resultSchema, List<RowData> results) {
-        return fromResults(operationHandle, resultSchema, results, null);
+        return fromResults(
+                operationHandle, resultSchema, results, null, ResultKind.SUCCESS_WITH_CONTENT);
     }
 
     public static ResultFetcher fromResults(
             OperationHandle operationHandle,
             ResolvedSchema resultSchema,
             List<RowData> results,
-            @Nullable JobID jobID) {
-        return new ResultFetcher(operationHandle, resultSchema, results, jobID);
+            @Nullable JobID jobID,
+            ResultKind resultKind) {
+        return new ResultFetcher(operationHandle, resultSchema, results, jobID, resultKind);
     }
 
     public void close() {
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
index f14ea52468c..890e55acc02 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
@@ -320,11 +320,12 @@ public abstract class AbstractSqlGatewayStatementITCase extends AbstractTestBase
             Iterator<RowData> iterator)
             throws Exception {
         if (type.equals(StatementType.EXPLAIN) || type.equals(StatementType.SHOW_CREATE)) {
+            StringBuilder sb = new StringBuilder();
+            while (iterator.hasNext()) {
+                sb.append(iterator.next().getString(0).toString());
+            }
             return Tag.OK.addTag(
-                    replaceStreamNodeId(
-                                    replaceNodeIdInOperator(
-                                            iterator.next().getString(0).toString()))
-                            + "\n");
+                    replaceStreamNodeId(replaceNodeIdInOperator(sb.toString())) + "\n");
         } else if (schema.getColumn(0)
                 .map(col -> col.getName().equals(Constants.JOB_ID))
                 .orElse(false)) {
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
index f87558f171a..24fed69b90e 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
@@ -47,6 +47,7 @@ import org.apache.flink.table.gateway.rest.util.TestingRestClient;
 import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
 import org.apache.flink.table.utils.DateTimeUtils;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.util.StringUtils;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -216,6 +217,9 @@ public class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewaySta
 
     @Override
     protected String stringifyException(Throwable t) {
+        if (StringUtils.isNullOrWhitespaceOnly(t.getMessage())) {
+            return t.getClass().getCanonicalName();
+        }
         String message = t.getMessage();
         String[] splitExceptions = message.split(PATTERN1);
         return splitExceptions[splitExceptions.length - 1].split(PATTERN2)[0];