You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by th...@apache.org on 2023/07/06 19:34:46 UTC
[flink] branch master updated: [FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests (#22816)
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1354d2fae3f [FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests (#22816)
1354d2fae3f is described below
commit 1354d2fae3fde2a448ce1fac5dee7859973a93e1
Author: Alexander Fedulov <14...@users.noreply.github.com>
AuthorDate: Thu Jul 6 21:34:36 2023 +0200
[FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests (#22816)
---
.../flink/configuration/ConfigConstants.java | 6 ++
.../org/apache/flink/runtime/rest/HttpHeader.java | 83 ++++++++++++++
.../org/apache/flink/runtime/rest/RestClient.java | 19 ++--
.../rest/messages/CustomHeadersDecorator.java | 120 +++++++++++++++++++++
.../runtime/rest/messages/MessageHeaders.java | 14 +++
.../flink/table/client/gateway/ExecutorImpl.java | 57 ++++++++--
.../table/client/gateway/ExecutorImplITCase.java | 19 ++++
7 files changed, 302 insertions(+), 16 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 3c7a89c9e00..a4968db4550 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1756,6 +1756,12 @@ public final class ConfigConstants {
/** The user lib directory name. */
public static final String DEFAULT_FLINK_USR_LIB_DIR = "usrlib";
+ /**
+ * The environment variable name which contains a list of newline-separated HTTP headers for
+ * Flink's REST client.
+ */
+ public static final String FLINK_REST_CLIENT_HEADERS = "FLINK_REST_CLIENT_HEADERS";
+
// ---------------------------- Encoding ------------------------------
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java
new file mode 100644
index 00000000000..06ee95bd451
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import java.util.Objects;
+
+/** Represents an HTTP header with a name and a value. */
+public class HttpHeader {
+
+ /** The name of the HTTP header. */
+ private final String name;
+
+ /** The value of the HTTP header. */
+ private final String value;
+
+ /**
+ * Constructs an {@code HttpHeader} object with the specified name and value.
+ *
+ * @param name the name of the HTTP header
+ * @param value the value of the HTTP header
+ */
+ public HttpHeader(String name, String value) {
+ this.name = name;
+ this.value = value;
+ }
+
+ /**
+ * Returns the name of this HTTP header.
+ *
+ * @return the name of this HTTP header
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns the value of this HTTP header.
+ *
+ * @return the value of this HTTP header
+ */
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return "HttpHeader{" + "name='" + name + '\'' + ", value='" + value + '\'' + '}';
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+ HttpHeader that = (HttpHeader) other;
+ return Objects.equals(getName(), that.getName())
+ && Objects.equals(getValue(), that.getValue());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getName(), getValue());
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 2055a2227cf..ed399188db5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -69,6 +69,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRespon
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
@@ -384,7 +385,8 @@ public class RestClient implements AutoCloseableAsync {
targetUrl,
messageHeaders.getHttpMethod().getNettyHttpMethod(),
payload,
- fileUploads);
+ fileUploads,
+ messageHeaders.getCustomHeaders());
final JavaType responseType;
@@ -419,7 +421,8 @@ public class RestClient implements AutoCloseableAsync {
String targetUrl,
HttpMethod httpMethod,
ByteBuf jsonPayload,
- Collection<FileUpload> fileUploads)
+ Collection<FileUpload> fileUploads,
+ Collection<HttpHeader> customHeaders)
throws IOException {
if (fileUploads.isEmpty()) {
@@ -427,22 +430,22 @@ public class RestClient implements AutoCloseableAsync {
new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, httpMethod, targetUrl, jsonPayload);
- httpRequest
- .headers()
- .set(HttpHeaderNames.HOST, targetAddress)
+ HttpHeaders headers = httpRequest.headers();
+ headers.set(HttpHeaderNames.HOST, targetAddress)
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
.add(HttpHeaderNames.CONTENT_LENGTH, jsonPayload.capacity())
.add(HttpHeaderNames.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE);
+ customHeaders.forEach(ch -> headers.set(ch.getName(), ch.getValue()));
return new SimpleRequest(httpRequest);
} else {
HttpRequest httpRequest =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, httpMethod, targetUrl);
- httpRequest
- .headers()
- .set(HttpHeaderNames.HOST, targetAddress)
+ HttpHeaders headers = httpRequest.headers();
+ headers.set(HttpHeaderNames.HOST, targetAddress)
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
+ customHeaders.forEach(ch -> headers.set(ch.getName(), ch.getValue()));
// takes care of splitting the request into multiple parts
HttpPostRequestEncoder bodyRequestEncoder;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java
new file mode 100644
index 00000000000..979c849166c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpHeader;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Decorator class for {@link MessageHeaders} that adds the ability to include custom HTTP headers.
+ */
+public class CustomHeadersDecorator<
+ R extends RequestBody, P extends ResponseBody, M extends MessageParameters>
+ implements MessageHeaders<R, P, M> {
+
+ private final MessageHeaders<R, P, M> decorated;
+ private Collection<HttpHeader> customHeaders;
+
+ /**
+ * Creates a new {@code CustomHeadersDecorator} for a given {@link MessageHeaders} object.
+ *
+ * @param decorated The MessageHeaders to decorate.
+ */
+ public CustomHeadersDecorator(MessageHeaders<R, P, M> decorated) {
+ this.decorated = decorated;
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return decorated.getHttpMethod();
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return decorated.getTargetRestEndpointURL();
+ }
+
+ @Override
+ public Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() {
+ return decorated.getSupportedAPIVersions();
+ }
+
+ @Override
+ public Class<P> getResponseClass() {
+ return decorated.getResponseClass();
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return decorated.getResponseStatusCode();
+ }
+
+ @Override
+ public String getDescription() {
+ return decorated.getDescription();
+ }
+
+ @Override
+ public Class<R> getRequestClass() {
+ return decorated.getRequestClass();
+ }
+
+ @Override
+ public M getUnresolvedMessageParameters() {
+ return decorated.getUnresolvedMessageParameters();
+ }
+
+ /**
+ * Returns the custom headers added to the message.
+ *
+ * @return The custom headers as a collection of {@link HttpHeader}.
+ */
+ @Override
+ public Collection<HttpHeader> getCustomHeaders() {
+ return customHeaders;
+ }
+
+ /**
+ * Sets the custom headers for the message.
+ *
+ * @param customHeaders A collection of custom headers.
+ */
+ public void setCustomHeaders(Collection<HttpHeader> customHeaders) {
+ this.customHeaders = customHeaders;
+ }
+
+ /**
+ * Adds a custom header to the message. Initializes the custom headers collection if it hasn't
+ * been initialized yet.
+ *
+ * @param httpHeader The header to add.
+ */
+ public void addCustomHeader(HttpHeader httpHeader) {
+ if (customHeaders == null) {
+ customHeaders = new ArrayList<>();
+ }
+ customHeaders.add(httpHeader);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
index a6c0d11d69b..63c54083493 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
@@ -18,8 +18,10 @@
package org.apache.flink.runtime.rest.messages;
+import org.apache.flink.runtime.rest.HttpHeader;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import java.util.Collection;
@@ -93,4 +95,16 @@ public interface MessageHeaders<
return getHttpMethod().name().toLowerCase(Locale.ROOT)
+ className.substring(0, headersSuffixStart);
}
+
+ /**
+ * Returns a collection of custom HTTP headers.
+ *
+ * <p>This default implementation returns an empty list. Override this method to provide custom
+ * headers if needed.
+ *
+ * @return a collection of custom {@link HttpHeaders}, empty by default.
+ */
+ default Collection<HttpHeader> getCustomHeaders() {
+ return Collections.emptyList();
+ }
}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
index f4f6b40ed56..d755aa510ba 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
@@ -19,10 +19,13 @@
package org.apache.flink.table.client.gateway;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.AutoCloseableRegistry;
+import org.apache.flink.runtime.rest.HttpHeader;
import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.CustomHeadersDecorator;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -63,6 +66,7 @@ import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementReq
import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+import org.apache.flink.table.gateway.rest.message.util.GetApiVersionResponseBody;
import org.apache.flink.table.gateway.rest.serde.ResultInfo;
import org.apache.flink.table.gateway.rest.util.RowFormat;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
@@ -80,6 +84,8 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -113,6 +119,7 @@ public class ExecutorImpl implements Executor {
private final SqlGatewayRestAPIVersion connectionVersion;
private final SessionHandle sessionHandle;
private final RowFormat rowFormat;
+ private final Collection<HttpHeader> customHttpHeaders;
public ExecutorImpl(
DefaultContext defaultContext, InetSocketAddress gatewayAddress, String sessionId) {
@@ -170,6 +177,8 @@ public class ExecutorImpl implements Executor {
this.registry = new AutoCloseableRegistry();
this.gatewayUrl = gatewayUrl;
this.rowFormat = rowFormat;
+ this.customHttpHeaders =
+ readHeadersFromEnvironmentVariable(ConfigConstants.FLINK_REST_CLIENT_HEADERS);
try {
// register required resource
this.executorService = Executors.newCachedThreadPool();
@@ -399,11 +408,12 @@ public class ExecutorImpl implements Executor {
P extends ResponseBody>
CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) {
Preconditions.checkNotNull(connectionVersion, "The connection version should not be null.");
- return sendRequest(
- new UrlPrefixDecorator<>(messageHeaders, gatewayUrl.getPath()),
- messageParameters,
- request,
- connectionVersion);
+ CustomHeadersDecorator<R, P, U> headers =
+ new CustomHeadersDecorator<>(
+ new UrlPrefixDecorator<>(messageHeaders, gatewayUrl.getPath()));
+ headers.setCustomHeaders(customHttpHeaders);
+
+ return sendRequest(headers, messageParameters, request, connectionVersion);
}
private <
@@ -512,14 +522,20 @@ public class ExecutorImpl implements Executor {
}
private SqlGatewayRestAPIVersion negotiateVersion() throws Exception {
+
+ CustomHeadersDecorator<EmptyRequestBody, GetApiVersionResponseBody, EmptyMessageParameters>
+ headers =
+ new CustomHeadersDecorator<>(
+ new UrlPrefixDecorator<>(
+ GetApiVersionHeaders.getInstance(), gatewayUrl.getPath()));
+ headers.setCustomHeaders(customHttpHeaders);
+
List<SqlGatewayRestAPIVersion> gatewayVersions =
getResponse(
restClient.sendRequest(
gatewayUrl.getHost(),
gatewayUrl.getPort(),
- new UrlPrefixDecorator<>(
- GetApiVersionHeaders.getInstance(),
- gatewayUrl.getPath()),
+ headers,
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance(),
Collections.emptyList(),
@@ -577,4 +593,29 @@ public class ExecutorImpl implements Executor {
// ignore any throwable to keep the cleanup running
}
}
+
+ private static Collection<HttpHeader> readHeadersFromEnvironmentVariable(String envVarName) {
+ List<HttpHeader> headers = new ArrayList<>();
+ String rawHeaders = System.getenv(envVarName);
+
+ if (rawHeaders != null) {
+ String[] lines = rawHeaders.split("\n");
+ for (String line : lines) {
+ String[] keyValue = line.split(":", 2);
+ if (keyValue.length == 2) {
+ headers.add(new HttpHeader(keyValue[0], keyValue[1]));
+ } else {
+ LOG.info(
+ "Skipped a malformed header {} from FLINK_REST_CLIENT_HEADERS env variable. Expecting newline-separated headers in format header_name:header_value.",
+ line);
+ }
+ }
+ }
+ return headers;
+ }
+
+ @VisibleForTesting
+ Collection<HttpHeader> getCustomHttpHeaders() {
+ return customHttpHeaders;
+ }
}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
index 8b4c7d24be9..14d4272fdf6 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.HttpHeader;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.DataTypes;
@@ -576,6 +577,24 @@ class ExecutorImplITCase {
.isFalse();
}
+ @Test
+ void testCustomHeadersSupport() {
+ final Map<String, String> envMap =
+ Collections.singletonMap(
+ ConfigConstants.FLINK_REST_CLIENT_HEADERS,
+ "Cookie:authCookie=12:345\nCustomHeader:value1,value2\nMalformedHeaderSkipped");
+ org.apache.flink.core.testutils.CommonTestUtils.setEnv(envMap);
+ try (final ExecutorImpl executor = (ExecutorImpl) createTestServiceExecutor()) {
+ final List<HttpHeader> customHttpHeaders =
+ new ArrayList<>(executor.getCustomHttpHeaders());
+ final HttpHeader expectedHeader1 = new HttpHeader("Cookie", "authCookie=12:345");
+ final HttpHeader expectedHeader2 = new HttpHeader("CustomHeader", "value1,value2");
+ assertThat(customHttpHeaders).hasSize(2);
+ assertThat(customHttpHeaders.get(0)).isEqualTo(expectedHeader1);
+ assertThat(customHttpHeaders.get(1)).isEqualTo(expectedHeader2);
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Helper method
// --------------------------------------------------------------------------------------------