You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/03/19 09:12:33 UTC
[kyuubi] branch branch-1.7 updated: [KYUUBI #4517] [FLINK] Fix multiple executions lead to abnormal results on Flink 1.14
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 4e946921b [KYUUBI #4517] [FLINK] Fix multiple executions lead to abnormal results on Flink 1.14
4e946921b is described below
commit 4e946921b711fab076a7288a39d3cf8d46a924e3
Author: Chao Chen <ch...@grgbanking.com>
AuthorDate: Sun Mar 19 17:12:07 2023 +0800
[KYUUBI #4517] [FLINK] Fix multiple executions lead to abnormal results on Flink 1.14
Fix bug in flink 1.14 version, multiple executions lead to abnormal results
### _Why are the changes needed?_
Fix bug in flink 1.14 version, multiple executions lead to abnormal results
### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [X] Add screenshots for manual tests if appropriate
- [X] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #4517 from waywtdcc/flink1.14_result_ok_error.
Closes #4517
96ce6129c [Cheng Pan] ut
1bd9d1e2f [Cheng Pan] nit
5e5bccc91 [Cheng Pan] Migrate Flink engine Java code to Scala
4afb02064 [chenchao4] Fix bug in flink 1.14 version, multiple executions lead to abnormal results
3d5dc64c5 [chenchao4] Fix bug in flink 1.14 version, multiple executions lead to abnormal results
c084864bd [chenchao4] Fix bug in flink 1.14 version, multiple executions lead to abnormal results
954d76062 [chenchao4] Fix bug in flink 1.14 version, multiple executions lead to abnormal results
d63ec55f2 [chenchao4] Fix bug in flink 1.14 version, multiple executions lead to abnormal results
Lead-authored-by: Chao Chen <ch...@grgbanking.com>
Co-authored-by: chenchao4 <Chenchao123>
Co-authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Cheng Pan <ch...@apache.org>
(cherry picked from commit c03664b8d8169f0d8b8b369e0bf96a5291e8361c)
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../kyuubi/engine/flink/result/Constants.java | 28 ----
.../kyuubi/engine/flink/result/ResultSet.java | 178 ---------------------
.../kyuubi/engine/flink/result/Constants.scala | 24 +++
.../kyuubi/engine/flink/result/ResultSet.scala | 139 ++++++++++++++++
.../flink/operation/FlinkOperationSuite.scala | 19 +--
5 files changed, 170 insertions(+), 218 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
deleted file mode 100644
index b683eb76a..000000000
--- a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.flink.result;
-
-/** Constant column names. */
-public class Constants {
-
- public static final String TABLE_TYPE = "TABLE";
- public static final String VIEW_TYPE = "VIEW";
-
- public static final String[] SUPPORTED_TABLE_TYPES = new String[] {TABLE_TYPE, VIEW_TYPE};
-}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultSet.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultSet.java
deleted file mode 100644
index 66f03a159..000000000
--- a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultSet.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.flink.result;
-
-import com.google.common.collect.Iterators;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-import javax.annotation.Nullable;
-import org.apache.flink.table.api.ResultKind;
-import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-import org.apache.kyuubi.operation.ArrayFetchIterator;
-import org.apache.kyuubi.operation.FetchIterator;
-
-/**
- * A set of one statement execution result containing result kind, columns, rows of data and change
- * flags for streaming mode.
- */
-public class ResultSet {
-
- private final ResultKind resultKind;
- private final List<Column> columns;
- private final FetchIterator<Row> data;
-
- // null in batch mode
- //
- // list of boolean in streaming mode,
- // true if the corresponding row is an append row, false if its a retract row
- private final List<Boolean> changeFlags;
-
- private ResultSet(
- ResultKind resultKind,
- List<Column> columns,
- FetchIterator<Row> data,
- @Nullable List<Boolean> changeFlags) {
- this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind must not be null");
- this.columns = Preconditions.checkNotNull(columns, "columns must not be null");
- this.data = Preconditions.checkNotNull(data, "data must not be null");
- this.changeFlags = changeFlags;
- if (changeFlags != null) {
- Preconditions.checkArgument(
- Iterators.size((Iterator<?>) data) == changeFlags.size(),
- "the size of data and the size of changeFlags should be equal");
- }
- }
-
- public List<Column> getColumns() {
- return columns;
- }
-
- public FetchIterator<Row> getData() {
- return data;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ResultSet resultSet = (ResultSet) o;
- return resultKind.equals(resultSet.resultKind)
- && columns.equals(resultSet.columns)
- && data.equals(resultSet.data)
- && Objects.equals(changeFlags, resultSet.changeFlags);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(resultKind, columns, data, changeFlags);
- }
-
- @Override
- public String toString() {
- return "ResultSet{"
- + "resultKind="
- + resultKind
- + ", columns="
- + columns
- + ", data="
- + data
- + ", changeFlags="
- + changeFlags
- + '}';
- }
-
- public static ResultSet fromTableResult(TableResult tableResult) {
- ResolvedSchema schema = tableResult.getResolvedSchema();
- // collect all rows from table result as list
- // this is ok as TableResult contains limited rows
- List<Row> rows = new ArrayList<>();
- tableResult.collect().forEachRemaining(rows::add);
- return builder()
- .resultKind(tableResult.getResultKind())
- .columns(schema.getColumns())
- .data(rows.toArray(new Row[0]))
- .build();
- }
-
- public static Builder builder() {
- return new Builder();
- }
-
- /** Builder for {@link ResultSet}. */
- public static class Builder {
- private ResultKind resultKind = null;
- private List<Column> columns = null;
- private FetchIterator<Row> data = null;
- private List<Boolean> changeFlags = null;
-
- private Builder() {}
-
- /** Set {@link ResultKind}. */
- public Builder resultKind(ResultKind resultKind) {
- this.resultKind = resultKind;
- return this;
- }
-
- /** Set columns. */
- public Builder columns(Column... columns) {
- this.columns = Arrays.asList(columns);
- return this;
- }
-
- /** Set columns. */
- public Builder columns(List<Column> columns) {
- this.columns = columns;
- return this;
- }
-
- /** Set data. */
- public Builder data(FetchIterator<Row> data) {
- this.data = data;
- return this;
- }
-
- /** Set data. */
- public Builder data(Row[] data) {
- this.data = new ArrayFetchIterator<>(data);
- return this;
- }
-
- /** Set change flags. */
- public Builder changeFlags(List<Boolean> changeFlags) {
- this.changeFlags = changeFlags;
- return this;
- }
-
- /** Returns a {@link ResultSet} instance. */
- public ResultSet build() {
- return new ResultSet(resultKind, columns, data, changeFlags);
- }
- }
-}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/Constants.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/Constants.scala
new file mode 100644
index 000000000..ca582b2e3
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/Constants.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result
+
+object Constants {
+ val TABLE_TYPE: String = "TABLE"
+ val VIEW_TYPE: String = "VIEW"
+ val SUPPORTED_TABLE_TYPES: Array[String] = Array[String](TABLE_TYPE, VIEW_TYPE)
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
new file mode 100644
index 000000000..323d157b1
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result
+
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import com.google.common.collect.Iterators
+import org.apache.flink.table.api.{DataTypes, ResultKind, TableResult}
+import org.apache.flink.table.api.internal.TableResultImpl
+import org.apache.flink.table.catalog.{Column, ResolvedSchema}
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils._
+import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator}
+import org.apache.kyuubi.reflection.{DynFields, DynMethods}
+
+case class ResultSet(
+ resultKind: ResultKind,
+ columns: util.List[Column],
+ data: FetchIterator[Row],
+ // null in batch mode
+ // list of boolean in streaming mode,
+ // true if the corresponding row is an append row, false if its a retract row
+ changeFlags: Option[util.List[Boolean]]) {
+
+ require(resultKind != null, "resultKind must not be null")
+ require(columns != null, "columns must not be null")
+ require(data != null, "data must not be null")
+ changeFlags.foreach { flags =>
+ require(
+ Iterators.size(data.asInstanceOf[util.Iterator[_]]) == flags.size,
+ "the size of data and the size of changeFlags should be equal")
+ }
+
+ def getColumns: util.List[Column] = columns
+
+ def getData: FetchIterator[Row] = data
+}
+
+/**
+ * A set of one statement execution result containing result kind, columns, rows of data and change
+ * flags for streaming mode.
+ */
+object ResultSet {
+
+ private lazy val TABLE_RESULT_OK = DynFields.builder()
+ .hiddenImpl(classOf[TableResultImpl], "TABLE_RESULT_OK") // for Flink 1.14
+ .buildStatic[TableResult]
+ .get
+
+ def fromTableResult(tableResult: TableResult): ResultSet = {
+ // FLINK-25558, if execute multiple SQLs that return OK, the second and latter results
+ // would be empty, which affects Flink 1.14
+ val fixedTableResult: TableResult =
+ if (isFlinkVersionAtMost("1.14") && tableResult == TABLE_RESULT_OK) {
+ // FLINK-24461 executeOperation method changes the return type
+ // from TableResult to TableResultInternal
+ val builder = TableResultImpl.builder
+ .resultKind(ResultKind.SUCCESS)
+ .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING)))
+ .data(Collections.singletonList(Row.of("OK")))
+ // FLINK-24461 the return type of TableResultImpl.Builder#build changed
+ // from TableResult to TableResultInternal
+ DynMethods.builder("build")
+ .impl(classOf[TableResultImpl.Builder])
+ .build(builder)
+ .invoke[TableResult]()
+ } else {
+ tableResult
+ }
+ val schema = fixedTableResult.getResolvedSchema
+ // collect all rows from table result as list
+ // this is ok as TableResult contains limited rows
+ val rows = fixedTableResult.collect.asScala.toArray
+ builder.resultKind(fixedTableResult.getResultKind)
+ .columns(schema.getColumns)
+ .data(rows)
+ .build
+ }
+
+ def builder: Builder = new ResultSet.Builder
+
+ class Builder {
+ private var resultKind: ResultKind = _
+ private var columns: util.List[Column] = _
+ private var data: FetchIterator[Row] = _
+ private var changeFlags: Option[util.List[Boolean]] = None
+
+ def resultKind(resultKind: ResultKind): ResultSet.Builder = {
+ this.resultKind = resultKind
+ this
+ }
+
+ def columns(columns: Column*): ResultSet.Builder = {
+ this.columns = columns.asJava
+ this
+ }
+
+ def columns(columns: util.List[Column]): ResultSet.Builder = {
+ this.columns = columns
+ this
+ }
+
+ def data(data: FetchIterator[Row]): ResultSet.Builder = {
+ this.data = data
+ this
+ }
+
+ def data(data: Array[Row]): ResultSet.Builder = {
+ this.data = new ArrayFetchIterator[Row](data)
+ this
+ }
+
+ def changeFlags(changeFlags: util.List[Boolean]): ResultSet.Builder = {
+ this.changeFlags = Some(changeFlags)
+ this
+ }
+
+ def build: ResultSet = new ResultSet(resultKind, columns, data, changeFlags)
+ }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index d0522d3ea..70eb84ddb 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -876,20 +876,15 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
}
test("execute statement - create/drop catalog") {
- withJdbcStatement()({ statement =>
- val createResult = {
+ withJdbcStatement() { statement =>
+ val createResult =
statement.executeQuery("create catalog cat_a with ('type'='generic_in_memory')")
- }
- if (isFlinkVersionAtLeast("1.15")) {
- assert(createResult.next())
- assert(createResult.getString(1) === "OK")
- }
+ assert(createResult.next())
+ assert(createResult.getString(1) === "OK")
val dropResult = statement.executeQuery("drop catalog cat_a")
- if (isFlinkVersionAtLeast("1.15")) {
- assert(dropResult.next())
- assert(dropResult.getString(1) === "OK")
- }
- })
+ assert(dropResult.next())
+ assert(dropResult.getString(1) === "OK")
+ }
}
test("execute statement - set/get catalog") {