Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/25 07:00:32 UTC

flink git commit: [FLINK-5698] [table] Add NestedFieldsProjectableTableSource interface.

Repository: flink
Updated Branches:
  refs/heads/master cac9fa028 -> 5c37e55c8


[FLINK-5698] [table] Add NestedFieldsProjectableTableSource interface.

This closes #3269.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c37e55c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c37e55c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c37e55c

Branch: refs/heads/master
Commit: 5c37e55c83f854c1a9eb7bd7438b378b8c4b0a9f
Parents: cac9fa0
Author: tonycox <an...@epam.com>
Authored: Mon Feb 6 16:32:45 2017 +0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Mar 25 00:32:31 2017 +0100

----------------------------------------------------------------------
 ...PushProjectIntoTableSourceScanRuleBase.scala |  17 +-
 .../table/plan/util/RexProgramExtractor.scala   |  81 +++++++++
 .../NestedFieldsProjectableTableSource.scala    |  54 ++++++
 .../plan/util/RexProgramExtractorTest.scala     | 181 ++++++++++++++++++-
 .../flink/table/utils/InputTypeBuilder.scala    |  53 ++++++
 5 files changed, 380 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c37e55c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala
index 9f9c805..1e75971 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.core.Calc
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.plan.nodes.TableSourceScan
 import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter}
-import org.apache.flink.table.sources.ProjectableTableSource
+import org.apache.flink.table.sources.{NestedFieldsProjectableTableSource, ProjectableTableSource}
 
 trait PushProjectIntoTableSourceScanRuleBase {
 
@@ -35,9 +35,18 @@ trait PushProjectIntoTableSourceScanRuleBase {
     val usedFields = RexProgramExtractor.extractRefInputFields(calc.getProgram)
 
     // if no fields can be projected, we keep the original plan.
-    if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) {
-      val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
-      val newTableSource = originTableSource.projectFields(usedFields)
+    val source = scan.tableSource
+    if (TableEnvironment.getFieldNames(source).length != usedFields.length) {
+
+      val newTableSource = source match {
+        case nested: NestedFieldsProjectableTableSource[_] =>
+          val nestedFields = RexProgramExtractor
+            .extractRefNestedInputFields(calc.getProgram, usedFields)
+          nested.projectNestedFields(usedFields, nestedFields)
+        case projecting: ProjectableTableSource[_] =>
+          projecting.projectFields(usedFields)
+      }
+
       val newScan = scan.copy(scan.getTraitSet, newTableSource)
       val newCalcProgram = RexProgramRewriter.rewriteWithFieldProjection(
         calc.getProgram,

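To make the new dispatch concrete, here is a rough sketch of the arrays the rule hands to a table source for a nested projection. The schema and query below are illustrative only and not part of this commit; the rule together with RexProgramExtractor is the authoritative source of these values.

object NestedProjectionExample {

  // Hypothetical input schema (illustrative, not from this commit):
  //   0: id      BIGINT
  //   1: student ROW< school ROW< city VARCHAR, tuition INT >, age INT, name VARCHAR >
  //   2: teacher ROW< age INT, name VARCHAR >
  //
  // For a projection such as
  //   SELECT id, student.school.city, student.age, teacher
  // the rule derives roughly these arrays:
  val usedFields: Array[Int] = Array(0, 1, 2)
  val nestedFields: Array[Array[String]] =
    Array(Array("*"), Array("school.city", "age"), Array("*"))

  // A NestedFieldsProjectableTableSource receives both arrays via
  //   projectNestedFields(usedFields, nestedFields)
  // while a plain ProjectableTableSource only sees projectFields(usedFields).
}
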
http://git-wip-us.apache.org/repos/asf/flink/blob/5c37e55c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
index 433a35b..a042f55 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
@@ -92,6 +92,26 @@ object RexProgramExtractor {
       case _ => (Array.empty, Array.empty)
     }
   }
+
+  /**
+    * Extracts the names of the nested input fields accessed by the RexProgram, restricted
+    * to the given used fields, and returns the prefixes of these accesses.
+    *
+    * @param rexProgram The RexProgram to analyze
+    * @return The prefixes of the accessed nested fields per used field, e.g. field.subfield
+    */
+  def extractRefNestedInputFields(
+      rexProgram: RexProgram, usedFields: Array[Int]): Array[Array[String]] = {
+
+    val visitor = new RefFieldAccessorVisitor(usedFields)
+    rexProgram.getProjectList.foreach(exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+    val condition = rexProgram.getCondition
+    if (condition != null) {
+      rexProgram.expandLocalRef(condition).accept(visitor)
+    }
+    visitor.getProjectedFields
+  }
 }
 
 /**
@@ -181,3 +201,64 @@ class RexNodeToExpressionConverter(
   }
 
 }
+
+/**
+  * A RexVisitor to extract used nested input fields
+  */
+class RefFieldAccessorVisitor(usedFields: Array[Int]) extends RexVisitorImpl[Unit](true) {
+
+  private val projectedFields: Array[Array[String]] = Array.fill(usedFields.length)(Array.empty)
+
+  private val order: Map[Int, Int] = usedFields.zipWithIndex.toMap
+
+  /** Returns the prefixes of the nested field accesses. */
+  def getProjectedFields: Array[Array[String]] = {
+
+    projectedFields.map { nestedFields =>
+      // sort nested field accesses
+      val sorted = nestedFields.sorted
+      // get prefix field accesses
+      val prefixAccesses = sorted.foldLeft(Nil: List[String]) {
+        (prefixAccesses, nestedAccess) => prefixAccesses match {
+            // first access => add access
+            case Nil => List[String](nestedAccess)
+            // top-level access already found => return top-level access
+            case head :: Nil if head.equals("*") => prefixAccesses
+            // access is top-level access => return top-level access
+            case _ :: _ if nestedAccess.equals("*") => List("*")
+            // previous access is not a prefix of this access => add access
+            case head :: _ if !nestedAccess.startsWith(head) =>
+              nestedAccess :: prefixAccesses
+            // previous access is a prefix of this access => do not add access
+            case _ => prefixAccesses
+          }
+      }
+      prefixAccesses.toArray
+    }
+  }
+
+  override def visitFieldAccess(fieldAccess: RexFieldAccess): Unit = {
+    def internalVisit(fieldAccess: RexFieldAccess): (Int, String) = {
+      fieldAccess.getReferenceExpr match {
+        case ref: RexInputRef =>
+          (ref.getIndex, fieldAccess.getField.getName)
+        case fac: RexFieldAccess =>
+          val (i, n) = internalVisit(fac)
+          (i, s"$n.${fieldAccess.getField.getName}")
+      }
+    }
+    val (index, fullName) = internalVisit(fieldAccess)
+    val outputIndex = order.getOrElse(index, -1)
+    val fields: Array[String] = projectedFields(outputIndex)
+    projectedFields(outputIndex) = fields :+ fullName
+  }
+
+  override def visitInputRef(inputRef: RexInputRef): Unit = {
+    val outputIndex = order.getOrElse(inputRef.getIndex, -1)
+    val fields: Array[String] = projectedFields(outputIndex)
+    projectedFields(outputIndex) = fields :+ "*"
+  }
+
+  override def visitCall(call: RexCall): Unit =
+    call.operands.foreach(operand => operand.accept(this))
+}

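The prefix folding in getProjectedFields is compact but a little hard to read inline. The following standalone sketch restates the same logic outside the visitor, with a few example inputs; it is an illustration only, and the committed visitor above is authoritative.

object NestedAccessPrefixSketch {

  // Restates the folding of RefFieldAccessorVisitor.getProjectedFields: sort the accesses
  // of one input field and keep only the shortest covering prefixes.
  def prefixes(accesses: Seq[String]): Array[String] = {
    val folded = accesses.sorted.foldLeft(List.empty[String]) {
      // first access => add it
      case (Nil, access) => List(access)
      // the whole field was already requested => keep "*"
      case (acc @ ("*" :: Nil), _) => acc
      // the whole field is requested now => collapse to "*"
      case (_, "*") => List("*")
      // previous access is not a prefix of this one => add it
      case (acc @ (head :: _), access) if !access.startsWith(head) => access :: acc
      // previous access already covers this one => skip it
      case (acc, _) => acc
    }
    folded.toArray
  }

  // prefixes(Seq("school.city", "school.city", "age")) => Array("school.city", "age")
  // prefixes(Seq("school.city", "*"))                  => Array("*")
  // prefixes(Seq("school", "school.city"))             => Array("school")
}
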
http://git-wip-us.apache.org/repos/asf/flink/blob/5c37e55c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala
new file mode 100644
index 0000000..a10187b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+/**
+  * Adds support for projection push-down to a [[TableSource]] with nested fields.
+  * A [[TableSource]] extending this interface is able
+  * to project the nested fields of the returned table.
+  *
+  * @tparam T The return type of the [[NestedFieldsProjectableTableSource]].
+  */
+trait NestedFieldsProjectableTableSource[T] {
+
+  /**
+    * Creates a copy of the [[TableSource]] that projects its output on the specified nested fields.
+    *
+    * @param fields The indexes of the fields to return.
+    * @param nestedFields The accessed nested fields of the fields to return.
+    *
+    * e.g.
+    * tableSchema = {
+    *       id,
+    *       student<school<city, tuition>, age, name>,
+    *       teacher<age, name>
+    *       }
+    *
+    * select (id, student.school.city, student.age, teacher)
+    *
+    * fields = [0, 1, 2]
+    * nestedFields = [ ["*"], ["school.city", "age"], ["*"] ]
+    *
+    * @return A copy of the [[TableSource]] that projects its output.
+    */
+  def projectNestedFields(
+      fields: Array[Int],
+      nestedFields: Array[Array[String]]): TableSource[T]
+
+}

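As a usage sketch for the new trait, this is roughly how an implementation could expand the (fields, nestedFields) arguments into the concrete column paths it has to read. The names below are hypothetical; a real implementation would additionally return a projected copy of itself as a TableSource[T].

object ProjectNestedFieldsSketch {

  // Expands (fields, nestedFields) into full column paths such as "student.school.city".
  // allFieldNames are the top-level field names of the original table schema.
  def columnsToRead(
      allFieldNames: Array[String],
      fields: Array[Int],
      nestedFields: Array[Array[String]]): Array[String] = {

    fields.zip(nestedFields).flatMap { case (fieldIdx, nested) =>
      val topLevel = allFieldNames(fieldIdx)
      nested.map {
        case "*"  => topLevel             // the whole top-level field is needed
        case path => s"$topLevel.$path"   // only the accessed nested path is needed
      }
    }
  }

  // columnsToRead(
  //   Array("id", "student", "teacher"),
  //   Array(0, 1, 2),
  //   Array(Array("*"), Array("school.city", "age"), Array("*")))
  // => Array("id", "student.school.city", "student.age", "teacher")
}
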
http://git-wip-us.apache.org/repos/asf/flink/blob/5c37e55c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
index b0a5fcf..999d20f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
@@ -20,12 +20,15 @@ package org.apache.flink.table.plan.util
 
 import java.math.BigDecimal
 
-import org.apache.calcite.rex.{RexBuilder, RexProgramBuilder}
+import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
 import org.apache.calcite.sql.SqlPostfixOperator
+import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.utils.InputTypeBuilder.inputOf
 import org.apache.flink.table.validate.FunctionCatalog
-import org.junit.Assert.{assertArrayEquals, assertEquals}
+import org.hamcrest.CoreMatchers.is
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat}
 import org.junit.Test
 
 import scala.collection.JavaConverters._
@@ -306,6 +309,180 @@ class RexProgramExtractorTest extends RexProgramTestBase {
       unconvertedRexNodes(1).toString)
   }
 
+  @Test
+  def testExtractRefNestedInputFields(): Unit = {
+    val rexProgram = buildRexProgramWithNesting()
+
+    val usedFields = RexProgramExtractor.extractRefInputFields(rexProgram)
+    val usedNestedFields = RexProgramExtractor.extractRefNestedInputFields(rexProgram, usedFields)
+
+    val expected = Array(Array("amount"), Array("*"))
+    assertThat(usedNestedFields, is(expected))
+  }
+
+  @Test
+  def testExtractRefNestedInputFieldsWithNoNesting(): Unit = {
+    val rexProgram = buildSimpleRexProgram()
+
+    val usedFields = RexProgramExtractor.extractRefInputFields(rexProgram)
+    val usedNestedFields = RexProgramExtractor.extractRefNestedInputFields(rexProgram, usedFields)
+
+    val expected = Array(Array("*"), Array("*"), Array("*"))
+    assertThat(usedNestedFields, is(expected))
+  }
+
+  @Test
+  def testExtractDeepRefNestedInputFields(): Unit = {
+    val rexProgram = buildRexProgramWithDeepNesting()
+
+    val usedFields = RexProgramExtractor.extractRefInputFields(rexProgram)
+    val usedNestedFields = RexProgramExtractor.extractRefNestedInputFields(rexProgram, usedFields)
+
+    val expected = Array(
+      Array("amount"),
+      Array("*"),
+      Array("with.deeper.entry", "with.deep.entry"))
+
+    assertThat(usedFields, is(Array(1, 0, 2)))
+    assertThat(usedNestedFields, is(expected))
+  }
+
+  private def buildRexProgramWithDeepNesting(): RexProgram = {
+
+    // person input
+    val passportRow = inputOf(typeFactory)
+      .field("id", VARCHAR)
+      .field("status", VARCHAR)
+      .build
+
+    val personRow = inputOf(typeFactory)
+      .field("name", VARCHAR)
+      .field("age", INTEGER)
+      .nestedField("passport", passportRow)
+      .build
+
+    // payment input
+    val paymentRow = inputOf(typeFactory)
+      .field("id", BIGINT)
+      .field("amount", INTEGER)
+      .build
+
+    // deep field input
+    val deepRowType = inputOf(typeFactory)
+      .field("entry", VARCHAR)
+      .build
+
+    val entryRowType = inputOf(typeFactory)
+      .nestedField("inside", deepRowType)
+      .build
+
+    val deeperRowType = inputOf(typeFactory)
+      .nestedField("entry", entryRowType)
+      .build
+
+    val withRowType = inputOf(typeFactory)
+      .nestedField("deep", deepRowType)
+      .nestedField("deeper", deeperRowType)
+      .build
+
+    val fieldRowType = inputOf(typeFactory)
+      .nestedField("with", withRowType)
+      .build
+
+    // main input
+    val inputRowType = inputOf(typeFactory)
+      .nestedField("persons", personRow)
+      .nestedField("payments", paymentRow)
+      .nestedField("field", fieldRowType)
+      .build
+
+    // inputRowType
+    //
+    // [ persons:  [ name: VARCHAR, age:  INT, passport: [id: VARCHAR, status: VARCHAR ] ],
+    //   payments: [ id: BIGINT, amount: INT ],
+    //   field:    [ with: [ deep: [ entry: VARCHAR ],
+    //                       deeper: [ entry: [ inside: [entry: VARCHAR ] ] ]
+    //             ] ]
+    // ]
+
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    val t0 = rexBuilder.makeInputRef(personRow, 0)
+    val t1 = rexBuilder.makeInputRef(paymentRow, 1)
+    val t2 = rexBuilder.makeInputRef(fieldRowType, 2)
+    val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(10L))
+
+    // person
+    val person$pass = rexBuilder.makeFieldAccess(t0, "passport", false)
+    val person$pass$stat = rexBuilder.makeFieldAccess(person$pass, "status", false)
+
+    // payment
+    val pay$amount = rexBuilder.makeFieldAccess(t1, "amount", false)
+    val multiplyAmount = builder.addExpr(
+      rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, pay$amount, t3))
+
+    // field
+    val field$with = rexBuilder.makeFieldAccess(t2, "with", false)
+    val field$with$deep = rexBuilder.makeFieldAccess(field$with, "deep", false)
+    val field$with$deeper = rexBuilder.makeFieldAccess(field$with, "deeper", false)
+    val field$with$deep$entry = rexBuilder.makeFieldAccess(field$with$deep, "entry", false)
+    val field$with$deeper$entry = rexBuilder.makeFieldAccess(field$with$deeper, "entry", false)
+    val field$with$deeper$entry$inside = rexBuilder
+      .makeFieldAccess(field$with$deeper$entry, "inside", false)
+    val field$with$deeper$entry$inside$entry = rexBuilder
+      .makeFieldAccess(field$with$deeper$entry$inside, "entry", false)
+
+    builder.addProject(multiplyAmount, "amount")
+    builder.addProject(person$pass$stat, "status")
+    builder.addProject(field$with$deep$entry, "entry")
+    builder.addProject(field$with$deeper$entry$inside$entry, "entry")
+    builder.addProject(field$with$deeper$entry, "entry2")
+    builder.addProject(t0, "person")
+
+    // Program
+    // (
+    //   payments.amount * 10,
+    //   persons.passport.status,
+    //   field.with.deep.entry,
+    //   field.with.deeper.entry.inside.entry,
+    //   field.with.deeper.entry,
+    //   persons
+    // )
+
+    builder.getProgram
+
+  }
+
+  private def buildRexProgramWithNesting(): RexProgram = {
+
+    val personRow = inputOf(typeFactory)
+      .field("name", INTEGER)
+      .field("age", VARCHAR)
+      .build
+
+    val paymentRow = inputOf(typeFactory)
+      .field("id", BIGINT)
+      .field("amount", INTEGER)
+      .build
+
+    val types = List(personRow, paymentRow).asJava
+    val names = List("persons", "payments").asJava
+    val inputRowType = typeFactory.createStructType(types, names)
+
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    val t0 = rexBuilder.makeInputRef(types.get(0), 0)
+    val t1 = rexBuilder.makeInputRef(types.get(1), 1)
+    val t2 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+
+    val payment$amount = rexBuilder.makeFieldAccess(t1, "amount", false)
+
+    builder.addProject(payment$amount, "amount")
+    builder.addProject(t0, "persons")
+    builder.addProject(t2, "number")
+    builder.getProgram
+  }
+
   private def testExtractSinglePostfixCondition(
       fieldIndex: Integer,
       op: SqlPostfixOperator,

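For readers checking the deep-nesting expectations by hand, a short trace of how the expected arrays come about; the assertions in the test above remain authoritative.

object DeepNestingExpectationTrace {

  // Accesses collected per used input field before prefix folding:
  //   payments (index 1): "amount"                  => Array("amount")
  //   persons  (index 0): "passport.status" and "*" => folds to Array("*")
  //   field    (index 2): "with.deep.entry",
  //                       "with.deeper.entry",
  //                       "with.deeper.entry.inside.entry"
  //
  // "with.deeper.entry.inside.entry".startsWith("with.deeper.entry") is true, so the deepest
  // access is dropped as already covered, which leaves the expected result below.
  val expectedFieldPrefixes: Array[String] = Array("with.deeper.entry", "with.deep.entry")
}
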
http://git-wip-us.apache.org/repos/asf/flink/blob/5c37e55c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InputTypeBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InputTypeBuilder.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InputTypeBuilder.scala
new file mode 100644
index 0000000..6f11f88
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InputTypeBuilder.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.SqlTypeName
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class InputTypeBuilder(typeFactory: JavaTypeFactory) {
+
+  private val names = mutable.ListBuffer[String]()
+  private val types = mutable.ListBuffer[RelDataType]()
+
+  def field(name: String, `type`: SqlTypeName): InputTypeBuilder = {
+    names += name
+    types += typeFactory.createSqlType(`type`)
+    this
+  }
+
+  def nestedField(name: String, `type`: RelDataType): InputTypeBuilder = {
+    names += name
+    types += `type`
+    this
+  }
+
+  def build: RelDataType = {
+    typeFactory.createStructType(types.asJava, names.asJava)
+  }
+}
+
+object InputTypeBuilder {
+
+  def inputOf(typeFactory: JavaTypeFactory) = new InputTypeBuilder(typeFactory)
+}
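
Finally, a trimmed sketch of how the new test builder is used. The JavaTypeFactory is passed in explicitly here; in the test above it appears to be provided by RexProgramTestBase.

import org.apache.calcite.adapter.java.JavaTypeFactory
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql.`type`.SqlTypeName.{INTEGER, VARCHAR}
import org.apache.flink.table.utils.InputTypeBuilder.inputOf

object InputTypeBuilderUsageSketch {

  // Builds a nested row type roughly equivalent to
  //   student< school< city VARCHAR, tuition INTEGER >, age INTEGER >
  def studentRow(typeFactory: JavaTypeFactory): RelDataType = {
    val schoolRow = inputOf(typeFactory)
      .field("city", VARCHAR)
      .field("tuition", INTEGER)
      .build

    inputOf(typeFactory)
      .nestedField("school", schoolRow)
      .field("age", INTEGER)
      .build
  }
}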