You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/15 10:02:28 UTC

[GitHub] [flink] godfreyhe commented on a change in pull request #13631: [FLINK-19639][table sql/planner]Support SupportsNestedProjectionPushD…

godfreyhe commented on a change in pull request #13631:
URL: https://github.com/apache/flink/pull/13631#discussion_r505333391



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
##########
@@ -268,7 +268,8 @@ class RefFieldAccessorVisitor(usedFields: Array[Int]) extends RexVisitorImpl[Uni
             // access is top-level access => return top-level access
             case _ :: _ if nestedAccess.equals("*") => List("*")
             // previous access is not prefix of this access => add access
-            case head :: _ if !nestedAccess.startsWith(head) =>
+            // it may cause bug without "." as tail if we have references a.b and a.bb
+            case head :: _ if !nestedAccess.startsWith(head + ".") =>

Review comment:
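       What if a field name itself contains `.`? A nested access `a` → `b` and a top-level field literally named `a.b` would then produce the same dotted string, so this prefix check could no longer tell them apart.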

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriter.scala
##########
@@ -60,3 +79,88 @@ class InputRewriter(fieldMap: Map[Int, Int]) extends RexShuttle {
     fieldMap.getOrElse(ref.getIndex,
       throw new IllegalArgumentException("input field contains invalid index"))
 }
+
+/**
+ * A RexShuttle to rewrite field accesses of RexNode with nested projection.
+ * For `RexInputRef`, it works like `InputReWriter` and use the old input
+ * ref index to find the new input fields ref.
+ * For `RexFieldAccess`, it will traverse to the top level of the access and
+ * find the mapping in field fieldMap first. There are 3 situations we need to consider:
+ *  1. mapping has the top level access, we should make field access to the reference;
+ *  2. mapping has the field, we should make an access;
+ *  3. mapping has no information of the current name, we should keep the full name
+ *  of the fields and index of mapping for later lookup.
+ * When the process is back from the recursion, we still have 2 situations need to
+ * consider:
+ *  1. we have found the reference of the upper level, we just make an access above the
+ *  reference we find before;
+ *  2. we haven't found the reference of the upper level, we concatenate the prefix with
+ *  the current field name and look up the new prefix in the mapping. If it's in the mapping,
+ *  we create a reference. Otherwise, we should go to the next level with the new prefix.
+ */
+class NestedInputRewriter(
+  fieldMap: JMap[Integer, JMap[String, Integer]],
+  rowTypes: JList[RelDataType],
+  builder: RexBuilder) extends RexShuttle {
+
+  override def visitFieldAccess(input: RexFieldAccess): RexNode = {
+    def traverse(fieldAccess: RexFieldAccess): (Int, String, Option[RexNode]) = {
+      fieldAccess.getReferenceExpr match {
+        case ref: RexInputRef =>
+          val mapping =
+            fieldMap.getOrElse(ref.getIndex,
+              throw new IllegalArgumentException("input field contains unknown index"))
+          if (mapping.contains("*")) {
+            (ref.getIndex,
+              "",
+              Option.apply(builder.makeFieldAccess(
+                new RexInputRef(mapping("*"), rowTypes(mapping("*"))),
+                fieldAccess.getField.getName,
+                false))
+            )
+          } else if(mapping.contains(fieldAccess.getField.getName)) {
+            (ref.getIndex,
+              "",
+              Option.apply(new RexInputRef(mapping(fieldAccess.getField.getName),

Review comment:
       Ditto: the same concern applies here. What if a field name contains `.`?

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
##########
@@ -107,4 +96,13 @@ private void testNestedProject(boolean nestedProjectionSupported) {
 		util().verifyPlan(sqlQuery);
 	}
 
+	@Test
+	public void testComplicatedNestedProject() {
+		String sqlQuery = "SELECT id," +
+				"    deepNested.nested1.name AS nestedName,\n" +
+				"    deepNested.nested2 AS nested2,\n" +
+				"    deepNested.nested2.num AS nestedNum\n" +
+				"FROM NestedTable";
+		util().verifyPlan(sqlQuery);

Review comment:
       nit: please also add a test with a complex expression, such as `deepNested.nested1.name + nested.value`.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriter.scala
##########
@@ -60,3 +79,88 @@ class InputRewriter(fieldMap: Map[Int, Int]) extends RexShuttle {
     fieldMap.getOrElse(ref.getIndex,
       throw new IllegalArgumentException("input field contains invalid index"))
 }
+
+/**
+ * A RexShuttle to rewrite field accesses of RexNode with nested projection.
+ * For `RexInputRef`, it works like `InputReWriter` and use the old input
+ * ref index to find the new input fields ref.
+ * For `RexFieldAccess`, it will traverse to the top level of the access and
+ * find the mapping in field fieldMap first. There are 3 situations we need to consider:
+ *  1. mapping has the top level access, we should make field access to the reference;
+ *  2. mapping has the field, we should make an access;
+ *  3. mapping has no information of the current name, we should keep the full name
+ *  of the fields and index of mapping for later lookup.
+ * When the process is back from the recursion, we still have 2 situations need to
+ * consider:
+ *  1. we have found the reference of the upper level, we just make an access above the
+ *  reference we find before;
+ *  2. we haven't found the reference of the upper level, we concatenate the prefix with
+ *  the current field name and look up the new prefix in the mapping. If it's in the mapping,
+ *  we create a reference. Otherwise, we should go to the next level with the new prefix.
+ */
+class NestedInputRewriter(
+  fieldMap: JMap[Integer, JMap[String, Integer]],
+  rowTypes: JList[RelDataType],
+  builder: RexBuilder) extends RexShuttle {
+
+  override def visitFieldAccess(input: RexFieldAccess): RexNode = {
+    def traverse(fieldAccess: RexFieldAccess): (Int, String, Option[RexNode]) = {
+      fieldAccess.getReferenceExpr match {
+        case ref: RexInputRef =>
+          val mapping =
+            fieldMap.getOrElse(ref.getIndex,
+              throw new IllegalArgumentException("input field contains unknown index"))
+          if (mapping.contains("*")) {
+            (ref.getIndex,
+              "",
+              Option.apply(builder.makeFieldAccess(

Review comment:
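       nit: replace `Option.apply(...)` with `Some(...)`. The wrapped values are freshly constructed nodes, so `Some` expresses the intent more directly.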

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriter.scala
##########
@@ -60,3 +79,88 @@ class InputRewriter(fieldMap: Map[Int, Int]) extends RexShuttle {
     fieldMap.getOrElse(ref.getIndex,
       throw new IllegalArgumentException("input field contains invalid index"))
 }
+
+/**
+ * A RexShuttle to rewrite field accesses of RexNode with nested projection.
+ * For `RexInputRef`, it works like `InputReWriter` and use the old input
+ * ref index to find the new input fields ref.
+ * For `RexFieldAccess`, it will traverse to the top level of the access and
+ * find the mapping in field fieldMap first. There are 3 situations we need to consider:
+ *  1. mapping has the top level access, we should make field access to the reference;
+ *  2. mapping has the field, we should make an access;
+ *  3. mapping has no information of the current name, we should keep the full name
+ *  of the fields and index of mapping for later lookup.
+ * When the process is back from the recursion, we still have 2 situations need to
+ * consider:
+ *  1. we have found the reference of the upper level, we just make an access above the
+ *  reference we find before;
+ *  2. we haven't found the reference of the upper level, we concatenate the prefix with
+ *  the current field name and look up the new prefix in the mapping. If it's in the mapping,
+ *  we create a reference. Otherwise, we should go to the next level with the new prefix.
+ */
+class NestedInputRewriter(
+  fieldMap: JMap[Integer, JMap[String, Integer]],
+  rowTypes: JList[RelDataType],
+  builder: RexBuilder) extends RexShuttle {
+
+  override def visitFieldAccess(input: RexFieldAccess): RexNode = {
+    def traverse(fieldAccess: RexFieldAccess): (Int, String, Option[RexNode]) = {
+      fieldAccess.getReferenceExpr match {
+        case ref: RexInputRef =>
+          val mapping =
+            fieldMap.getOrElse(ref.getIndex,
+              throw new IllegalArgumentException("input field contains unknown index"))
+          if (mapping.contains("*")) {
+            (ref.getIndex,
+              "",
+              Option.apply(builder.makeFieldAccess(
+                new RexInputRef(mapping("*"), rowTypes(mapping("*"))),
+                fieldAccess.getField.getName,
+                false))
+            )
+          } else if(mapping.contains(fieldAccess.getField.getName)) {
+            (ref.getIndex,
+              "",
+              Option.apply(new RexInputRef(mapping(fieldAccess.getField.getName),
+                rowTypes(mapping(fieldAccess.getField.getName)))))
+          } else {
+            (ref.getIndex, fieldAccess.getField.getName, Option.empty)
+          }
+        case acc: RexFieldAccess =>
+          val (i, prefix, node) = traverse(acc)
+          if (node.isDefined) {
+            (i,
+              "",
+              Option.apply(builder.makeFieldAccess(node.get, fieldAccess.getField.getName, false)))
+          } else {
+            val newPrefix = s"$prefix.${fieldAccess.getField.getName}"
+            // we have checked before
+            val mapping = fieldMap(i)
+            if (mapping.contains(newPrefix)) {
+              (i,
+                "",
+                Option.apply(new RexInputRef(mapping(newPrefix), rowTypes(mapping(newPrefix)))))
+            } else {
+              (i, newPrefix, Option.empty)
+            }
+          }
+      }
+    }

Review comment:
       It would be better to move this method out of its enclosing method (`visitFieldAccess`); that would improve code readability.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org