Posted to commits@flink.apache.org by ja...@apache.org on 2022/08/04 08:41:24 UTC

[flink] branch master updated: [FLINK-28212][hive-dialect] Fix IndexOutOfBoundsException when over window SELECT doesn't contain all fields of input

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 42287b399f7 [FLINK-28212][hive-dialect] Fix IndexOutOfBoundsException when over window SELECT doesn't contain all fields of input
42287b399f7 is described below

commit 42287b399f7ca74a419cee0385a84e7859fc7628
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Thu Aug 4 16:41:16 2022 +0800

    [FLINK-28212][hive-dialect] Fix IndexOutOfBoundsException when over window SELECT doesn't contain all fields of input
    
    This closes #20060
---
 .../delegation/hive/HiveParserCalcitePlanner.java  |  10 +
 .../hive/HiveParserProjectWindowTrimmer.java       | 226 +++++++++++++++++++++
 .../connectors/hive/HiveDialectQueryITCase.java    |  27 +++
 3 files changed, 263 insertions(+)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
index f813595de00..722da244fda 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
@@ -2423,6 +2423,16 @@ public class HiveParserCalcitePlanner {
         }
 
         inputRR.setCheckForAmbiguity(false);
+        if (selForWindow != null && res instanceof Project) {
+            // if windowing expressions exist, trim the project node with window
+            res =
+                    HiveParserProjectWindowTrimmer.trimProjectWindow(
+                            (Project) res,
+                            (Project) selForWindow,
+                            relToRowResolver,
+                            relToHiveColNameCalcitePosMap);
+        }
+
         return res;
     }
 
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserProjectWindowTrimmer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserProjectWindowTrimmer.java
new file mode 100644
index 00000000000..ba03a88c1f3
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserProjectWindowTrimmer.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation.hive;
+
+import org.apache.flink.table.planner.delegation.hive.copy.HiveParserRowResolver;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexWindow;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.buildHiveToCalciteColumnMap;
+
+/**
+ * SQL with window expressions will be converted into two project nodes by HiveParser. One
+ * is a project node containing all input fields and the corresponding window expressions, and
+ * the other is a project node that contains only the selected fields.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * create table src(a int, b int, c int);
+ * select a, count(b) over(order by a rows between 1 preceding and 1 following) from src;
+ * }</pre>
+ *
+ * <p>will be converted into a RelNode plan like:
+ *
+ * <pre>{@code
+ * LogicalProject(a=[$0], _o_c2=[$4])
+ *   LogicalProject(a=[$0], b=[$1], c=[$2],
+ *       _o_col3=[count($1) over(order by $0 desc nulls last rows between 1 preceding and 1 following)])
+ * }</pre>
+ *
+ * <p>The project node with windows contains all the input fields, some of which are not
+ * necessary. The rule {@link org.apache.calcite.rel.rules.ProjectWindowTransposeRule} removes
+ * the redundant fields and adjusts the indices referred to by {@link RexInputRef} since some
+ * fields are removed. But it does not adjust the indices inside lowerBound/upperBound, which
+ * then causes an IndexOutOfBoundsException when trying to access the lowerBound/upperBound.
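+ *
+ * <p>For the example above, the trimmer drops the unused field c and produces a plan roughly
+ * like the following (an illustrative sketch; the exact field names and indices depend on the
+ * plan HiveParser produces):
+ *
+ * <pre>{@code
+ * LogicalProject(a=[$0], _o_c2=[$2])
+ *   LogicalProject(a=[$0], b=[$1],
+ *       _o_col3=[count($1) over(order by $0 desc nulls last rows between 1 preceding and 1 following)])
+ * }</pre>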
+ *
+ * <p>This class behaves much the same as {@link
+ * org.apache.calcite.rel.rules.ProjectWindowTransposeRule}, but it also adjusts the indices
+ * inside lowerBound/upperBound.
+ */
+public class HiveParserProjectWindowTrimmer {
+
+    /**
+     * Removes the redundant fields from the project node that contains over window expressions.
+     *
+     * @param selectProject the project node that contains the selected fields, sitting on top of
+     *     the project node with windows
+     * @param projectWithWindow the project node whose trailing project expressions are the over
+     *     windows
+     * @return the new project node after trimming
+     */
+    public static RelNode trimProjectWindow(
+            Project selectProject,
+            Project projectWithWindow,
+            Map<RelNode, HiveParserRowResolver> relToRowResolver,
+            Map<RelNode, Map<String, Integer>> relToHiveColNameCalcitePosMap) {
+        // get the over window nodes
+        List<RexOver> rexOverList =
+                projectWithWindow.getProjects().stream()
+                        .filter(node -> node instanceof RexOver)
+                        .map(node -> (RexOver) node)
+                        .collect(Collectors.toList());
+        // the number of fields in the project node with window, excluding the over window fields
+        int windowInputColumn = projectWithWindow.getProjects().size() - rexOverList.size();
+        // find all fields referred to by the over windows and the select project node
+        final ImmutableBitSet beReferred =
+                findReference(selectProject, rexOverList, windowInputColumn);
+
+        // If all the input columns are referred,
+        // it is impossible to trim any of them out
+        if (beReferred.cardinality() == windowInputColumn) {
+            return selectProject;
+        }
+
+        // Keep only the referred fields and the over window fields
+        final List<RexNode> exps = new ArrayList<>();
+        final RelDataTypeFactory.Builder builder =
+                projectWithWindow.getCluster().getTypeFactory().builder();
+
+        final List<RelDataTypeField> rowTypeWindowInput =
+                projectWithWindow.getRowType().getFieldList();
+        // add indices of the referred fields
+        List<Integer> remainIndexInProjectWindow = new ArrayList<>(beReferred.asList());
+        // add indices of the over window fields
+        remainIndexInProjectWindow.addAll(
+                IntStream.range(windowInputColumn, projectWithWindow.getProjects().size())
+                        .boxed()
+                        .collect(Collectors.toList()));
+        for (int index : remainIndexInProjectWindow) {
+            exps.add(projectWithWindow.getProjects().get(index));
+            builder.add(rowTypeWindowInput.get(index));
+        }
+
+        // As the un-referred columns are trimmed,
+        // the indices specified in the select project need to be adjusted
+        final RexShuttle indexAdjustment =
+                new RexShuttle() {
+                    @Override
+                    public RexNode visitInputRef(RexInputRef inputRef) {
+                        final int newIndex =
+                                getAdjustedIndex(
+                                        inputRef.getIndex(), beReferred, windowInputColumn);
+                        return new RexInputRef(newIndex, inputRef.getType());
+                    }
+                };
+
+        // adjust the top select project node
+        final List<RexNode> topProjExps = indexAdjustment.visitList(selectProject.getProjects());
+
+        // create the trimmed project node
+        LogicalProject trimmedProject =
+                LogicalProject.create(
+                        projectWithWindow.getInput(),
+                        Collections.emptyList(),
+                        exps,
+                        builder.build());
+
+        // register the row resolver for the newly trimmed project node
+        HiveParserRowResolver oldRowResolver = relToRowResolver.remove(projectWithWindow);
+        if (oldRowResolver != null) {
+            HiveParserRowResolver newProjectRR = new HiveParserRowResolver();
+            List<ColumnInfo> oldColumnsInfo = oldRowResolver.getColumnInfos();
+            for (int index : remainIndexInProjectWindow) {
+                newProjectRR.put(
+                        oldColumnsInfo.get(index).getTabAlias(),
+                        oldColumnsInfo.get(index).getAlias(),
+                        oldColumnsInfo.get(index));
+            }
+            relToRowResolver.put(trimmedProject, newProjectRR);
+            relToHiveColNameCalcitePosMap.remove(projectWithWindow);
+            relToHiveColNameCalcitePosMap.put(
+                    trimmedProject, buildHiveToCalciteColumnMap(newProjectRR));
+        }
+
+        // create the new top project with adjusted field references
+        RelNode newProject =
+                LogicalProject.create(
+                        trimmedProject,
+                        Collections.emptyList(),
+                        topProjExps,
+                        selectProject.getRowType());
+        // register the row resolver for the new top project node
+        relToRowResolver.put(newProject, relToRowResolver.remove(selectProject));
+        relToHiveColNameCalcitePosMap.put(
+                newProject, relToHiveColNameCalcitePosMap.remove(selectProject));
+        return newProject;
+    }
+
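+    // Collects the indices of the input fields (index < windowInputColumn) that are referenced
+    // by the select project or by any over window, i.e. its partition keys, order keys or
+    // operands.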
+    private static ImmutableBitSet findReference(
+            final Project project, List<RexOver> rexOverList, int windowInputColumn) {
+        final ImmutableBitSet.Builder beReferred = ImmutableBitSet.builder();
+
+        final RexShuttle referenceFinder =
+                new RexShuttle() {
+                    @Override
+                    public RexNode visitInputRef(RexInputRef inputRef) {
+                        final int index = inputRef.getIndex();
+                        if (index < windowInputColumn) {
+                            beReferred.set(index);
+                        }
+                        return inputRef;
+                    }
+                };
+
+        // references in the select project
+        referenceFinder.visitEach(project.getProjects());
+
+        // references in the over windows
+        for (RexOver rexOver : rexOverList) {
+            RexWindow rexWindow = rexOver.getWindow();
+            // references in partition by keys
+            referenceFinder.visitEach(rexWindow.partitionKeys);
+            // references in order by keys
+            referenceFinder.visitEach(
+                    rexWindow.orderKeys.stream().map(Pair::getKey).collect(Collectors.toList()));
+            // references in operands
+            referenceFinder.visitEach(rexOver.getOperands());
+        }
+        return beReferred.build();
+    }
+
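+    // Maps a field index in the original project-with-window node to its index in the trimmed
+    // node: an input field's new index is the number of referred fields below it, while an over
+    // window field is placed right after the kept input fields, i.e. shifted down by the number
+    // of trimmed input fields.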
+    private static int getAdjustedIndex(
+            final int initIndex, final ImmutableBitSet beReferred, final int windowInputColumn) {
+        if (initIndex >= windowInputColumn) {
+            return beReferred.cardinality() + (initIndex - windowInputColumn);
+        } else {
+            return beReferred.get(0, initIndex).cardinality();
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index 4e834c76555..0352b09e1ac 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -640,6 +640,33 @@ public class HiveDialectQueryITCase {
         assertThat(result.toString()).isEqualTo("[+I[[14, 28]]]");
     }
 
+    @Test
+    public void testWithOverWindow() throws Exception {
+        tableEnv.executeSql("create table over_test(a int, b int, c int, d int)");
+        try {
+            tableEnv.executeSql(
+                            "insert into over_test values(3, 2, 1, 4), (1, 2, 3, 4), (2, 1, 4, 4)")
+                    .await();
+            List<Row> result =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql(
+                                            "select a, count(b) over(order by a rows between 1 preceding and 1 following) from over_test")
+                                    .collect());
+            assertThat(result.toString()).isEqualTo("[+I[1, 2], +I[2, 3], +I[3, 2]]");
+
+            result =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql(
+                                            "select a, count(b) over(order by a rows between 1 preceding and 1 following),"
+                                                    + " count(c) over(distribute by a sort by b range between 5 preceding and current row)"
+                                                    + " from over_test")
+                                    .collect());
+            assertThat(result.toString()).isEqualTo("[+I[1, 2, 1], +I[2, 3, 1], +I[3, 2, 1]]");
+        } finally {
+            tableEnv.executeSql("drop table over_test");
+        }
+    }
+
     private void runQFile(File qfile) throws Exception {
         QTest qTest = extractQTest(qfile);
         for (int i = 0; i < qTest.statements.size(); i++) {