You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2023/01/09 04:14:23 UTC

[flink] branch master updated: [FLINK-28796][sql-gateway] Add completeStatement REST API in the SQL Gateway

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6301cca7a69 [FLINK-28796][sql-gateway] Add completeStatement REST API in the SQL Gateway
6301cca7a69 is described below

commit 6301cca7a6924cebd1107fcd39c063b7a091551d
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Mon Jan 9 12:14:10 2023 +0800

    [FLINK-28796][sql-gateway] Add completeStatement REST API in the SQL Gateway
    
    This closes #21526
---
 .../table/gateway/rest/SqlGatewayRestEndpoint.java |  7 ++
 .../statement/CompleteStatementHandler.java        | 67 +++++++++++++++
 .../header/statement/CompleteStatementHeaders.java | 95 ++++++++++++++++++++++
 .../statement/CompleteStatementRequestBody.java    | 52 ++++++++++++
 .../statement/CompleteStatementResponseBody.java   | 45 ++++++++++
 .../gateway/service/SqlGatewayServiceImpl.java     |  4 +-
 .../service/operation/OperationExecutor.java       |  4 +-
 .../table/gateway/service/utils/Constants.java     |  2 +-
 .../table/gateway/rest/StatementRelatedITCase.java | 69 ++++++++++++++++
 .../resources/sql_gateway_rest_api_v2.snapshot     | 37 +++++++++
 10 files changed, 377 insertions(+), 5 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
index 3d063d789ea..3d0114219fc 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.gateway.rest.handler.session.ConfigureSessionHandl
 import org.apache.flink.table.gateway.rest.handler.session.GetSessionConfigHandler;
 import org.apache.flink.table.gateway.rest.handler.session.OpenSessionHandler;
 import org.apache.flink.table.gateway.rest.handler.session.TriggerSessionHeartbeatHandler;
+import org.apache.flink.table.gateway.rest.handler.statement.CompleteStatementHandler;
 import org.apache.flink.table.gateway.rest.handler.statement.ExecuteStatementHandler;
 import org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler;
 import org.apache.flink.table.gateway.rest.handler.util.GetApiVersionHandler;
@@ -44,6 +45,7 @@ import org.apache.flink.table.gateway.rest.header.session.ConfigureSessionHeader
 import org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders;
 import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
 import org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders;
+import org.apache.flink.table.gateway.rest.header.statement.CompleteStatementHeaders;
 import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
 import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
 import org.apache.flink.table.gateway.rest.header.util.GetApiVersionHeaders;
@@ -152,6 +154,11 @@ public class SqlGatewayRestEndpoint extends RestServerEndpoint implements SqlGat
 
     private void addStatementRelatedHandlers(
             List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers) {
+        // Complete a statement
+        CompleteStatementHandler completeStatementHandler =
+                new CompleteStatementHandler(
+                        service, responseHeaders, CompleteStatementHeaders.getINSTANCE());
+        handlers.add(Tuple2.of(CompleteStatementHeaders.getINSTANCE(), completeStatementHandler));
 
         // Execute a statement
         ExecuteStatementHandler executeStatementHandler =
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/CompleteStatementHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/CompleteStatementHandler.java
new file mode 100644
index 00000000000..09062c7f19e
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/CompleteStatementHandler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.handler.statement;
+
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
+import org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import org.apache.flink.table.gateway.rest.message.statement.CompleteStatementRequestBody;
+import org.apache.flink.table.gateway.rest.message.statement.CompleteStatementResponseBody;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Handler to complete a statement. */
+public class CompleteStatementHandler
+        extends AbstractSqlGatewayRestHandler<
+                CompleteStatementRequestBody,
+                CompleteStatementResponseBody,
+                SessionMessageParameters> {
+
+    public CompleteStatementHandler(
+            SqlGatewayService service,
+            Map<String, String> responseHeaders,
+            MessageHeaders<
+                            CompleteStatementRequestBody,
+                            CompleteStatementResponseBody,
+                            SessionMessageParameters>
+                    messageHeaders) {
+        super(service, responseHeaders, messageHeaders);
+    }
+
+    @Override
+    protected CompletableFuture<CompleteStatementResponseBody> handleRequest(
+            SqlGatewayRestAPIVersion version,
+            @Nonnull HandlerRequest<CompleteStatementRequestBody> request) {
+        SessionHandle sessionHandle = request.getPathParameter(SessionHandleIdPathParameter.class);
+        String statement = request.getRequestBody().getStatement();
+        int position = request.getRequestBody().getPosition();
+
+        return CompletableFuture.completedFuture(
+                new CompleteStatementResponseBody(
+                        service.completeStatement(sessionHandle, statement, position)));
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/CompleteStatementHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/CompleteStatementHeaders.java
new file mode 100644
index 00000000000..f2a62fb5286
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/CompleteStatementHeaders.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.header.statement;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders;
+import org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import org.apache.flink.table.gateway.rest.message.statement.CompleteStatementRequestBody;
+import org.apache.flink.table.gateway.rest.message.statement.CompleteStatementResponseBody;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/** Message headers for completing a statement. */
+public class CompleteStatementHeaders
+        implements SqlGatewayMessageHeaders<
+                CompleteStatementRequestBody,
+                CompleteStatementResponseBody,
+                SessionMessageParameters> {
+
+    private static final CompleteStatementHeaders INSTANCE = new CompleteStatementHeaders();
+
+    private static final String URL =
+            "/sessions/:" + SessionHandleIdPathParameter.KEY + "/complete-statement";
+
+    @Override
+    public Class<CompleteStatementResponseBody> getResponseClass() {
+        return CompleteStatementResponseBody.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Get the completion hints for the given statement at the given position.";
+    }
+
+    @Override
+    public Class<CompleteStatementRequestBody> getRequestClass() {
+        return CompleteStatementRequestBody.class;
+    }
+
+    @Override
+    public SessionMessageParameters getUnresolvedMessageParameters() {
+        return new SessionMessageParameters();
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return URL;
+    }
+
+    @Override
+    public Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() {
+        return Collections.singleton(SqlGatewayRestAPIVersion.V2);
+    }
+
+    public static CompleteStatementHeaders getINSTANCE() {
+        return INSTANCE;
+    }
+
+    @Override
+    public String operationId() {
+        return "completeStatement";
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/CompleteStatementRequestBody.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/CompleteStatementRequestBody.java
new file mode 100644
index 00000000000..51eb843ecf4
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/CompleteStatementRequestBody.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.message.statement;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** {@link RequestBody} for completing a statement. */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CompleteStatementRequestBody implements RequestBody {
+    private static final String FIELD_NAME_STATEMENT = "statement";
+    private static final String FIELD_NAME_POSITION = "position";
+
+    @JsonProperty(FIELD_NAME_STATEMENT)
+    private final String statement;
+
+    @JsonProperty(FIELD_NAME_POSITION)
+    private final int position;
+
+    public CompleteStatementRequestBody(
+            @JsonProperty(FIELD_NAME_STATEMENT) String statement,
+            @JsonProperty(FIELD_NAME_POSITION) int position) {
+        this.statement = statement;
+        this.position = position;
+    }
+
+    public String getStatement() {
+        return statement;
+    }
+
+    public int getPosition() {
+        return position;
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/CompleteStatementResponseBody.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/CompleteStatementResponseBody.java
new file mode 100644
index 00000000000..445d7d1e097
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/CompleteStatementResponseBody.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.message.statement;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/** {@link ResponseBody} for completing a statement. */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CompleteStatementResponseBody implements ResponseBody {
+
+    private static final String FIELD_NAME_CANDIDATES = "candidates";
+
+    @JsonProperty(FIELD_NAME_CANDIDATES)
+    private final List<String> candidates;
+
+    public CompleteStatementResponseBody(
+            @JsonProperty(FIELD_NAME_CANDIDATES) List<String> candidates) {
+        this.candidates = candidates;
+    }
+
+    public List<String> getCandidates() {
+        return candidates;
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 6923c3615c1..b98cad69f12 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -361,8 +361,8 @@ public class SqlGatewayServiceImpl implements SqlGatewayService {
                     .map(StringData::toString)
                     .collect(Collectors.toList());
         } catch (Throwable t) {
-            LOG.error("Failed to get statement completion hints.", t);
-            throw new SqlGatewayException("Failed to get statement completion hints.", t);
+            LOG.error("Failed to get statement completion candidates.", t);
+            throw new SqlGatewayException("Failed to get statement completion candidates.", t);
         }
     }
 
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 5d11c6235d5..b57b245e9c9 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -87,7 +87,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.gateway.service.utils.Constants.COMPLETION_HINTS;
+import static org.apache.flink.table.gateway.service.utils.Constants.COMPLETION_CANDIDATES;
 import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID;
 import static org.apache.flink.table.gateway.service.utils.Constants.SAVEPOINT_PATH;
 import static org.apache.flink.table.gateway.service.utils.Constants.SET_KEY;
@@ -263,7 +263,7 @@ public class OperationExecutor {
         return new ResultSet(
                 ResultSet.ResultType.EOS,
                 null,
-                ResolvedSchema.of(Column.physical(COMPLETION_HINTS, DataTypes.STRING())),
+                ResolvedSchema.of(Column.physical(COMPLETION_CANDIDATES, DataTypes.STRING())),
                 Arrays.stream(
                                 getTableEnvironment()
                                         .getParser()
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
index 2f40d74b5b4..53ea12c8f6c 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java
@@ -24,7 +24,7 @@ public class Constants {
     public static final String JOB_ID = "job id";
     public static final String SET_KEY = "key";
     public static final String SET_VALUE = "value";
-    public static final String COMPLETION_HINTS = "hints";
+    public static final String COMPLETION_CANDIDATES = "candidates";
 
     public static final String SAVEPOINT_PATH = "savepoint_path";
 }
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/StatementRelatedITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/StatementRelatedITCase.java
new file mode 100644
index 00000000000..0fe21bf4cf1
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/StatementRelatedITCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest;
+
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
+import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
+import org.apache.flink.table.gateway.rest.header.statement.CompleteStatementHeaders;
+import org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
+import org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
+import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import org.apache.flink.table.gateway.rest.message.statement.CompleteStatementRequestBody;
+import org.apache.flink.table.gateway.rest.message.statement.CompleteStatementResponseBody;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test basic logic of handlers inherited from {@link AbstractSqlGatewayRestHandler} in statement
+ * related cases.
+ */
+public class StatementRelatedITCase extends RestAPIITCaseBase {
+
+    @Test
+    public void testCompleteStatement() throws Exception {
+        CompletableFuture<OpenSessionResponseBody> response =
+                sendRequest(
+                        OpenSessionHeaders.getInstance(),
+                        EmptyMessageParameters.getInstance(),
+                        new OpenSessionRequestBody(null, null));
+
+        SessionHandle sessionHandle =
+                new SessionHandle(UUID.fromString(response.get().getSessionHandle()));
+
+        SessionMessageParameters sessionMessageParameters =
+                new SessionMessageParameters(sessionHandle);
+
+        CompletableFuture<CompleteStatementResponseBody> completeStatementResponse =
+                sendRequest(
+                        CompleteStatementHeaders.getINSTANCE(),
+                        sessionMessageParameters,
+                        new CompleteStatementRequestBody("CREATE TA", 9));
+
+        assertThat(completeStatementResponse.get().getCandidates())
+                .isEqualTo(Collections.singletonList("TABLE"));
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
index 1ed5ab30ab3..92824ac0c44 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
@@ -143,6 +143,43 @@
         }
       }
     }
+  }, {
+    "url" : "/sessions/:session_handle/complete-statement",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "session_handle"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:CompleteStatementRequestBody",
+      "properties" : {
+        "statement" : {
+          "type" : "string"
+        },
+        "position" : {
+          "type" : "integer"
+        }
+      }
+    },
+    "response" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:CompleteStatementResponseBody",
+      "properties" : {
+        "candidates" : {
+          "type" : "array",
+          "items" : {
+            "type" : "string"
+          }
+        }
+      }
+    }
   }, {
     "url" : "/sessions/:session_handle/configure-session",
     "method" : "POST",