You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/03/19 09:12:19 UTC

[kyuubi] branch master updated: [KYUUBI #4517] [FLINK] Fix multiple executions lead to abnormal results on Flink 1.14

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new c03664b8d [KYUUBI #4517] [FLINK] Fix multiple executions lead to abnormal results on Flink 1.14
c03664b8d is described below

commit c03664b8d8169f0d8b8b369e0bf96a5291e8361c
Author: Chao Chen <ch...@grgbanking.com>
AuthorDate: Sun Mar 19 17:12:07 2023 +0800

    [KYUUBI #4517] [FLINK] Fix multiple executions lead to abnormal results on Flink 1.14
    
    Fix a bug on Flink 1.14 where executing multiple statements leads to abnormal results
    
    ### _Why are the changes needed?_
    
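    On Flink 1.14, when multiple statements that return OK are executed, the second and later results are empty (FLINK-25558). This change builds a fresh OK result for each such statement so that every execution returns a row.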
    
    ### _How was this patch tested?_
    - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [X] Add screenshots for manual tests if appropriate
    
    - [X] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4517 from waywtdcc/flink1.14_result_ok_error.
    
    Closes #4517
    
    96ce6129c [Cheng Pan] ut
    1bd9d1e2f [Cheng Pan] nit
    5e5bccc91 [Cheng Pan] Migrate Flink engine Java code to Scala
    4afb02064 [chenchao4] Fix bug in flink 1.14 version, multiple executions lead to abnormal results
    3d5dc64c5 [chenchao4] Fix bug in flink 1.14 version, multiple executions lead to abnormal results
    c084864bd [chenchao4] Fix bug in flink 1.14 version, multiple executions lead to abnormal results
    954d76062 [chenchao4] Fix bug in flink 1.14 version, multiple executions lead to abnormal results
    d63ec55f2 [chenchao4] Fix bug in flink 1.14 version, multiple executions lead to abnormal results
    
    Lead-authored-by: Chao Chen <ch...@grgbanking.com>
    Co-authored-by: chenchao4 <Chenchao123>
    Co-authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../kyuubi/engine/flink/result/Constants.java      |  28 ----
 .../kyuubi/engine/flink/result/ResultSet.java      | 178 ---------------------
 .../kyuubi/engine/flink/result/Constants.scala     |  24 +++
 .../kyuubi/engine/flink/result/ResultSet.scala     | 139 ++++++++++++++++
 .../flink/operation/FlinkOperationSuite.scala      |  19 +--
 5 files changed, 170 insertions(+), 218 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
deleted file mode 100644
index b683eb76a..000000000
--- a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.flink.result;
-
-/** Constant column names. */
-public class Constants {
-
-  public static final String TABLE_TYPE = "TABLE";
-  public static final String VIEW_TYPE = "VIEW";
-
-  public static final String[] SUPPORTED_TABLE_TYPES = new String[] {TABLE_TYPE, VIEW_TYPE};
-}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultSet.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultSet.java
deleted file mode 100644
index 66f03a159..000000000
--- a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultSet.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.flink.result;
-
-import com.google.common.collect.Iterators;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-import javax.annotation.Nullable;
-import org.apache.flink.table.api.ResultKind;
-import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-import org.apache.kyuubi.operation.ArrayFetchIterator;
-import org.apache.kyuubi.operation.FetchIterator;
-
-/**
- * A set of one statement execution result containing result kind, columns, rows of data and change
- * flags for streaming mode.
- */
-public class ResultSet {
-
-  private final ResultKind resultKind;
-  private final List<Column> columns;
-  private final FetchIterator<Row> data;
-
-  // null in batch mode
-  //
-  // list of boolean in streaming mode,
-  // true if the corresponding row is an append row, false if its a retract row
-  private final List<Boolean> changeFlags;
-
-  private ResultSet(
-      ResultKind resultKind,
-      List<Column> columns,
-      FetchIterator<Row> data,
-      @Nullable List<Boolean> changeFlags) {
-    this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind must not be null");
-    this.columns = Preconditions.checkNotNull(columns, "columns must not be null");
-    this.data = Preconditions.checkNotNull(data, "data must not be null");
-    this.changeFlags = changeFlags;
-    if (changeFlags != null) {
-      Preconditions.checkArgument(
-          Iterators.size((Iterator<?>) data) == changeFlags.size(),
-          "the size of data and the size of changeFlags should be equal");
-    }
-  }
-
-  public List<Column> getColumns() {
-    return columns;
-  }
-
-  public FetchIterator<Row> getData() {
-    return data;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    ResultSet resultSet = (ResultSet) o;
-    return resultKind.equals(resultSet.resultKind)
-        && columns.equals(resultSet.columns)
-        && data.equals(resultSet.data)
-        && Objects.equals(changeFlags, resultSet.changeFlags);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(resultKind, columns, data, changeFlags);
-  }
-
-  @Override
-  public String toString() {
-    return "ResultSet{"
-        + "resultKind="
-        + resultKind
-        + ", columns="
-        + columns
-        + ", data="
-        + data
-        + ", changeFlags="
-        + changeFlags
-        + '}';
-  }
-
-  public static ResultSet fromTableResult(TableResult tableResult) {
-    ResolvedSchema schema = tableResult.getResolvedSchema();
-    // collect all rows from table result as list
-    // this is ok as TableResult contains limited rows
-    List<Row> rows = new ArrayList<>();
-    tableResult.collect().forEachRemaining(rows::add);
-    return builder()
-        .resultKind(tableResult.getResultKind())
-        .columns(schema.getColumns())
-        .data(rows.toArray(new Row[0]))
-        .build();
-  }
-
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  /** Builder for {@link ResultSet}. */
-  public static class Builder {
-    private ResultKind resultKind = null;
-    private List<Column> columns = null;
-    private FetchIterator<Row> data = null;
-    private List<Boolean> changeFlags = null;
-
-    private Builder() {}
-
-    /** Set {@link ResultKind}. */
-    public Builder resultKind(ResultKind resultKind) {
-      this.resultKind = resultKind;
-      return this;
-    }
-
-    /** Set columns. */
-    public Builder columns(Column... columns) {
-      this.columns = Arrays.asList(columns);
-      return this;
-    }
-
-    /** Set columns. */
-    public Builder columns(List<Column> columns) {
-      this.columns = columns;
-      return this;
-    }
-
-    /** Set data. */
-    public Builder data(FetchIterator<Row> data) {
-      this.data = data;
-      return this;
-    }
-
-    /** Set data. */
-    public Builder data(Row[] data) {
-      this.data = new ArrayFetchIterator<>(data);
-      return this;
-    }
-
-    /** Set change flags. */
-    public Builder changeFlags(List<Boolean> changeFlags) {
-      this.changeFlags = changeFlags;
-      return this;
-    }
-
-    /** Returns a {@link ResultSet} instance. */
-    public ResultSet build() {
-      return new ResultSet(resultKind, columns, data, changeFlags);
-    }
-  }
-}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/Constants.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/Constants.scala
new file mode 100644
index 000000000..ca582b2e3
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/Constants.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result
+
+object Constants {
+  val TABLE_TYPE: String = "TABLE"
+  val VIEW_TYPE: String = "VIEW"
+  val SUPPORTED_TABLE_TYPES: Array[String] = Array[String](TABLE_TYPE, VIEW_TYPE)
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
new file mode 100644
index 000000000..323d157b1
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result
+
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import com.google.common.collect.Iterators
+import org.apache.flink.table.api.{DataTypes, ResultKind, TableResult}
+import org.apache.flink.table.api.internal.TableResultImpl
+import org.apache.flink.table.catalog.{Column, ResolvedSchema}
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils._
+import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator}
+import org.apache.kyuubi.reflection.{DynFields, DynMethods}
+
+case class ResultSet(
+    resultKind: ResultKind,
+    columns: util.List[Column],
+    data: FetchIterator[Row],
+    // None in batch mode;
+    // a list of booleans in streaming mode, one flag per row:
+    // true if the corresponding row is an append row, false if it's a retract row
+    changeFlags: Option[util.List[Boolean]]) {
+
+  require(resultKind != null, "resultKind must not be null")
+  require(columns != null, "columns must not be null")
+  require(data != null, "data must not be null")
+  changeFlags.foreach { flags => // NOTE(review): Iterators.size exhausts its argument; verify
+    require(
+      Iterators.size(data.asInstanceOf[util.Iterator[_]]) == flags.size,
+      "the size of data and the size of changeFlags should be equal")
+  }
+
+  def getColumns: util.List[Column] = columns
+
+  def getData: FetchIterator[Row] = data
+}
+
+/**
+ * Companion of [[ResultSet]], which holds one statement execution result: result kind, columns,
+ * rows of data and change flags for streaming mode. Also converts a Flink TableResult into one.
+ */
+object ResultSet {
+
+  private lazy val TABLE_RESULT_OK = DynFields.builder()
+    .hiddenImpl(classOf[TableResultImpl], "TABLE_RESULT_OK") // for Flink 1.14
+    .buildStatic[TableResult]
+    .get
+
+  def fromTableResult(tableResult: TableResult): ResultSet = {
+    // FLINK-25558: on Flink 1.14, when multiple statements that return OK are executed, the
+    // second and later results would be empty, so a fresh OK result is built for each of them
+    val fixedTableResult: TableResult =
+      if (isFlinkVersionAtMost("1.14") && tableResult == TABLE_RESULT_OK) {
+        // FLINK-24461 executeOperation method changes the return type
+        // from TableResult to TableResultInternal
+        val builder = TableResultImpl.builder
+          .resultKind(ResultKind.SUCCESS)
+          .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING)))
+          .data(Collections.singletonList(Row.of("OK")))
+        // FLINK-24461 the return type of TableResultImpl.Builder#build changed from TableResult
+        // to TableResultInternal, so it is invoked reflectively here
+        DynMethods.builder("build")
+          .impl(classOf[TableResultImpl.Builder])
+          .build(builder)
+          .invoke[TableResult]()
+      } else {
+        tableResult
+      }
+    val schema = fixedTableResult.getResolvedSchema
+    // collect all rows from the table result into an array
+    // this is ok as TableResult contains limited rows
+    val rows = fixedTableResult.collect.asScala.toArray
+    builder.resultKind(fixedTableResult.getResultKind)
+      .columns(schema.getColumns)
+      .data(rows)
+      .build
+  }
+
+  def builder: Builder = new ResultSet.Builder
+
+  class Builder {
+    private var resultKind: ResultKind = _
+    private var columns: util.List[Column] = _
+    private var data: FetchIterator[Row] = _
+    private var changeFlags: Option[util.List[Boolean]] = None
+
+    def resultKind(resultKind: ResultKind): ResultSet.Builder = {
+      this.resultKind = resultKind
+      this
+    }
+
+    def columns(columns: Column*): ResultSet.Builder = {
+      this.columns = columns.asJava
+      this
+    }
+
+    def columns(columns: util.List[Column]): ResultSet.Builder = {
+      this.columns = columns
+      this
+    }
+
+    def data(data: FetchIterator[Row]): ResultSet.Builder = {
+      this.data = data
+      this
+    }
+
+    def data(data: Array[Row]): ResultSet.Builder = {
+      this.data = new ArrayFetchIterator[Row](data)
+      this
+    }
+
+    def changeFlags(changeFlags: util.List[Boolean]): ResultSet.Builder = {
+      this.changeFlags = Some(changeFlags)
+      this
+    }
+
+    def build: ResultSet = new ResultSet(resultKind, columns, data, changeFlags)
+  }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index d0522d3ea..70eb84ddb 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -876,20 +876,15 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
   }
 
   test("execute statement - create/drop catalog") {
-    withJdbcStatement()({ statement =>
-      val createResult = {
+    withJdbcStatement() { statement =>
+      val createResult =
         statement.executeQuery("create catalog cat_a with ('type'='generic_in_memory')")
-      }
-      if (isFlinkVersionAtLeast("1.15")) {
-        assert(createResult.next())
-        assert(createResult.getString(1) === "OK")
-      }
+      assert(createResult.next())
+      assert(createResult.getString(1) === "OK")
       val dropResult = statement.executeQuery("drop catalog cat_a")
-      if (isFlinkVersionAtLeast("1.15")) {
-        assert(dropResult.next())
-        assert(dropResult.getString(1) === "OK")
-      }
-    })
+      assert(dropResult.next())
+      assert(dropResult.getString(1) === "OK")
+    }
   }
 
   test("execute statement - set/get catalog") {