You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/13 10:28:01 UTC

[GitHub] [flink] yuzelin opened a new pull request, #21502: [WIP] ResultSet Modification.

yuzelin opened a new pull request, #21502:
URL: https://github.com/apache/flink/pull/21502

   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
     Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
     - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
     - *Deployments RPC transmits only the blob storage reference*
     - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (100MB)*
     - *Extended integration test for recovery after master (JobManager) failure*
     - *Added test that validates that TaskInfo is transferred only once across recoveries*
     - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
     - The serializers: (yes / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fsk119 commented on a diff in pull request #21502: [FLINK-29950] Refactor ResultSet to an interface

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #21502:
URL: https://github.com/apache/flink/pull/21502#discussion_r1066514320


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -506,24 +521,16 @@ public ResultFetcher callStopJobOperation(
                     "Could not stop job " + jobId + " for operation " + handle + ".", e);
         }
         if (isWithSavepoint) {
-            return new ResultFetcher(
+            return ResultFetcher.fromResults(
                     handle,
                     ResolvedSchema.of(Column.physical(SAVEPOINT_PATH, DataTypes.STRING())),
                     Collections.singletonList(
                             GenericRowData.of(StringData.fromString(savepoint.orElse("")))));
         } else {
-            return buildOkResultFetcher(handle);
+            return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false, null);

Review Comment:
   replace with `return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);`



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -50,41 +59,105 @@ public class ResultFetcher {
 
     private static final Logger LOG = LoggerFactory.getLogger(ResultFetcher.class);
     private static final int TABLE_RESULT_MAX_INITIAL_CAPACITY = 5000;
+    private static final RowDataToStringConverter DEFAULT_CONVERTER =
+            SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
 
     private final OperationHandle operationHandle;
 
     private final ResolvedSchema resultSchema;
     private final ResultStore resultStore;
     private final LinkedList<RowData> bufferedResults = new LinkedList<>();
     private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>();
+    private final RowDataToStringConverter converter;
+
+    private final boolean isQueryResult;
+
+    @Nullable private final JobID jobID;
+
+    private final ResultKind resultKind;
 
     private long currentToken = 0;
     private boolean noMoreResults = false;
 
-    public ResultFetcher(
+    private ResultFetcher(
             OperationHandle operationHandle,
             ResolvedSchema resultSchema,
-            CloseableIterator<RowData> resultRows) {
-        this(operationHandle, resultSchema, resultRows, TABLE_RESULT_MAX_INITIAL_CAPACITY);
+            CloseableIterator<RowData> resultRows,
+            RowDataToStringConverter converter,
+            boolean isQueryResult,
+            @Nullable JobID jobID,
+            ResultKind resultKind) {
+        this(
+                operationHandle,
+                resultSchema,
+                resultRows,
+                converter,
+                isQueryResult,
+                jobID,
+                resultKind,
+                TABLE_RESULT_MAX_INITIAL_CAPACITY);
     }
 
     @VisibleForTesting
     ResultFetcher(
             OperationHandle operationHandle,
             ResolvedSchema resultSchema,
             CloseableIterator<RowData> resultRows,
+            RowDataToStringConverter converter,
+            boolean isQueryResult,
+            @Nullable JobID jobID,
+            ResultKind resultKind,
             int maxBufferSize) {
         this.operationHandle = operationHandle;
         this.resultSchema = resultSchema;
         this.resultStore = new ResultStore(resultRows, maxBufferSize);
+        this.converter = converter;
+        this.isQueryResult = isQueryResult;
+        this.jobID = jobID;
+        this.resultKind = resultKind;
     }
 
-    public ResultFetcher(
-            OperationHandle operationHandle, ResolvedSchema resultSchema, List<RowData> rows) {
+    private ResultFetcher(
+            OperationHandle operationHandle,
+            ResolvedSchema resultSchema,
+            List<RowData> rows,
+            @Nullable JobID jobID) {
         this.operationHandle = operationHandle;
         this.resultSchema = resultSchema;
         this.bufferedResults.addAll(rows);
         this.resultStore = ResultStore.DUMMY_RESULT_STORE;
+        this.converter = DEFAULT_CONVERTER;
+        this.isQueryResult = false;
+        this.jobID = jobID;
+        this.resultKind = ResultKind.SUCCESS_WITH_CONTENT;
+    }
+
+    public static ResultFetcher fromTableResult(
+            OperationHandle operationHandle,
+            TableResultInternal tableResult,
+            boolean isQueryResult,
+            @Nullable JobID jobID) {

Review Comment:
   I think jobId is in the `TableResult` and we can remove this.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -220,4 +283,26 @@ public synchronized ResultSet fetchResults(long token, int maxFetchSize) {
     public ResultStore getResultStore() {
         return resultStore;
     }
+
+    private ResultSet buildEOSResultSet() {
+        return ResultSetImpl.newBuilder()
+                .resultType(ResultSet.ResultType.EOS)
+                .nextToken(null)
+                .resolvedSchema(resultSchema)
+                .data(Collections.emptyList())
+                .build();
+    }
+
+    private ResultSet buildPayloadResultSet() {

Review Comment:
   nit: BTW, I think these two methods should belong to the `ResultSetImpl`. WDYT?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -45,12 +46,12 @@
 import java.util.function.Function;
 import java.util.function.Supplier;
 
-import static org.apache.flink.table.gateway.api.results.ResultSet.NOT_READY_RESULTS;
-
 /** Manager for the {@link Operation}. */
 @Internal
 public class OperationManager {
 
+    public static final NotReadyResult NOT_READY_RESULT = new NotReadyResult();

Review Comment:
   Move this to the NotReadyResult.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+
+import java.util.Collections;
+import java.util.List;
+
+/** To represent that the execution result is not ready to fetch. */
+public class NotReadyResult implements ResultSet {
+
+    @Override
+    public ResultType getResultType() {
+        return ResultType.NOT_READY;
+    }
+
+    @Override
+    public Long getNextToken() {
+        return 0L;
+    }
+
+    @Override
+    public ResolvedSchema getResultSchema() {
+        return ResolvedSchema.of(Collections.emptyList());
+    }
+
+    @Override
+    public List<RowData> getData() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean isQueryResult() {
+        throw new UnsupportedOperationException(
+                "Can't know whether a NOT_READY_RESULT is for a query.");

Review Comment:
   Can't -> Don't
   
   Also add "Please continue fetching results until the result type is PAYLOAD or EOS."



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+
+import java.util.Collections;
+import java.util.List;
+
+/** To represent that the execution result is not ready to fetch. */
+public class NotReadyResult implements ResultSet {
+
+    @Override
+    public ResultType getResultType() {
+        return ResultType.NOT_READY;
+    }
+
+    @Override
+    public Long getNextToken() {
+        return 0L;
+    }
+
+    @Override
+    public ResolvedSchema getResultSchema() {
+        return ResolvedSchema.of(Collections.emptyList());
+    }
+
+    @Override
+    public List<RowData> getData() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean isQueryResult() {
+        throw new UnsupportedOperationException(
+                "Can't know whether a NOT_READY_RESULT is for a query.");
+    }
+
+    @Override
+    public JobID getJobID() {
+        throw new UnsupportedOperationException("Can't get job ID from a NOT_READY_RESULT.");
+    }
+
+    @Override
+    public ResultKind getResultKind() {
+        throw new UnsupportedOperationException("Can't get result kind from a NOT_READY_RESULT.");

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fsk119 commented on a diff in pull request #21502: [FLINK-29950] Refactor ResultSet to an interface

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #21502:
URL: https://github.com/apache/flink/pull/21502#discussion_r1064355032


##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSetImpl.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results;

Review Comment:
   I don't think we should expose the implementation in the sql-gateway-api package. Move to the `org.apache.flink.table.gateway.service.result`



##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSetImpl.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** An implementation of {@link ResultSet}. */
+@Internal
+public class ResultSetImpl implements ResultSet {
+
+    private final ResultType resultType;
+
+    @Nullable private final Long nextToken;
+
+    private final ResolvedSchema resultSchema;
+    private final List<RowData> data;
+    @Nullable private final RowDataToStringConverter converter;

Review Comment:
   Why this Nulllable? I think it's not null in the EOS/PAYLOAD case.



##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSetImpl.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** An implementation of {@link ResultSet}. */
+@Internal
+public class ResultSetImpl implements ResultSet {
+
+    private final ResultType resultType;
+
+    @Nullable private final Long nextToken;
+
+    private final ResolvedSchema resultSchema;
+    private final List<RowData> data;
+    @Nullable private final RowDataToStringConverter converter;
+
+    private final boolean isQueryResult;
+
+    @Nullable private final JobID jobID;
+
+    @Nullable private final ResultKind resultKind;
+
+    public static final ResultSet NOT_READY_RESULTS =

Review Comment:
   It's a little strange that we can get jobId or ResultKind from the `NOT_READY_RESULTS`. I think we should introduce an individual class to represent this and throw exception if others try to get the results.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -386,30 +414,51 @@ private ResultFetcher callModifyOperations(
             OperationHandle handle,
             List<ModifyOperation> modifyOperations) {
         TableResultInternal result = tableEnv.executeInternal(modifyOperations);
-        return new ResultFetcher(
-                handle,
-                ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())),
-                Collections.singletonList(
-                        GenericRowData.of(
-                                StringData.fromString(
-                                        result.getJobClient()
-                                                .orElseThrow(
-                                                        () ->
-                                                                new SqlExecutionException(
-                                                                        String.format(
-                                                                                "Can't get job client for the operation %s.",
-                                                                                handle)))
-                                                .getJobID()
-                                                .toString()))));
+        return ResultFetcher.newBuilder()
+                .operationHandle(handle)
+                .resolvedSchema(ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())))
+                .rows(
+                        Collections.singletonList(
+                                GenericRowData.of(
+                                        StringData.fromString(
+                                                result.getJobClient()
+                                                        .orElseThrow(
+                                                                () ->
+                                                                        new SqlExecutionException(
+                                                                                String.format(
+                                                                                        "Can't get job client for the operation %s.",
+                                                                                        handle)))
+                                                        .getJobID()
+                                                        .toString()))))
+                .converter(SIMPLE_ROW_DATA_TO_STRING_CONVERTER)
+                .jobID(
+                        result.getJobClient()
+                                .orElseThrow(
+                                        () ->
+                                                new SqlExecutionException(
+                                                        String.format(
+                                                                "Can't get job client for the operation %s.",
+                                                                handle)))
+                                .getJobID())
+                .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+                .build();
     }
 
     private ResultFetcher callOperation(
             TableEnvironmentInternal tableEnv, OperationHandle handle, Operation op) {
         TableResultInternal result = tableEnv.executeInternal(op);
-        return new ResultFetcher(
-                handle,
-                result.getResolvedSchema(),
-                CollectionUtil.iteratorToList(result.collectInternal()));
+        JobID jobID = null;
+        if (result.getJobClient().isPresent()) {
+            jobID = result.getJobClient().get().getJobID();
+        }

Review Comment:
   Is it possible to satisfy the condition? It seems we have different paths to process ModifyOperation and QueryOperation



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -456,6 +450,100 @@ public void testGetOperationSchemaUntilOperationIsReady() throws Exception {
                 task -> assertThat(task.get()).isEqualTo(getDefaultResultSet().getResultSchema()));
     }
 
+    // ---------------------------------------------------------------------------------------------
+    // Executing statement tests
+    // ---------------------------------------------------------------------------------------------

Review Comment:
   I am prone to move the tests to the `SqlGatewayServiceStatementITCase` side.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -220,4 +223,111 @@ public synchronized ResultSet fetchResults(long token, int maxFetchSize) {
     public ResultStore getResultStore() {
         return resultStore;
     }
+
+    private ResultSet buildEosResultSet() {

Review Comment:
   nit: buildEosResultSet -> buildEOSResultSet



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -220,4 +223,111 @@ public synchronized ResultSet fetchResults(long token, int maxFetchSize) {
     public ResultStore getResultStore() {
         return resultStore;
     }
+
+    private ResultSet buildEosResultSet() {
+        return ResultSetImpl.newBuilder()
+                .resultType(ResultSet.ResultType.EOS)
+                .nextToken(null)
+                .resolvedSchema(resultSchema)
+                .data(Collections.emptyList())
+                .build();
+    }
+
+    private ResultSet buildPayloadResultSet() {
+        return ResultSetImpl.newBuilder()
+                .resultType(ResultSet.ResultType.PAYLOAD)
+                .nextToken(currentToken)
+                .resolvedSchema(resultSchema)
+                .data(new ArrayList<>(bufferedPrevResults))
+                .converter(converter)
+                .isQueryResult(isQueryResult)
+                .jobID(jobID)
+                .resultKind(resultKind)
+                .build();
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /** Builder to build the {@link ResultFetcher}. */
+    public static class Builder {
+        private OperationHandle operationHandle;
+        private ResolvedSchema resultSchema;
+        private List<RowData> rows;
+        private CloseableIterator<RowData> rowsIterator;
+        RowDataToStringConverter converter;
+        private boolean isQueryResult = false;
+        @Nullable private JobID jobID;
+        private ResultKind resultKind;
+
+        public Builder operationHandle(OperationHandle operationHandle) {
+            this.operationHandle = operationHandle;
+            return this;
+        }
+
+        public Builder resolvedSchema(ResolvedSchema resultSchema) {
+            this.resultSchema = resultSchema;
+            return this;
+        }
+
+        public Builder rows(List<RowData> rows) {
+            Preconditions.checkState(
+                    rowsIterator == null,
+                    "Result data has been set already. Can only set either rows or rowsIterator");
+            this.rows = rows;
+            return this;
+        }
+
+        public Builder rowsIterator(CloseableIterator<RowData> rowsIterator) {
+            Preconditions.checkState(
+                    rows == null,
+                    "Result data has been set already. Can only set either rows or rowsIterator");
+            this.rowsIterator = rowsIterator;
+            return this;
+        }
+
+        public Builder converter(RowDataToStringConverter converter) {
+            this.converter = converter;
+            return this;
+        }
+
+        public Builder setIsQueryResult() {

Review Comment:
   setIsQueryResult -> queryResult



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -54,37 +61,43 @@ public class ResultFetcher {
     private final OperationHandle operationHandle;
 
     private final ResolvedSchema resultSchema;
-    private final ResultStore resultStore;
+    private ResultStore resultStore;

Review Comment:
   Why this is not final?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -386,30 +414,51 @@ private ResultFetcher callModifyOperations(
             OperationHandle handle,
             List<ModifyOperation> modifyOperations) {
         TableResultInternal result = tableEnv.executeInternal(modifyOperations);
-        return new ResultFetcher(
-                handle,
-                ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())),
-                Collections.singletonList(
-                        GenericRowData.of(
-                                StringData.fromString(
-                                        result.getJobClient()
-                                                .orElseThrow(
-                                                        () ->
-                                                                new SqlExecutionException(
-                                                                        String.format(
-                                                                                "Can't get job client for the operation %s.",
-                                                                                handle)))
-                                                .getJobID()
-                                                .toString()))));
+        return ResultFetcher.newBuilder()
+                .operationHandle(handle)
+                .resolvedSchema(ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())))
+                .rows(
+                        Collections.singletonList(
+                                GenericRowData.of(
+                                        StringData.fromString(
+                                                result.getJobClient()
+                                                        .orElseThrow(
+                                                                () ->
+                                                                        new SqlExecutionException(
+                                                                                String.format(
+                                                                                        "Can't get job client for the operation %s.",
+                                                                                        handle)))
+                                                        .getJobID()
+                                                        .toString()))))
+                .converter(SIMPLE_ROW_DATA_TO_STRING_CONVERTER)
+                .jobID(
+                        result.getJobClient()
+                                .orElseThrow(
+                                        () ->
+                                                new SqlExecutionException(
+                                                        String.format(
+                                                                "Can't get job client for the operation %s.",
+                                                                handle)))
+                                .getJobID())
+                .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+                .build();
     }
 
     private ResultFetcher callOperation(
             TableEnvironmentInternal tableEnv, OperationHandle handle, Operation op) {
         TableResultInternal result = tableEnv.executeInternal(op);
-        return new ResultFetcher(
-                handle,
-                result.getResolvedSchema(),
-                CollectionUtil.iteratorToList(result.collectInternal()));
+        JobID jobID = null;
+        if (result.getJobClient().isPresent()) {
+            jobID = result.getJobClient().get().getJobID();
+        }
+        return ResultFetcher.newBuilder()
+                .operationHandle(handle)

Review Comment:
   I think most codes are similar here? What about we introducing a method named `fromTableResult`?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -386,30 +414,51 @@ private ResultFetcher callModifyOperations(
             OperationHandle handle,
             List<ModifyOperation> modifyOperations) {
         TableResultInternal result = tableEnv.executeInternal(modifyOperations);
-        return new ResultFetcher(
-                handle,
-                ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())),
-                Collections.singletonList(
-                        GenericRowData.of(
-                                StringData.fromString(
-                                        result.getJobClient()
-                                                .orElseThrow(
-                                                        () ->
-                                                                new SqlExecutionException(
-                                                                        String.format(
-                                                                                "Can't get job client for the operation %s.",
-                                                                                handle)))
-                                                .getJobID()
-                                                .toString()))));
+        return ResultFetcher.newBuilder()
+                .operationHandle(handle)
+                .resolvedSchema(ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())))
+                .rows(
+                        Collections.singletonList(
+                                GenericRowData.of(
+                                        StringData.fromString(
+                                                result.getJobClient()
+                                                        .orElseThrow(
+                                                                () ->
+                                                                        new SqlExecutionException(
+                                                                                String.format(
+                                                                                        "Can't get job client for the operation %s.",
+                                                                                        handle)))
+                                                        .getJobID()
+                                                        .toString()))))
+                .converter(SIMPLE_ROW_DATA_TO_STRING_CONVERTER)

Review Comment:
   I think `SIMPLE_ROW_DATA_TO_STRING_CONVERTER` can be the default converter.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -220,4 +223,111 @@ public synchronized ResultSet fetchResults(long token, int maxFetchSize) {
     public ResultStore getResultStore() {
         return resultStore;
     }
+
+    private ResultSet buildEosResultSet() {
+        return ResultSetImpl.newBuilder()
+                .resultType(ResultSet.ResultType.EOS)
+                .nextToken(null)
+                .resolvedSchema(resultSchema)
+                .data(Collections.emptyList())
+                .build();
+    }
+
+    private ResultSet buildPayloadResultSet() {
+        return ResultSetImpl.newBuilder()
+                .resultType(ResultSet.ResultType.PAYLOAD)
+                .nextToken(currentToken)
+                .resolvedSchema(resultSchema)
+                .data(new ArrayList<>(bufferedPrevResults))
+                .converter(converter)
+                .isQueryResult(isQueryResult)
+                .jobID(jobID)
+                .resultKind(resultKind)
+                .build();
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /** Builder to build the {@link ResultFetcher}. */
+    public static class Builder {
+        private OperationHandle operationHandle;
+        private ResolvedSchema resultSchema;
+        private List<RowData> rows;
+        private CloseableIterator<RowData> rowsIterator;
+        RowDataToStringConverter converter;

Review Comment:
   Why this is not private?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -220,4 +223,111 @@ public synchronized ResultSet fetchResults(long token, int maxFetchSize) {
     public ResultStore getResultStore() {
         return resultStore;
     }
+
+    private ResultSet buildEosResultSet() {
+        return ResultSetImpl.newBuilder()
+                .resultType(ResultSet.ResultType.EOS)
+                .nextToken(null)
+                .resolvedSchema(resultSchema)
+                .data(Collections.emptyList())
+                .build();
+    }
+
+    private ResultSet buildPayloadResultSet() {
+        return ResultSetImpl.newBuilder()
+                .resultType(ResultSet.ResultType.PAYLOAD)
+                .nextToken(currentToken)
+                .resolvedSchema(resultSchema)
+                .data(new ArrayList<>(bufferedPrevResults))
+                .converter(converter)
+                .isQueryResult(isQueryResult)
+                .jobID(jobID)
+                .resultKind(resultKind)
+                .build();
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /** Builder to build the {@link ResultFetcher}. */
+    public static class Builder {
+        private OperationHandle operationHandle;
+        private ResolvedSchema resultSchema;
+        private List<RowData> rows;
+        private CloseableIterator<RowData> rowsIterator;
+        RowDataToStringConverter converter;
+        private boolean isQueryResult = false;
+        @Nullable private JobID jobID;
+        private ResultKind resultKind;
+
+        public Builder operationHandle(OperationHandle operationHandle) {
+            this.operationHandle = operationHandle;
+            return this;
+        }
+
+        public Builder resolvedSchema(ResolvedSchema resultSchema) {
+            this.resultSchema = resultSchema;
+            return this;
+        }
+
+        public Builder rows(List<RowData> rows) {
+            Preconditions.checkState(
+                    rowsIterator == null,
+                    "Result data has been set already. Can only set either rows or rowsIterator");
+            this.rows = rows;
+            return this;
+        }
+
+        public Builder rowsIterator(CloseableIterator<RowData> rowsIterator) {
+            Preconditions.checkState(
+                    rows == null,
+                    "Result data has been set already. Can only set either rows or rowsIterator");
+            this.rowsIterator = rowsIterator;
+            return this;
+        }
+
+        public Builder converter(RowDataToStringConverter converter) {
+            this.converter = converter;
+            return this;
+        }
+
+        public Builder setIsQueryResult() {
+            this.isQueryResult = true;
+            return this;
+        }
+
+        public Builder jobID(JobID jobID) {
+            this.jobID = jobID;
+            return this;
+        }
+
+        public Builder resultKind(ResultKind resultKind) {
+            this.resultKind = resultKind;
+            return this;
+        }
+
+        public ResultFetcher build() {
+            Preconditions.checkArgument(
+                    rows != null || rowsIterator != null, "Result data has not been set.");
+
+            ResultFetcher resultFetcher =
+                    new ResultFetcher(
+                            operationHandle,
+                            resultSchema,
+                            converter,
+                            isQueryResult,
+                            jobID,
+                            resultKind);
+
+            if (rows != null) {
+                resultFetcher.resultStore = ResultStore.DUMMY_RESULT_STORE;
+                resultFetcher.bufferedResults.addAll(rows);
+            } else {
+                resultFetcher.resultStore =
+                        new ResultStore(rowsIterator, TABLE_RESULT_MAX_INITIAL_CAPACITY);
+            }

Review Comment:
   Why not introducing 
   ```
       public static ResultFetcher fromTableResult(
               OperationHandle operationHandle, TableResult tableResult, boolean isQuery) {}
   
       public static ResultFetcher fromResults(
               OperationHandle operationHandle,
               ResolvedSchema resolvedSchema,
               List<RowData> results) {}
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] yuzelin commented on pull request #21502: [FLINK-29950] Refactor ResultSet to an interface

Posted by GitBox <gi...@apache.org>.
yuzelin commented on PR #21502:
URL: https://github.com/apache/flink/pull/21502#issuecomment-1379830165

   > @yuzelin I add a commit to improve some codes. Could you take a look?
   
   LGTM.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fsk119 merged pull request #21502: [FLINK-29950] Refactor ResultSet to an interface

Posted by GitBox <gi...@apache.org>.
fsk119 merged PR #21502:
URL: https://github.com/apache/flink/pull/21502


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #21502: [WIP] ResultSet Modification.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21502:
URL: https://github.com/apache/flink/pull/21502#issuecomment-1348153671

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "19cb7f57ebcbfb579de9689cbad44d0bd3564e27",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "19cb7f57ebcbfb579de9689cbad44d0bd3564e27",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 19cb7f57ebcbfb579de9689cbad44d0bd3564e27 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] yuzelin commented on a diff in pull request #21502: [FLINK-29950] Refactor ResultSet to an interface

Posted by GitBox <gi...@apache.org>.
yuzelin commented on code in PR #21502:
URL: https://github.com/apache/flink/pull/21502#discussion_r1066716687


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -220,4 +283,26 @@ public synchronized ResultSet fetchResults(long token, int maxFetchSize) {
     public ResultStore getResultStore() {
         return resultStore;
     }
+
+    private ResultSet buildEOSResultSet() {
+        return ResultSetImpl.newBuilder()
+                .resultType(ResultSet.ResultType.EOS)
+                .nextToken(null)
+                .resolvedSchema(resultSchema)
+                .data(Collections.emptyList())
+                .build();
+    }
+
+    private ResultSet buildPayloadResultSet() {

Review Comment:
   Maybe Builder is not necessary. Using constructor is OK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] fsk119 commented on pull request #21502: [FLINK-29950] Refactor ResultSet to an interface

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #21502:
URL: https://github.com/apache/flink/pull/21502#issuecomment-1379827990

   @yuzelin I add a commit to improve some codes. Could you take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] yuzelin commented on pull request #21502: [FLINK-29950] Refactor ResultSet to an interface

Posted by GitBox <gi...@apache.org>.
yuzelin commented on PR #21502:
URL: https://github.com/apache/flink/pull/21502#issuecomment-1379883129

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] yuzelin commented on pull request #21502: [WIP] ResultSet Modification.

Posted by GitBox <gi...@apache.org>.
yuzelin commented on PR #21502:
URL: https://github.com/apache/flink/pull/21502#issuecomment-1350244643

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org