You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2023/01/30 02:44:51 UTC

[flink] branch master updated: [FLINK-30816][sql-client] Fix SQL Client always uses the highest packaged version to connect to gateway

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cc6315a3e82 [FLINK-30816][sql-client] Fix SQL Client always uses the highest packaged version to connect to gateway
cc6315a3e82 is described below

commit cc6315a3e82c570e631684793d08ca9cb5403521
Author: Shengkai <10...@qq.com>
AuthorDate: Sun Jan 29 18:29:27 2023 +0800

    [FLINK-30816][sql-client] Fix SQL Client always uses the highest packaged version to connect to gateway
    
    This closes #21776
---
 .../flink/table/client/gateway/ExecutorImpl.java   | 85 +++++++++++++++++++---
 .../client/gateway/local/ExecutorImplITCase.java   | 68 +++++++++++++++++
 .../rest/handler/util/GetApiVersionHandler.java    | 25 +++++--
 .../rest/header/statement/FetchResultsHeaders.java |  4 +-
 .../rest/util/SqlGatewayRestAPIVersion.java        | 16 ++++
 5 files changed, 179 insertions(+), 19 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
index 93e9550224e..f324f516406 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.table.api.SqlParserEOFException;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.data.RowData;
@@ -46,6 +47,7 @@ import org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbea
 import org.apache.flink.table.gateway.rest.header.statement.CompleteStatementHeaders;
 import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
 import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
+import org.apache.flink.table.gateway.rest.header.util.GetApiVersionHeaders;
 import org.apache.flink.table.gateway.rest.message.operation.OperationMessageParameters;
 import org.apache.flink.table.gateway.rest.message.operation.OperationStatusResponseBody;
 import org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
@@ -61,9 +63,11 @@ import org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessage
 import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
 import org.apache.flink.table.gateway.rest.serde.ResultInfo;
 import org.apache.flink.table.gateway.rest.util.RowFormat;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointUtils;
 import org.apache.flink.table.gateway.service.context.DefaultContext;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,6 +76,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
@@ -83,6 +88,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE;
 
@@ -97,9 +103,10 @@ public class ExecutorImpl implements Executor {
     private final long heartbeatInterval;
     private final ExecutorService service;
     private final ScheduledExecutorService heartbeatScheduler;
+    private final RestClient restClient;
 
-    private RestClient restClient;
     private SessionHandle sessionHandle;
+    private SqlGatewayRestAPIVersion connectionVersion;
 
     @VisibleForTesting
     public ExecutorImpl(
@@ -111,6 +118,11 @@ public class ExecutorImpl implements Executor {
         this.heartbeatInterval = heartbeatInterval;
         this.service = Executors.newCachedThreadPool();
         this.heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();
+        try {
+            this.restClient = new RestClient(defaultContext.getFlinkConfig(), service);
+        } catch (Exception e) {
+            throw new SqlClientException("Can not create the Rest Client.", e);
+        }
     }
 
     public ExecutorImpl(DefaultContext defaultContext, InetSocketAddress gatewayAddress) {
@@ -119,21 +131,16 @@ public class ExecutorImpl implements Executor {
 
     public void openSession(@Nullable String sessionId) {
         try {
-            restClient = new RestClient(defaultContext.getFlinkConfig(), service);
-        } catch (Exception e) {
-            throw new SqlExecutionException("Can not create the Rest Client.", e);
-        }
-
-        LOG.info("Open session to {}.", gatewayAddress);
-        OpenSessionRequestBody request =
-                new OpenSessionRequestBody(sessionId, defaultContext.getFlinkConfig().toMap());
-        try {
+            // determine gateway rest api version
+            connectionVersion = negotiateVersion();
             // open session to address:port
+            LOG.info("Open session to {}.", gatewayAddress);
             OpenSessionResponseBody response =
                     sendRequest(
                                     OpenSessionHeaders.getInstance(),
                                     EmptyMessageParameters.getInstance(),
-                                    request)
+                                    new OpenSessionRequestBody(
+                                            sessionId, defaultContext.getFlinkConfig().toMap()))
                             .get();
             sessionHandle = new SessionHandle(UUID.fromString(response.getSessionHandle()));
             // register heartbeat service
@@ -339,13 +346,29 @@ public class ExecutorImpl implements Executor {
                     R extends RequestBody,
                     P extends ResponseBody>
             CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) {
+        Preconditions.checkNotNull(connectionVersion, "The connection version should not be null.");
+        return sendRequest(messageHeaders, messageParameters, request, connectionVersion);
+    }
+
+    private <
+                    M extends MessageHeaders<R, P, U>,
+                    U extends MessageParameters,
+                    R extends RequestBody,
+                    P extends ResponseBody>
+            CompletableFuture<P> sendRequest(
+                    M messageHeaders,
+                    U messageParameters,
+                    R request,
+                    SqlGatewayRestAPIVersion connectionVersion) {
         try {
             return restClient.sendRequest(
                     gatewayAddress.getHostName(),
                     gatewayAddress.getPort(),
                     messageHeaders,
                     messageParameters,
-                    request);
+                    request,
+                    Collections.emptyList(),
+                    connectionVersion);
         } catch (IOException ioException) {
             throw new SqlExecutionException("Failed to connect to the SQL Gateway.", ioException);
         }
@@ -429,4 +452,42 @@ public class ExecutorImpl implements Executor {
     private OperationHandle getOperationHandle(Supplier<String> handleSupplier) {
         return new OperationHandle(UUID.fromString(handleSupplier.get()));
     }
+
+    private SqlGatewayRestAPIVersion negotiateVersion() {
+        List<SqlGatewayRestAPIVersion> gatewayVersions =
+                getResponse(
+                                sendRequest(
+                                        GetApiVersionHeaders.getInstance(),
+                                        EmptyMessageParameters.getInstance(),
+                                        EmptyRequestBody.getInstance(),
+                                        // Currently, RestClient always uses the latest REST API
+                                        // version to build the targetUrl. However, it's possible
+                                        // that the client REST API version is higher than the
+                                        // server REST API version. In this case, the gateway will
+                                        // report Not Found Error to notify the client.
+                                        //
+                                        // So, use the lowest REST API version here to fetch the
+                                        // remote gateway version list and then determine the
+                                        // connection version.
+                                        // TODO: Remove this once the REST Client allows building
+                                        // the target URL without an API version.
+                                        Collections.min(
+                                                SqlGatewayRestAPIVersion.getStableVersions())))
+                        .getVersions().stream()
+                        .map(SqlGatewayRestAPIVersion::valueOf)
+                        .collect(Collectors.toList());
+        SqlGatewayRestAPIVersion clientVersion = SqlGatewayRestAPIVersion.getDefaultVersion();
+
+        if (gatewayVersions.contains(clientVersion)) {
+            return clientVersion;
+        } else {
+            SqlGatewayRestAPIVersion latestVersion =
+                    RestAPIVersion.getLatestVersion(gatewayVersions);
+            if (latestVersion.equals(SqlGatewayRestAPIVersion.V1)) {
+                throw new SqlExecutionException(
+                        "Currently, SQL Client only supports to connect to the REST endpoint with API version larger than V1.");
+            }
+            return latestVersion;
+        }
+    }
 }
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java
index 3884ea2ac14..497f9ff90dc 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.client.gateway.ClientResult;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ExecutorImpl;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.client.gateway.local.result.ChangelogCollectResult;
 import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
@@ -55,7 +56,13 @@ import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.MockedSqlGatewayService;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.rest.handler.session.OpenSessionHandler;
+import org.apache.flink.table.gateway.rest.handler.util.GetApiVersionHandler;
+import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
+import org.apache.flink.table.gateway.rest.header.util.GetApiVersionHeaders;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import org.apache.flink.table.gateway.rest.util.TestingSqlGatewayRestEndpoint;
 import org.apache.flink.table.gateway.service.context.DefaultContext;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.table.utils.UserDefinedFunctions;
@@ -77,6 +84,7 @@ import org.junit.jupiter.api.io.TempDir;
 import javax.annotation.Nullable;
 
 import java.io.File;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
@@ -98,14 +106,18 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
 import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS;
 import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
 import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL;
 import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT;
+import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getBaseConfig;
+import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getFlinkConfig;
 import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
 import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Contains basic tests for the {@link ExecutorImpl}. */
 class ExecutorImplITCase {
@@ -519,10 +531,66 @@ class ExecutorImplITCase {
         }
     }
 
+    @Test
+    void testConnectToEndpointWithV1Version() throws Exception {
+        testNegotiateVersion(
+                SqlGatewayRestAPIVersion.V1,
+                executor ->
+                        assertThatThrownBy(() -> executor.openSession("connection-version"))
+                                .satisfies(
+                                        anyCauseMatches(
+                                                SqlExecutionException.class,
+                                                "Currently, SQL Client only supports to connect to the "
+                                                        + "REST endpoint with API version larger than V1.")));
+    }
+
+    @Test
+    void testConnectToEndpointWithHigherVersion() throws Exception {
+        testNegotiateVersion(
+                SqlGatewayRestAPIVersion.V2,
+                executor -> {
+                    executor.openSession("connection-version");
+                    assertThat(
+                                    SQL_GATEWAY_SERVICE_EXTENSION
+                                            .getService()
+                                            .getSessionEndpointVersion(
+                                                    ((ExecutorImpl) executor).getSessionHandle()))
+                            .isEqualTo(SqlGatewayRestAPIVersion.V2);
+                });
+    }
+
     // --------------------------------------------------------------------------------------------
     // Helper method
     // --------------------------------------------------------------------------------------------
 
+    private void testNegotiateVersion(
+            SqlGatewayRestAPIVersion version, Consumer<Executor> validator) throws Exception {
+        final String address = InetAddress.getLoopbackAddress().getHostAddress();
+        Configuration config = getBaseConfig(getFlinkConfig(address, address, "0"));
+        try (TestingSqlGatewayRestEndpoint endpoint =
+                        TestingSqlGatewayRestEndpoint.builder(
+                                        config, SQL_GATEWAY_SERVICE_EXTENSION.getService())
+                                .withHandler(
+                                        GetApiVersionHeaders.getInstance(),
+                                        new GetApiVersionHandler(
+                                                SQL_GATEWAY_SERVICE_EXTENSION.getService(),
+                                                Collections.emptyMap(),
+                                                GetApiVersionHeaders.getInstance(),
+                                                Collections.singletonList(version)))
+                                .withHandler(
+                                        OpenSessionHeaders.getInstance(),
+                                        new OpenSessionHandler(
+                                                SQL_GATEWAY_SERVICE_EXTENSION.getService(),
+                                                Collections.emptyMap(),
+                                                OpenSessionHeaders.getInstance()))
+                                .buildAndStart();
+                Executor executor =
+                        createExecutor(
+                                Collections.emptyList(), config, endpoint.getServerAddress())) {
+            validator.accept(executor);
+        }
+    }
+
     private void testInterrupting(Consumer<Executor> task) throws Exception {
         try (Executor executor = createTestServiceExecutor()) {
             Thread t = new Thread(() -> task.accept(executor), "worker");
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/util/GetApiVersionHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/util/GetApiVersionHandler.java
index afe46f3185d..9f9a372fb07 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/util/GetApiVersionHandler.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/util/GetApiVersionHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.gateway.rest.handler.util;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -30,7 +31,7 @@ import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
@@ -40,12 +41,29 @@ public class GetApiVersionHandler
         extends AbstractSqlGatewayRestHandler<
                 EmptyRequestBody, GetApiVersionResponseBody, EmptyMessageParameters> {
 
+    private final List<SqlGatewayRestAPIVersion> stableVersions;
+
     public GetApiVersionHandler(
             SqlGatewayService service,
             Map<String, String> responseHeaders,
             MessageHeaders<EmptyRequestBody, GetApiVersionResponseBody, EmptyMessageParameters>
                     messageHeaders) {
+        this(
+                service,
+                responseHeaders,
+                messageHeaders,
+                SqlGatewayRestAPIVersion.getStableVersions());
+    }
+
+    @VisibleForTesting
+    public GetApiVersionHandler(
+            SqlGatewayService service,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, GetApiVersionResponseBody, EmptyMessageParameters>
+                    messageHeaders,
+            List<SqlGatewayRestAPIVersion> stableVersions) {
         super(service, responseHeaders, messageHeaders);
+        this.stableVersions = stableVersions;
     }
 
     @Override
@@ -54,9 +72,6 @@ public class GetApiVersionHandler
             @Nonnull HandlerRequest<EmptyRequestBody> request) {
         return CompletableFuture.completedFuture(
                 new GetApiVersionResponseBody(
-                        Arrays.stream(SqlGatewayRestAPIVersion.values())
-                                .filter(SqlGatewayRestAPIVersion::isStableVersion)
-                                .map(Enum::name)
-                                .collect(Collectors.toList())));
+                        stableVersions.stream().map(Enum::name).collect(Collectors.toList())));
     }
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java
index 49399ed67d7..442b47fa2d7 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java
@@ -85,11 +85,11 @@ public class FetchResultsHeaders
         if (version == V1) {
             return String.format(
                     "/%s/sessions/%s/operations/%s/result/%s",
-                    version, sessionId, operationId, nextToken);
+                    version.getURLVersionPrefix(), sessionId, operationId, nextToken);
         } else {
             return String.format(
                     "/%s/sessions/%s/operations/%s/result/%s?rowFormat=%s",
-                    version, sessionId, operationId, nextToken, rowFormat);
+                    version.getURLVersionPrefix(), sessionId, operationId, nextToken, rowFormat);
         }
     }
 
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java
index 2b84aef323a..ff7f19f0220 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java
@@ -107,6 +107,22 @@ public enum SqlGatewayRestAPIVersion
         }
     }
 
+    /**
+     * Returns the supported stable versions.
+     *
+     * @return the list of the stable versions.
+     */
+    public static List<SqlGatewayRestAPIVersion> getStableVersions() {
+        return Arrays.stream(SqlGatewayRestAPIVersion.values())
+                .filter(SqlGatewayRestAPIVersion::isStableVersion)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns the default version.
+     *
+     * @return the default version.
+     */
     public static SqlGatewayRestAPIVersion getDefaultVersion() {
         List<SqlGatewayRestAPIVersion> versions =
                 Arrays.stream(SqlGatewayRestAPIVersion.values())