You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by th...@apache.org on 2023/07/06 19:34:46 UTC

[flink] branch master updated: [FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests (#22816)

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1354d2fae3f [FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests (#22816)
1354d2fae3f is described below

commit 1354d2fae3fde2a448ce1fac5dee7859973a93e1
Author: Alexander Fedulov <14...@users.noreply.github.com>
AuthorDate: Thu Jul 6 21:34:36 2023 +0200

    [FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests (#22816)
---
 .../flink/configuration/ConfigConstants.java       |   6 ++
 .../org/apache/flink/runtime/rest/HttpHeader.java  |  83 ++++++++++++++
 .../org/apache/flink/runtime/rest/RestClient.java  |  19 ++--
 .../rest/messages/CustomHeadersDecorator.java      | 120 +++++++++++++++++++++
 .../runtime/rest/messages/MessageHeaders.java      |  14 +++
 .../flink/table/client/gateway/ExecutorImpl.java   |  57 ++++++++--
 .../table/client/gateway/ExecutorImplITCase.java   |  19 ++++
 7 files changed, 302 insertions(+), 16 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 3c7a89c9e00..a4968db4550 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1756,6 +1756,12 @@ public final class ConfigConstants {
     /** The user lib directory name. */
     public static final String DEFAULT_FLINK_USR_LIB_DIR = "usrlib";
 
+    /**
+     * The environment variable name which contains a list of newline-separated HTTP headers for
+     * Flink's REST client.
+     */
+    public static final String FLINK_REST_CLIENT_HEADERS = "FLINK_REST_CLIENT_HEADERS";
+
     // ---------------------------- Encoding ------------------------------
 
     public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java
new file mode 100644
index 00000000000..06ee95bd451
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import java.util.Objects;
+
+/** Represents an HTTP header with a name and a value. */
+public class HttpHeader {
+
+    /** The name of the HTTP header. */
+    private final String name;
+
+    /** The value of the HTTP header. */
+    private final String value;
+
+    /**
+     * Constructs an {@code HttpHeader} object with the specified name and value.
+     *
+     * @param name the name of the HTTP header
+     * @param value the value of the HTTP header
+     */
+    public HttpHeader(String name, String value) {
+        this.name = name;
+        this.value = value;
+    }
+
+    /**
+     * Returns the name of this HTTP header.
+     *
+     * @return the name of this HTTP header
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Returns the value of this HTTP header.
+     *
+     * @return the value of this HTTP header
+     */
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String toString() {
+        return "HttpHeader{" + "name='" + name + '\'' + ", value='" + value + '\'' + '}';
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+        HttpHeader that = (HttpHeader) other;
+        return Objects.equals(getName(), that.getName())
+                && Objects.equals(getValue(), that.getValue());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(getName(), getValue());
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 2055a2227cf..ed399188db5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -69,6 +69,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRespon
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
@@ -384,7 +385,8 @@ public class RestClient implements AutoCloseableAsync {
                         targetUrl,
                         messageHeaders.getHttpMethod().getNettyHttpMethod(),
                         payload,
-                        fileUploads);
+                        fileUploads,
+                        messageHeaders.getCustomHeaders());
 
         final JavaType responseType;
 
@@ -419,7 +421,8 @@ public class RestClient implements AutoCloseableAsync {
             String targetUrl,
             HttpMethod httpMethod,
             ByteBuf jsonPayload,
-            Collection<FileUpload> fileUploads)
+            Collection<FileUpload> fileUploads,
+            Collection<HttpHeader> customHeaders)
             throws IOException {
         if (fileUploads.isEmpty()) {
 
@@ -427,22 +430,22 @@ public class RestClient implements AutoCloseableAsync {
                     new DefaultFullHttpRequest(
                             HttpVersion.HTTP_1_1, httpMethod, targetUrl, jsonPayload);
 
-            httpRequest
-                    .headers()
-                    .set(HttpHeaderNames.HOST, targetAddress)
+            HttpHeaders headers = httpRequest.headers();
+            headers.set(HttpHeaderNames.HOST, targetAddress)
                     .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
                     .add(HttpHeaderNames.CONTENT_LENGTH, jsonPayload.capacity())
                     .add(HttpHeaderNames.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE);
+            customHeaders.forEach(ch -> headers.set(ch.getName(), ch.getValue()));
 
             return new SimpleRequest(httpRequest);
         } else {
             HttpRequest httpRequest =
                     new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, httpMethod, targetUrl);
 
-            httpRequest
-                    .headers()
-                    .set(HttpHeaderNames.HOST, targetAddress)
+            HttpHeaders headers = httpRequest.headers();
+            headers.set(HttpHeaderNames.HOST, targetAddress)
                     .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
+            customHeaders.forEach(ch -> headers.set(ch.getName(), ch.getValue()));
 
             // takes care of splitting the request into multiple parts
             HttpPostRequestEncoder bodyRequestEncoder;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java
new file mode 100644
index 00000000000..979c849166c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpHeader;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Decorator class for {@link MessageHeaders} that adds the ability to include custom HTTP headers.
+ */
+public class CustomHeadersDecorator<
+                R extends RequestBody, P extends ResponseBody, M extends MessageParameters>
+        implements MessageHeaders<R, P, M> {
+
+    private final MessageHeaders<R, P, M> decorated;
+    private Collection<HttpHeader> customHeaders;
+
+    /**
+     * Creates a new {@code CustomHeadersDecorator} for a given {@link MessageHeaders} object.
+     *
+     * @param decorated The MessageHeaders to decorate.
+     */
+    public CustomHeadersDecorator(MessageHeaders<R, P, M> decorated) {
+        this.decorated = decorated;
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return decorated.getHttpMethod();
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return decorated.getTargetRestEndpointURL();
+    }
+
+    @Override
+    public Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() {
+        return decorated.getSupportedAPIVersions();
+    }
+
+    @Override
+    public Class<P> getResponseClass() {
+        return decorated.getResponseClass();
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return decorated.getResponseStatusCode();
+    }
+
+    @Override
+    public String getDescription() {
+        return decorated.getDescription();
+    }
+
+    @Override
+    public Class<R> getRequestClass() {
+        return decorated.getRequestClass();
+    }
+
+    @Override
+    public M getUnresolvedMessageParameters() {
+        return decorated.getUnresolvedMessageParameters();
+    }
+
+    /**
+     * Returns the custom headers added to the message.
+     *
+     * @return The custom headers as a collection of {@link HttpHeader}.
+     */
+    @Override
+    public Collection<HttpHeader> getCustomHeaders() {
+        return customHeaders;
+    }
+
+    /**
+     * Sets the custom headers for the message.
+     *
+     * @param customHeaders A collection of custom headers.
+     */
+    public void setCustomHeaders(Collection<HttpHeader> customHeaders) {
+        this.customHeaders = customHeaders;
+    }
+
+    /**
+     * Adds a custom header to the message. Initializes the custom headers collection if it hasn't
+     * been initialized yet.
+     *
+     * @param httpHeader The header to add.
+     */
+    public void addCustomHeader(HttpHeader httpHeader) {
+        if (customHeaders == null) {
+            customHeaders = new ArrayList<>();
+        }
+        customHeaders.add(httpHeader);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
index a6c0d11d69b..63c54083493 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.rest.messages;
 
+import org.apache.flink.runtime.rest.HttpHeader;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import java.util.Collection;
@@ -93,4 +95,16 @@ public interface MessageHeaders<
         return getHttpMethod().name().toLowerCase(Locale.ROOT)
                 + className.substring(0, headersSuffixStart);
     }
+
+    /**
+     * Returns a collection of custom HTTP headers.
+     *
+     * <p>This default implementation returns an empty list. Override this method to provide custom
+     * headers if needed.
+     *
+     * @return a collection of custom {@link HttpHeaders}, empty by default.
+     */
+    default Collection<HttpHeader> getCustomHeaders() {
+        return Collections.emptyList();
+    }
 }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
index f4f6b40ed56..d755aa510ba 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
@@ -19,10 +19,13 @@
 package org.apache.flink.table.client.gateway;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.AutoCloseableRegistry;
+import org.apache.flink.runtime.rest.HttpHeader;
 import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.CustomHeadersDecorator;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -63,6 +66,7 @@ import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementReq
 import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
 import org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
 import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+import org.apache.flink.table.gateway.rest.message.util.GetApiVersionResponseBody;
 import org.apache.flink.table.gateway.rest.serde.ResultInfo;
 import org.apache.flink.table.gateway.rest.util.RowFormat;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
@@ -80,6 +84,8 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -113,6 +119,7 @@ public class ExecutorImpl implements Executor {
     private final SqlGatewayRestAPIVersion connectionVersion;
     private final SessionHandle sessionHandle;
     private final RowFormat rowFormat;
+    private final Collection<HttpHeader> customHttpHeaders;
 
     public ExecutorImpl(
             DefaultContext defaultContext, InetSocketAddress gatewayAddress, String sessionId) {
@@ -170,6 +177,8 @@ public class ExecutorImpl implements Executor {
         this.registry = new AutoCloseableRegistry();
         this.gatewayUrl = gatewayUrl;
         this.rowFormat = rowFormat;
+        this.customHttpHeaders =
+                readHeadersFromEnvironmentVariable(ConfigConstants.FLINK_REST_CLIENT_HEADERS);
         try {
             // register required resource
             this.executorService = Executors.newCachedThreadPool();
@@ -399,11 +408,12 @@ public class ExecutorImpl implements Executor {
                     P extends ResponseBody>
             CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) {
         Preconditions.checkNotNull(connectionVersion, "The connection version should not be null.");
-        return sendRequest(
-                new UrlPrefixDecorator<>(messageHeaders, gatewayUrl.getPath()),
-                messageParameters,
-                request,
-                connectionVersion);
+        CustomHeadersDecorator<R, P, U> headers =
+                new CustomHeadersDecorator<>(
+                        new UrlPrefixDecorator<>(messageHeaders, gatewayUrl.getPath()));
+        headers.setCustomHeaders(customHttpHeaders);
+
+        return sendRequest(headers, messageParameters, request, connectionVersion);
     }
 
     private <
@@ -512,14 +522,20 @@ public class ExecutorImpl implements Executor {
     }
 
     private SqlGatewayRestAPIVersion negotiateVersion() throws Exception {
+
+        CustomHeadersDecorator<EmptyRequestBody, GetApiVersionResponseBody, EmptyMessageParameters>
+                headers =
+                        new CustomHeadersDecorator<>(
+                                new UrlPrefixDecorator<>(
+                                        GetApiVersionHeaders.getInstance(), gatewayUrl.getPath()));
+        headers.setCustomHeaders(customHttpHeaders);
+
         List<SqlGatewayRestAPIVersion> gatewayVersions =
                 getResponse(
                                 restClient.sendRequest(
                                         gatewayUrl.getHost(),
                                         gatewayUrl.getPort(),
-                                        new UrlPrefixDecorator<>(
-                                                GetApiVersionHeaders.getInstance(),
-                                                gatewayUrl.getPath()),
+                                        headers,
                                         EmptyMessageParameters.getInstance(),
                                         EmptyRequestBody.getInstance(),
                                         Collections.emptyList(),
@@ -577,4 +593,29 @@ public class ExecutorImpl implements Executor {
             // ignore any throwable to keep the cleanup running
         }
     }
+
+    private static Collection<HttpHeader> readHeadersFromEnvironmentVariable(String envVarName) {
+        List<HttpHeader> headers = new ArrayList<>();
+        String rawHeaders = System.getenv(envVarName);
+
+        if (rawHeaders != null) {
+            String[] lines = rawHeaders.split("\n");
+            for (String line : lines) {
+                String[] keyValue = line.split(":", 2);
+                if (keyValue.length == 2) {
+                    headers.add(new HttpHeader(keyValue[0], keyValue[1]));
+                } else {
+                    LOG.info(
+                            "Skipped a malformed header {} from FLINK_REST_CLIENT_HEADERS env variable. Expecting newline-separated headers in format header_name:header_value.",
+                            line);
+                }
+            }
+        }
+        return headers;
+    }
+
+    @VisibleForTesting
+    Collection<HttpHeader> getCustomHttpHeaders() {
+        return customHttpHeaders;
+    }
 }
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
index 8b4c7d24be9..14d4272fdf6 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.HttpHeader;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.table.api.DataTypes;
@@ -576,6 +577,24 @@ class ExecutorImplITCase {
                 .isFalse();
     }
 
+    @Test
+    void testCustomHeadersSupport() {
+        final Map<String, String> envMap =
+                Collections.singletonMap(
+                        ConfigConstants.FLINK_REST_CLIENT_HEADERS,
+                        "Cookie:authCookie=12:345\nCustomHeader:value1,value2\nMalformedHeaderSkipped");
+        org.apache.flink.core.testutils.CommonTestUtils.setEnv(envMap);
+        try (final ExecutorImpl executor = (ExecutorImpl) createTestServiceExecutor()) {
+            final List<HttpHeader> customHttpHeaders =
+                    new ArrayList<>(executor.getCustomHttpHeaders());
+            final HttpHeader expectedHeader1 = new HttpHeader("Cookie", "authCookie=12:345");
+            final HttpHeader expectedHeader2 = new HttpHeader("CustomHeader", "value1,value2");
+            assertThat(customHttpHeaders).hasSize(2);
+            assertThat(customHttpHeaders.get(0)).isEqualTo(expectedHeader1);
+            assertThat(customHttpHeaders.get(1)).isEqualTo(expectedHeader2);
+        }
+    }
+
     // --------------------------------------------------------------------------------------------
     // Helper method
     // --------------------------------------------------------------------------------------------