You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/25 07:00:32 UTC
flink git commit: [FLINK-5698] [table] Add NestedFieldsProjectableTableSource interface.
Repository: flink
Updated Branches:
refs/heads/master cac9fa028 -> 5c37e55c8
[FLINK-5698] [table] Add NestedFieldsProjectableTableSource interface.
This closes #3269.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c37e55c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c37e55c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c37e55c
Branch: refs/heads/master
Commit: 5c37e55c83f854c1a9eb7bd7438b378b8c4b0a9f
Parents: cac9fa0
Author: tonycox <an...@epam.com>
Authored: Mon Feb 6 16:32:45 2017 +0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Mar 25 00:32:31 2017 +0100
----------------------------------------------------------------------
...PushProjectIntoTableSourceScanRuleBase.scala | 17 +-
.../table/plan/util/RexProgramExtractor.scala | 81 +++++++++
.../NestedFieldsProjectableTableSource.scala | 54 ++++++
.../plan/util/RexProgramExtractorTest.scala | 181 ++++++++++++++++++-
.../flink/table/utils/InputTypeBuilder.scala | 53 ++++++
5 files changed, 380 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5c37e55c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala
index 9f9c805..1e75971 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/PushProjectIntoTableSourceScanRuleBase.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.core.Calc
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.plan.nodes.TableSourceScan
import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter}
-import org.apache.flink.table.sources.ProjectableTableSource
+import org.apache.flink.table.sources.{NestedFieldsProjectableTableSource, ProjectableTableSource}
trait PushProjectIntoTableSourceScanRuleBase {
@@ -35,9 +35,18 @@ trait PushProjectIntoTableSourceScanRuleBase {
val usedFields = RexProgramExtractor.extractRefInputFields(calc.getProgram)
// if no fields can be projected, we keep the original plan.
- if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) {
- val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
- val newTableSource = originTableSource.projectFields(usedFields)
+ val source = scan.tableSource
+ if (TableEnvironment.getFieldNames(source).length != usedFields.length) {
+
+ val newTableSource = source match {
+ case nested: NestedFieldsProjectableTableSource[_] =>
+ val nestedFields = RexProgramExtractor
+ .extractRefNestedInputFields(calc.getProgram, usedFields)
+ nested.projectNestedFields(usedFields, nestedFields)
+ case projecting: ProjectableTableSource[_] =>
+ projecting.projectFields(usedFields)
+ }
+
val newScan = scan.copy(scan.getTraitSet, newTableSource)
val newCalcProgram = RexProgramRewriter.rewriteWithFieldProjection(
calc.getProgram,
http://git-wip-us.apache.org/repos/asf/flink/blob/5c37e55c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
index 433a35b..a042f55 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
@@ -92,6 +92,26 @@ object RexProgramExtractor {
case _ => (Array.empty, Array.empty)
}
}
+
+  /**
+   * Collects the nested input fields accessed by a [[RexProgram]], reduced to the shortest
+   * accessed paths.
+   *
+   * Both the projected expressions and the (optional) filter condition are analyzed.
+   *
+   * @param rexProgram The RexProgram to analyze
+   * @param usedFields The indexes of the input fields referenced by the program. The result
+   *                   is aligned with this array.
+   * @return For every entry of `usedFields`, the accessed paths of that input field,
+   *         e.g. "field.subfield", or "*" if the field is accessed as a whole.
+   */
+  def extractRefNestedInputFields(
+      rexProgram: RexProgram, usedFields: Array[Int]): Array[Array[String]] = {
+
+    val accessVisitor = new RefFieldAccessorVisitor(usedFields)
+
+    // analyze every projected expression
+    rexProgram.getProjectList.foreach { projection =>
+      rexProgram.expandLocalRef(projection).accept(accessVisitor)
+    }
+
+    // the condition may be null, i.e., the program does not filter
+    Option(rexProgram.getCondition).foreach { condition =>
+      rexProgram.expandLocalRef(condition).accept(accessVisitor)
+    }
+
+    accessVisitor.getProjectedFields
+  }
}
/**
@@ -181,3 +201,64 @@ class RexNodeToExpressionConverter(
}
}
+
+/**
+ * A RexVisitor that collects, for each of the given used input fields, the nested fields
+ * that are accessed on it.
+ *
+ * An input field that is referenced directly is recorded as "*". A chain of field accesses
+ * that starts at an input field is recorded as the dot-separated path of the accessed fields.
+ *
+ * @param usedFields The indexes of all input fields referenced by the visited expressions.
+ *                   The collected accesses are aligned with this array.
+ */
+class RefFieldAccessorVisitor(usedFields: Array[Int]) extends RexVisitorImpl[Unit](true) {
+
+  // accessed paths per used field, aligned with usedFields
+  private val projectedFields: Array[Array[String]] = Array.fill(usedFields.length)(Array.empty)
+
+  // input field index -> position in usedFields
+  private val order: Map[Int, Int] = usedFields.zipWithIndex.toMap
+
+  /**
+   * Returns, for each used field, the shortest accessed paths.
+   *
+   * If a field is accessed as a whole, the result for it is "*". Otherwise, the result
+   * contains every accessed path that is not covered by a shorter accessed path of the same
+   * field.
+   */
+  def getProjectedFields: Array[Array[String]] =
+    projectedFields.map(prefixAccesses)
+
+  /** Reduces the accessed paths of a single field to the shortest covering paths. */
+  private def prefixAccesses(accesses: Array[String]): Array[String] = {
+    if (accesses.contains("*")) {
+      // the whole field is accessed, so every nested access is covered
+      Array("*")
+    } else {
+      // after sorting, every path is preceded by all of its (string-)prefixes, so each
+      // covering path is kept before the paths it covers
+      accesses.sorted.foldLeft(List.empty[String]) { (kept, access) =>
+        if (kept.exists(prefix => isPathPrefix(prefix, access))) kept else access :: kept
+      }.toArray
+    }
+  }
+
+  /**
+   * Checks whether `prefix` equals `path` or is a parent path of `path`.
+   * A plain string prefix check is not enough, e.g. "name" does not cover "names".
+   */
+  private def isPathPrefix(prefix: String, path: String): Boolean =
+    path == prefix || path.startsWith(prefix + ".")
+
+  /** Records an access to `path` of the input field with index `inputIndex`. */
+  private def addAccess(inputIndex: Int, path: String): Unit =
+    order.get(inputIndex) match {
+      case Some(outputIndex) =>
+        projectedFields(outputIndex) = projectedFields(outputIndex) :+ path
+      case None =>
+        throw new IllegalArgumentException(
+          s"Input field $inputIndex is referenced but not contained in the used fields " +
+            s"[${usedFields.mkString(", ")}].")
+    }
+
+  /**
+   * Resolves a chain of field accesses that starts at an input field into the index of
+   * that input field and the dot-separated path of the accessed nested field.
+   * Returns None if the chain does not start at an input field.
+   */
+  private def resolvePath(fieldAccess: RexFieldAccess): Option[(Int, String)] = {
+    val fieldName = fieldAccess.getField.getName
+    fieldAccess.getReferenceExpr match {
+      case ref: RexInputRef =>
+        Some((ref.getIndex, fieldName))
+      case inner: RexFieldAccess =>
+        resolvePath(inner).map { case (index, path) => (index, s"$path.$fieldName") }
+      case _ =>
+        None
+    }
+  }
+
+  override def visitFieldAccess(fieldAccess: RexFieldAccess): Unit =
+    resolvePath(fieldAccess) match {
+      case Some((index, path)) =>
+        addAccess(index, path)
+      case None =>
+        // the access does not start at an input field (e.g. it accesses the result of a
+        // call), so conservatively collect the inputs of the accessed expression instead
+        fieldAccess.getReferenceExpr.accept(this)
+    }
+
+  override def visitInputRef(inputRef: RexInputRef): Unit =
+    addAccess(inputRef.getIndex, "*")
+
+  override def visitCall(call: RexCall): Unit =
+    call.operands.foreach(operand => operand.accept(this))
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5c37e55c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala
new file mode 100644
index 0000000..a10187b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+/**
+ * Adds support for projection push-down of nested fields to a [[TableSource]].
+ * A [[TableSource]] extending this interface is able to project the (nested) fields of
+ * the returned table.
+ *
+ * @tparam T The return type of the [[NestedFieldsProjectableTableSource]].
+ */
+trait NestedFieldsProjectableTableSource[T] {
+
+  /**
+   * Creates a copy of the [[TableSource]] that projects its output on the specified nested
+   * fields.
+   *
+   * `nestedFields` is aligned with `fields`: its i-th entry lists the accessed paths of the
+   * field with index `fields(i)`. A path is either "*", meaning the field is accessed as a
+   * whole, or a dot-separated path of nested fields, e.g. "school.city".
+   *
+   * Example:
+   * {{{
+   *   tableSchema = {
+   *     id,
+   *     student<school<city, tuition>, age, name>,
+   *     teacher<age, name>
+   *   }
+   *
+   *   select (id, student.school.city, student.age, teacher)
+   *
+   *   fields       = [0, 1, 2]
+   *   nestedFields = [["*"], ["school.city", "age"], ["*"]]
+   * }}}
+   *
+   * @param fields The indexes of the fields to return.
+   * @param nestedFields The accessed nested fields of the fields to return.
+   * @return A copy of the [[TableSource]] that projects its output.
+   */
+  def projectNestedFields(
+      fields: Array[Int],
+      nestedFields: Array[Array[String]]): TableSource[T]
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5c37e55c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
index b0a5fcf..999d20f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/util/RexProgramExtractorTest.scala
@@ -20,12 +20,15 @@ package org.apache.flink.table.plan.util
import java.math.BigDecimal
-import org.apache.calcite.rex.{RexBuilder, RexProgramBuilder}
+import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
import org.apache.calcite.sql.SqlPostfixOperator
+import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.flink.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.table.utils.InputTypeBuilder.inputOf
import org.apache.flink.table.validate.FunctionCatalog
-import org.junit.Assert.{assertArrayEquals, assertEquals}
+import org.hamcrest.CoreMatchers.is
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat}
import org.junit.Test
import scala.collection.JavaConverters._
@@ -306,6 +309,180 @@ class RexProgramExtractorTest extends RexProgramTestBase {
unconvertedRexNodes(1).toString)
}
+  @Test
+  def testExtractRefNestedInputFields(): Unit = {
+    val program = buildRexProgramWithNesting()
+    val refFields = RexProgramExtractor.extractRefInputFields(program)
+    val nestedFields = RexProgramExtractor.extractRefNestedInputFields(program, refFields)
+
+    // payments is only accessed via `amount`, persons is projected as a whole
+    val expected: Array[Array[String]] = Array(Array("amount"), Array("*"))
+    assertThat(nestedFields, is(expected))
+  }
+
+  @Test
+  def testExtractRefNestedInputFieldsWithNoNesting(): Unit = {
+    val program = buildSimpleRexProgram()
+    val refFields = RexProgramExtractor.extractRefInputFields(program)
+    val nestedFields = RexProgramExtractor.extractRefNestedInputFields(program, refFields)
+
+    // three used fields, each of them referenced as a whole
+    val expected: Array[Array[String]] = Array.fill(3)(Array("*"))
+    assertThat(nestedFields, is(expected))
+  }
+
+  @Test
+  def testExtractDeepRefNestedInputFields(): Unit = {
+    val program = buildRexProgramWithDeepNesting()
+    val refFields = RexProgramExtractor.extractRefInputFields(program)
+    val nestedFields = RexProgramExtractor.extractRefNestedInputFields(program, refFields)
+
+    assertThat(refFields, is(Array(1, 0, 2)))
+
+    // payments: only `amount`; persons: projected as a whole;
+    // field: `with.deeper.entry` covers `with.deeper.entry.inside.entry`
+    val expected: Array[Array[String]] = Array(
+      Array("amount"),
+      Array("*"),
+      Array("with.deeper.entry", "with.deep.entry"))
+    assertThat(nestedFields, is(expected))
+  }
+
+  /**
+   * Builds a program over an input with three nested fields (persons, payments, field) that
+   * accesses nested fields at different depths, including paths where one accessed path is
+   * a prefix of another (field.with.deeper.entry and field.with.deeper.entry.inside.entry).
+   */
+  private def buildRexProgramWithDeepNesting(): RexProgram = {
+
+    // person input
+    val passportRow = inputOf(typeFactory)
+      .field("id", VARCHAR)
+      .field("status", VARCHAR)
+      .build
+
+    val personRow = inputOf(typeFactory)
+      .field("name", VARCHAR)
+      .field("age", INTEGER)
+      .nestedField("passport", passportRow)
+      .build
+
+    // payment input
+    val paymentRow = inputOf(typeFactory)
+      .field("id", BIGINT)
+      .field("amount", INTEGER)
+      .build
+
+    // deep field input
+    val deepRowType = inputOf(typeFactory)
+      .field("entry", VARCHAR)
+      .build
+
+    val entryRowType = inputOf(typeFactory)
+      .nestedField("inside", deepRowType)
+      .build
+
+    val deeperRowType = inputOf(typeFactory)
+      .nestedField("entry", entryRowType)
+      .build
+
+    val withRowType = inputOf(typeFactory)
+      .nestedField("deep", deepRowType)
+      .nestedField("deeper", deeperRowType)
+      .build
+
+    val fieldRowType = inputOf(typeFactory)
+      .nestedField("with", withRowType)
+      .build
+
+    // main input
+    val inputRowType = inputOf(typeFactory)
+      .nestedField("persons", personRow)
+      .nestedField("payments", paymentRow)
+      .nestedField("field", fieldRowType)
+      .build
+
+    // inputRowType
+    //
+    // [ persons:  [ name: VARCHAR, age: INT, passport: [id: VARCHAR, status: VARCHAR ] ],
+    //   payments: [ id: BIGINT, amount: INT ],
+    //   field:    [ with: [ deep: [ entry: VARCHAR ],
+    //                       deeper: [ entry: [ inside: [entry: VARCHAR ] ] ]
+    //                     ] ]
+    // ]
+
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    val t0 = rexBuilder.makeInputRef(personRow, 0)
+    val t1 = rexBuilder.makeInputRef(paymentRow, 1)
+    val t2 = rexBuilder.makeInputRef(fieldRowType, 2)
+    val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(10L))
+
+    // person
+    val person$pass = rexBuilder.makeFieldAccess(t0, "passport", false)
+    val person$pass$stat = rexBuilder.makeFieldAccess(person$pass, "status", false)
+
+    // payment
+    val pay$amount = rexBuilder.makeFieldAccess(t1, "amount", false)
+    val multiplyAmount = builder.addExpr(
+      rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, pay$amount, t3))
+
+    // field
+    val field$with = rexBuilder.makeFieldAccess(t2, "with", false)
+    val field$with$deep = rexBuilder.makeFieldAccess(field$with, "deep", false)
+    val field$with$deeper = rexBuilder.makeFieldAccess(field$with, "deeper", false)
+    val field$with$deep$entry = rexBuilder.makeFieldAccess(field$with$deep, "entry", false)
+    val field$with$deeper$entry = rexBuilder.makeFieldAccess(field$with$deeper, "entry", false)
+    val field$with$deeper$entry$inside = rexBuilder
+      .makeFieldAccess(field$with$deeper$entry, "inside", false)
+    val field$with$deeper$entry$inside$entry = rexBuilder
+      .makeFieldAccess(field$with$deeper$entry$inside, "entry", false)
+
+    // NOTE: the order of the projections below determines the expected results of the tests
+    // that use this program
+    builder.addProject(multiplyAmount, "amount")
+    builder.addProject(person$pass$stat, "status")
+    builder.addProject(field$with$deep$entry, "entry")
+    builder.addProject(field$with$deeper$entry$inside$entry, "entry")
+    builder.addProject(field$with$deeper$entry, "entry2")
+    builder.addProject(t0, "person")
+
+    // Program
+    // (
+    //   payments.amount * 10,
+    //   persons.passport.status,
+    //   field.with.deep.entry,
+    //   field.with.deeper.entry.inside.entry,
+    //   field.with.deeper.entry,
+    //   persons
+    // )
+
+    builder.getProgram
+
+  }
+
+  /**
+   * Builds a program over an input with two nested fields (persons, payments) that projects
+   * a nested field of payments, persons as a whole, and a literal.
+   */
+  private def buildRexProgramWithNesting(): RexProgram = {
+
+    val personRow = inputOf(typeFactory)
+      .field("name", VARCHAR)
+      .field("age", INTEGER)
+      .build
+
+    val paymentRow = inputOf(typeFactory)
+      .field("id", BIGINT)
+      .field("amount", INTEGER)
+      .build
+
+    // [ persons: [ name: VARCHAR, age: INT ], payments: [ id: BIGINT, amount: INT ] ]
+    val inputRowType = inputOf(typeFactory)
+      .nestedField("persons", personRow)
+      .nestedField("payments", paymentRow)
+      .build
+
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    val t0 = rexBuilder.makeInputRef(personRow, 0)
+    val t1 = rexBuilder.makeInputRef(paymentRow, 1)
+    val t2 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+
+    val payment$amount = rexBuilder.makeFieldAccess(t1, "amount", false)
+
+    // Program ( payments.amount, persons, 100 )
+    builder.addProject(payment$amount, "amount")
+    builder.addProject(t0, "persons")
+    builder.addProject(t2, "number")
+    builder.getProgram
+  }
+
private def testExtractSinglePostfixCondition(
fieldIndex: Integer,
op: SqlPostfixOperator,
http://git-wip-us.apache.org/repos/asf/flink/blob/5c37e55c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InputTypeBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InputTypeBuilder.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InputTypeBuilder.scala
new file mode 100644
index 0000000..6f11f88
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InputTypeBuilder.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.calcite.adapter.java.JavaTypeFactory
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.SqlTypeName
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * Test utility that assembles a struct [[RelDataType]] field by field.
+ *
+ * @param typeFactory The factory used to create the field types and the struct type.
+ */
+class InputTypeBuilder(typeFactory: JavaTypeFactory) {
+
+  // (name, type) of every added field, in the order the fields were added
+  private val fields = mutable.ListBuffer[(String, RelDataType)]()
+
+  /** Appends a field of the given SQL type. */
+  def field(name: String, `type`: SqlTypeName): InputTypeBuilder =
+    nestedField(name, typeFactory.createSqlType(`type`))
+
+  /** Appends a field of an arbitrary type, e.g. a struct type created by another builder. */
+  def nestedField(name: String, `type`: RelDataType): InputTypeBuilder = {
+    fields += ((name, `type`))
+    this
+  }
+
+  /** Creates a struct type that contains all added fields in the order they were added. */
+  def build: RelDataType = {
+    val (fieldNames, fieldTypes) = fields.unzip
+    typeFactory.createStructType(fieldTypes.asJava, fieldNames.asJava)
+  }
+}
+
+object InputTypeBuilder {
+
+  /** Starts a new [[InputTypeBuilder]] that creates its types with `typeFactory`. */
+  def inputOf(typeFactory: JavaTypeFactory): InputTypeBuilder =
+    new InputTypeBuilder(typeFactory)
+}