You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2023/01/30 02:44:51 UTC
[flink] branch master updated: [FLINK-30816][sql-client] Fix SQL Client always uses the highest packaged version to connect to gateway
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cc6315a3e82 [FLINK-30816][sql-client] Fix SQL Client always uses the highest packaged version to connect to gateway
cc6315a3e82 is described below
commit cc6315a3e82c570e631684793d08ca9cb5403521
Author: Shengkai <10...@qq.com>
AuthorDate: Sun Jan 29 18:29:27 2023 +0800
[FLINK-30816][sql-client] Fix SQL Client always uses the highest packaged version to connect to gateway
This closes #21776
---
.../flink/table/client/gateway/ExecutorImpl.java | 85 +++++++++++++++++++---
.../client/gateway/local/ExecutorImplITCase.java | 68 +++++++++++++++++
.../rest/handler/util/GetApiVersionHandler.java | 25 +++++--
.../rest/header/statement/FetchResultsHeaders.java | 4 +-
.../rest/util/SqlGatewayRestAPIVersion.java | 16 ++++
5 files changed, 179 insertions(+), 19 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
index 93e9550224e..f324f516406 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.table.api.SqlParserEOFException;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.data.RowData;
@@ -46,6 +47,7 @@ import org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbea
import org.apache.flink.table.gateway.rest.header.statement.CompleteStatementHeaders;
import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
+import org.apache.flink.table.gateway.rest.header.util.GetApiVersionHeaders;
import org.apache.flink.table.gateway.rest.message.operation.OperationMessageParameters;
import org.apache.flink.table.gateway.rest.message.operation.OperationStatusResponseBody;
import org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
@@ -61,9 +63,11 @@ import org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessage
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
import org.apache.flink.table.gateway.rest.serde.ResultInfo;
import org.apache.flink.table.gateway.rest.util.RowFormat;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointUtils;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +76,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
@@ -83,6 +88,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import static org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE;
@@ -97,9 +103,10 @@ public class ExecutorImpl implements Executor {
private final long heartbeatInterval;
private final ExecutorService service;
private final ScheduledExecutorService heartbeatScheduler;
+ private final RestClient restClient;
- private RestClient restClient;
private SessionHandle sessionHandle;
+ private SqlGatewayRestAPIVersion connectionVersion;
@VisibleForTesting
public ExecutorImpl(
@@ -111,6 +118,11 @@ public class ExecutorImpl implements Executor {
this.heartbeatInterval = heartbeatInterval;
this.service = Executors.newCachedThreadPool();
this.heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();
+ try {
+ this.restClient = new RestClient(defaultContext.getFlinkConfig(), service);
+ } catch (Exception e) {
+ throw new SqlClientException("Can not create the Rest Client.", e);
+ }
}
public ExecutorImpl(DefaultContext defaultContext, InetSocketAddress gatewayAddress) {
@@ -119,21 +131,16 @@ public class ExecutorImpl implements Executor {
public void openSession(@Nullable String sessionId) {
try {
- restClient = new RestClient(defaultContext.getFlinkConfig(), service);
- } catch (Exception e) {
- throw new SqlExecutionException("Can not create the Rest Client.", e);
- }
-
- LOG.info("Open session to {}.", gatewayAddress);
- OpenSessionRequestBody request =
- new OpenSessionRequestBody(sessionId, defaultContext.getFlinkConfig().toMap());
- try {
+ // determine gateway rest api version
+ connectionVersion = negotiateVersion();
// open session to address:port
+ LOG.info("Open session to {}.", gatewayAddress);
OpenSessionResponseBody response =
sendRequest(
OpenSessionHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
- request)
+ new OpenSessionRequestBody(
+ sessionId, defaultContext.getFlinkConfig().toMap()))
.get();
sessionHandle = new SessionHandle(UUID.fromString(response.getSessionHandle()));
// register heartbeat service
@@ -339,13 +346,29 @@ public class ExecutorImpl implements Executor {
R extends RequestBody,
P extends ResponseBody>
CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) {
+ Preconditions.checkNotNull(connectionVersion, "The connection version should not be null.");
+ return sendRequest(messageHeaders, messageParameters, request, connectionVersion);
+ }
+
+ private <
+ M extends MessageHeaders<R, P, U>,
+ U extends MessageParameters,
+ R extends RequestBody,
+ P extends ResponseBody>
+ CompletableFuture<P> sendRequest(
+ M messageHeaders,
+ U messageParameters,
+ R request,
+ SqlGatewayRestAPIVersion connectionVersion) {
try {
return restClient.sendRequest(
gatewayAddress.getHostName(),
gatewayAddress.getPort(),
messageHeaders,
messageParameters,
- request);
+ request,
+ Collections.emptyList(),
+ connectionVersion);
} catch (IOException ioException) {
throw new SqlExecutionException("Failed to connect to the SQL Gateway.", ioException);
}
@@ -429,4 +452,42 @@ public class ExecutorImpl implements Executor {
private OperationHandle getOperationHandle(Supplier<String> handleSupplier) {
return new OperationHandle(UUID.fromString(handleSupplier.get()));
}
+
+ private SqlGatewayRestAPIVersion negotiateVersion() {
+ List<SqlGatewayRestAPIVersion> gatewayVersions =
+ getResponse(
+ sendRequest(
+ GetApiVersionHeaders.getInstance(),
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ // Currently, RestClient always uses the latest REST API
+ // version to build the targetUrl. However, it's possible
+ // that the client REST API version is higher than the
+ // server REST API version. In this case, the gateway will
+ // report Not Found Error to notify the client.
+ //
+ // So, here use the lowest REST API version to get the
+ // remote gateway version list and then determine the
+ // connection version.
+ // TODO: Remove this after the REST Client should allow
+ // to build the target URL without API version.
+ Collections.min(
+ SqlGatewayRestAPIVersion.getStableVersions())))
+ .getVersions().stream()
+ .map(SqlGatewayRestAPIVersion::valueOf)
+ .collect(Collectors.toList());
+ SqlGatewayRestAPIVersion clientVersion = SqlGatewayRestAPIVersion.getDefaultVersion();
+
+ if (gatewayVersions.contains(clientVersion)) {
+ return clientVersion;
+ } else {
+ SqlGatewayRestAPIVersion latestVersion =
+ RestAPIVersion.getLatestVersion(gatewayVersions);
+ if (latestVersion.equals(SqlGatewayRestAPIVersion.V1)) {
+ throw new SqlExecutionException(
+ "Currently, SQL Client only supports to connect to the REST endpoint with API version larger than V1.");
+ }
+ return latestVersion;
+ }
+ }
}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java
index 3884ea2ac14..497f9ff90dc 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.client.gateway.ClientResult;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ExecutorImpl;
import org.apache.flink.table.client.gateway.ResultDescriptor;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.local.result.ChangelogCollectResult;
import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
@@ -55,7 +56,13 @@ import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.MockedSqlGatewayService;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.rest.handler.session.OpenSessionHandler;
+import org.apache.flink.table.gateway.rest.handler.util.GetApiVersionHandler;
+import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
+import org.apache.flink.table.gateway.rest.header.util.GetApiVersionHeaders;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import org.apache.flink.table.gateway.rest.util.TestingSqlGatewayRestEndpoint;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.table.utils.UserDefinedFunctions;
@@ -77,6 +84,7 @@ import org.junit.jupiter.api.io.TempDir;
import javax.annotation.Nullable;
import java.io.File;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
@@ -98,14 +106,18 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS;
import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL;
import static org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT;
+import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getBaseConfig;
+import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getFlinkConfig;
import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Contains basic tests for the {@link ExecutorImpl}. */
class ExecutorImplITCase {
@@ -519,10 +531,66 @@ class ExecutorImplITCase {
}
}
+ @Test
+ void testConnectToEndpointWithV1Version() throws Exception {
+ testNegotiateVersion(
+ SqlGatewayRestAPIVersion.V1,
+ executor ->
+ assertThatThrownBy(() -> executor.openSession("connection-version"))
+ .satisfies(
+ anyCauseMatches(
+ SqlExecutionException.class,
+ "Currently, SQL Client only supports to connect to the "
+ + "REST endpoint with API version larger than V1.")));
+ }
+
+ @Test
+ void testConnectToEndpointWithHigherVersion() throws Exception {
+ testNegotiateVersion(
+ SqlGatewayRestAPIVersion.V2,
+ executor -> {
+ executor.openSession("connection-version");
+ assertThat(
+ SQL_GATEWAY_SERVICE_EXTENSION
+ .getService()
+ .getSessionEndpointVersion(
+ ((ExecutorImpl) executor).getSessionHandle()))
+ .isEqualTo(SqlGatewayRestAPIVersion.V2);
+ });
+ }
+
// --------------------------------------------------------------------------------------------
// Helper method
// --------------------------------------------------------------------------------------------
+ private void testNegotiateVersion(
+ SqlGatewayRestAPIVersion version, Consumer<Executor> validator) throws Exception {
+ final String address = InetAddress.getLoopbackAddress().getHostAddress();
+ Configuration config = getBaseConfig(getFlinkConfig(address, address, "0"));
+ try (TestingSqlGatewayRestEndpoint endpoint =
+ TestingSqlGatewayRestEndpoint.builder(
+ config, SQL_GATEWAY_SERVICE_EXTENSION.getService())
+ .withHandler(
+ GetApiVersionHeaders.getInstance(),
+ new GetApiVersionHandler(
+ SQL_GATEWAY_SERVICE_EXTENSION.getService(),
+ Collections.emptyMap(),
+ GetApiVersionHeaders.getInstance(),
+ Collections.singletonList(version)))
+ .withHandler(
+ OpenSessionHeaders.getInstance(),
+ new OpenSessionHandler(
+ SQL_GATEWAY_SERVICE_EXTENSION.getService(),
+ Collections.emptyMap(),
+ OpenSessionHeaders.getInstance()))
+ .buildAndStart();
+ Executor executor =
+ createExecutor(
+ Collections.emptyList(), config, endpoint.getServerAddress())) {
+ validator.accept(executor);
+ }
+ }
+
private void testInterrupting(Consumer<Executor> task) throws Exception {
try (Executor executor = createTestServiceExecutor()) {
Thread t = new Thread(() -> task.accept(executor), "worker");
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/util/GetApiVersionHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/util/GetApiVersionHandler.java
index afe46f3185d..9f9a372fb07 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/util/GetApiVersionHandler.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/util/GetApiVersionHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.gateway.rest.handler.util;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -30,7 +31,7 @@ import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -40,12 +41,29 @@ public class GetApiVersionHandler
extends AbstractSqlGatewayRestHandler<
EmptyRequestBody, GetApiVersionResponseBody, EmptyMessageParameters> {
+ private final List<SqlGatewayRestAPIVersion> stableVersions;
+
public GetApiVersionHandler(
SqlGatewayService service,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, GetApiVersionResponseBody, EmptyMessageParameters>
messageHeaders) {
+ this(
+ service,
+ responseHeaders,
+ messageHeaders,
+ SqlGatewayRestAPIVersion.getStableVersions());
+ }
+
+ @VisibleForTesting
+ public GetApiVersionHandler(
+ SqlGatewayService service,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, GetApiVersionResponseBody, EmptyMessageParameters>
+ messageHeaders,
+ List<SqlGatewayRestAPIVersion> stableVersions) {
super(service, responseHeaders, messageHeaders);
+ this.stableVersions = stableVersions;
}
@Override
@@ -54,9 +72,6 @@ public class GetApiVersionHandler
@Nonnull HandlerRequest<EmptyRequestBody> request) {
return CompletableFuture.completedFuture(
new GetApiVersionResponseBody(
- Arrays.stream(SqlGatewayRestAPIVersion.values())
- .filter(SqlGatewayRestAPIVersion::isStableVersion)
- .map(Enum::name)
- .collect(Collectors.toList())));
+ stableVersions.stream().map(Enum::name).collect(Collectors.toList())));
}
}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java
index 49399ed67d7..442b47fa2d7 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java
@@ -85,11 +85,11 @@ public class FetchResultsHeaders
if (version == V1) {
return String.format(
"/%s/sessions/%s/operations/%s/result/%s",
- version, sessionId, operationId, nextToken);
+ version.getURLVersionPrefix(), sessionId, operationId, nextToken);
} else {
return String.format(
"/%s/sessions/%s/operations/%s/result/%s?rowFormat=%s",
- version, sessionId, operationId, nextToken, rowFormat);
+ version.getURLVersionPrefix(), sessionId, operationId, nextToken, rowFormat);
}
}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java
index 2b84aef323a..ff7f19f0220 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java
@@ -107,6 +107,22 @@ public enum SqlGatewayRestAPIVersion
}
}
+ /**
+ * Returns the supported stable versions.
+ *
+ * @return the list of the stable versions.
+ */
+ public static List<SqlGatewayRestAPIVersion> getStableVersions() {
+ return Arrays.stream(SqlGatewayRestAPIVersion.values())
+ .filter(SqlGatewayRestAPIVersion::isStableVersion)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Returns the default version.
+ *
+ * @return the default version.
+ */
public static SqlGatewayRestAPIVersion getDefaultVersion() {
List<SqlGatewayRestAPIVersion> versions =
Arrays.stream(SqlGatewayRestAPIVersion.values())