You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/12/04 09:06:24 UTC
flink git commit: [FLINK-8104] [table] Add ROW value constructor
Repository: flink
Updated Branches:
refs/heads/master 4bd0ef1b5 -> e1d656621
[FLINK-8104] [table] Add ROW value constructor
This closes #5040.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1d65662
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1d65662
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1d65662
Branch: refs/heads/master
Commit: e1d656621d8b291087652fee15d577d43d22f2d7
Parents: 4bd0ef1
Author: Rong Rong <ro...@uber.com>
Authored: Mon Nov 20 13:47:50 2017 -0800
Committer: twalthr <tw...@apache.org>
Committed: Mon Dec 4 10:06:03 2017 +0100
----------------------------------------------------------------------
docs/dev/table/sql.md | 59 +++++++--
docs/dev/table/tableApi.md | 92 ++++++++++++++
.../flink/table/api/scala/expressionDsl.scala | 17 ++-
.../flink/table/calcite/FlinkTypeFactory.scala | 8 +-
.../flink/table/codegen/CodeGenerator.scala | 34 +++++
.../table/codegen/calls/ScalarOperators.scala | 71 ++++++-----
.../flink/table/expressions/collection.scala | 31 ++++-
.../flink/table/validate/FunctionCatalog.scala | 3 +
.../flink/table/expressions/RowTypeTest.scala | 125 +++++++++++++++++++
.../table/expressions/SqlExpressionTest.scala | 6 +-
.../expressions/utils/RowTypeTestBase.scala | 64 ++++++++++
.../validation/RowTypeValidationTest.scala | 42 +++++++
.../table/runtime/batch/sql/CalcITCase.scala | 27 ++++
.../table/runtime/batch/table/CalcITCase.scala | 24 ++++
14 files changed, 554 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 50aa933..e5de70a 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -2257,7 +2257,7 @@ tableName.compositeType.*
<table class="table table-bordered">
<thead>
<tr>
- <th class="text-left" style="width: 40%">Array functions</th>
+ <th class="text-left" style="width: 40%">Value constructor functions</th>
<th class="text-center">Description</th>
</tr>
</thead>
@@ -2267,53 +2267,54 @@ tableName.compositeType.*
<tr>
<td>
{% highlight text %}
-ARRAY ‘[’ value [, value ]* ‘]’
+(value, [, value]*)
{% endhighlight %}
</td>
<td>
- <p>Creates an array from a list of values.</p>
+ <p>Creates a row from a list of values.</p>
</td>
</tr>
<tr>
<td>
{% highlight text %}
-CARDINALITY(ARRAY)
+ROW(value, [, value]*)
{% endhighlight %}
</td>
<td>
- <p>Returns the number of elements of an array.</p>
+ <p>Creates a row from a list of values.</p>
</td>
</tr>
<tr>
<td>
{% highlight text %}
-array ‘[’ index ‘]’
+ARRAY ‘[’ value [, value ]* ‘]’
{% endhighlight %}
</td>
<td>
- <p>Returns the element at a particular position in an array. The index starts at 1.</p>
+ <p>Creates an array from a list of values.</p>
</td>
</tr>
<tr>
<td>
{% highlight text %}
-ELEMENT(ARRAY)
+MAP ‘[’ key, value [, key, value ]* ‘]’
{% endhighlight %}
</td>
<td>
- <p>Returns the sole element of an array with a single element. Returns <code>null</code> if the array is empty. Throws an exception if the array has more than one element.</p>
+ <p>Creates a map from a list of key-value pairs.</p>
</td>
</tr>
+
</tbody>
</table>
<table class="table table-bordered">
<thead>
<tr>
- <th class="text-left" style="width: 40%">Map functions</th>
+ <th class="text-left" style="width: 40%">Array functions</th>
<th class="text-center">Description</th>
</tr>
</thead>
@@ -2323,17 +2324,51 @@ ELEMENT(ARRAY)
<tr>
<td>
{% highlight text %}
-MAP ‘[’ key, value [, key, value ]* ‘]’
+CARDINALITY(ARRAY)
{% endhighlight %}
</td>
<td>
- <p>Creates a map from a list of key-value pairs.</p>
+ <p>Returns the number of elements of an array.</p>
</td>
</tr>
<tr>
<td>
{% highlight text %}
+array ‘[’ index ‘]’
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the element at a particular position in an array. The index starts at 1.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight text %}
+ELEMENT(ARRAY)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the sole element of an array with a single element. Returns <code>null</code> if the array is empty. Throws an exception if the array has more than one element.</p>
+ </td>
+ </tr>
+ </tbody>
+</table>
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 40%">Map functions</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+
+ <tbody>
+
+ <tr>
+ <td>
+ {% highlight text %}
CARDINALITY(MAP)
{% endhighlight %}
</td>
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 498dbbc..8b6fa72 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2996,6 +2996,30 @@ MAP.at(ANY)
<table class="table table-bordered">
<thead>
<tr>
+ <th class="text-left" style="width: 40%">Row functions</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+
+ <tbody>
+
+ <tr>
+ <td>
+ {% highlight java %}
+row(ANY, [, ANY]*)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Creates a row from a list of values. Row is composite type and can be access via <a href="tableApi.html#built-in-functions">value access functions</a>.</p>
+ </td>
+ </tr>
+
+ </tbody>
+</table>
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
<th class="text-left" style="width: 40%">Auxiliary functions</th>
<th class="text-center">Description</th>
</tr>
@@ -4287,7 +4311,75 @@ ARRAY.element()
</tbody>
</table>
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 40%">Map functions</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+
+ <tbody>
+
+ <tr>
+ <td>
+ {% highlight scala %}
+map(ANY, ANY [, ANY, ANY ]*)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Creates a map from a list of key-value pairs.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
+MAP.cardinality()
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the number of entries of a map.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
+MAP.at(ANY)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the value specified by a particular key in a map.</p>
+ </td>
+ </tr>
+
+ </tbody>
+</table>
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 40%">Row functions</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+
+ <tr>
+ <td>
+ {% highlight scala %}
+row(ANY, [, ANY]*)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Creates a row from a list of values. Row is composite type and can be access via <a href="tableApi.html#built-in-functions">value access functions</a>.</p>
+ </td>
+ </tr>
+
+ </tbody>
+</table>
<table class="table table-bordered">
<thead>
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 2708b5c..d8dbdb6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -960,12 +960,25 @@ object array {
}
/**
- * Creates a map of literals. The map will be a map between two objects (not primitives).
+ * Creates a row of expressions.
+ */
+object row {
+
+ /**
+ * Creates a row of expressions.
+ */
+ def apply(head: Expression, tail: Expression*): Expression = {
+ RowConstructor(head +: tail.toSeq)
+ }
+}
+
+/**
+ * Creates a map of expressions. The map will be a map between two objects (not primitives).
*/
object map {
/**
- * Creates a map of literals. The map will be a map between two objects (not primitives).
+ * Creates a map of expressions. The map will be a map between two objects (not primitives).
*/
def apply(key: Expression, value: Expression, tail: Expression*): Expression = {
MapConstructor(Seq(key, value) ++ tail.toSeq)
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 448029b..db7ffdb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -434,8 +434,12 @@ object FlinkTypeFactory {
val compositeRelDataType = relDataType.asInstanceOf[CompositeRelDataType]
compositeRelDataType.compositeType
- // ROW and CURSOR for UDTF case, whose type info will never be used, just a placeholder
- case ROW | CURSOR => new NothingTypeInfo
+ case ROW if relDataType.isInstanceOf[RelRecordType] =>
+ val relRecordType = relDataType.asInstanceOf[RelRecordType]
+ new RowSchema(relRecordType).typeInfo
+
+ // CURSOR for UDTF case, whose type info will never be used, just a placeholder
+ case CURSOR => new NothingTypeInfo
case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index b253a27..b98936f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -26,6 +26,7 @@ import org.apache.calcite.sql.SqlOperator
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.`type`.{ReturnTypes, SqlTypeName}
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable.ROW
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.typeinfo._
@@ -954,6 +955,10 @@ abstract class CodeGenerator(
requireString(left)
generateArithmeticOperator("+", nullCheck, resultType, left, right)
+ // rows
+ case ROW =>
+ generateRow(this, resultType, operands)
+
// arrays
case ARRAY_VALUE_CONSTRUCTOR =>
generateArray(this, resultType, operands)
@@ -1306,6 +1311,20 @@ abstract class CodeGenerator(
}
}
+ private[flink] def generateNullableOutputBoxing(
+ element: GeneratedExpression,
+ typeInfo: TypeInformation[_])
+ : GeneratedExpression = {
+ val boxedExpr = generateOutputFieldBoxing(element)
+ val boxedTypeTerm = boxedTypeTermForTypeInfo(typeInfo)
+ val exprOrNull: String = if (nullCheck) {
+ s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
+ } else {
+ boxedExpr.resultTerm
+ }
+ boxedExpr.copy(resultTerm = exprOrNull)
+ }
+
private[flink] def generateStreamRecordRowtimeAccess(): GeneratedExpression = {
val resultTerm = newName("result")
val nullTerm = newName("isNull")
@@ -1555,6 +1574,21 @@ abstract class CodeGenerator(
}
/**
+ * Adds a reusable [[org.apache.flink.types.Row]]
+ * to the member area of the generated [[Function]].
+ */
+ def addReusableRow(arity: Int): String = {
+ val fieldTerm = newName("row")
+ val fieldRow =
+ s"""
+ |final org.apache.flink.types.Row $fieldTerm =
+ | new org.apache.flink.types.Row($arity);
+ |""".stripMargin
+ reusableMemberStatements.add(fieldRow)
+ fieldTerm
+ }
+
+ /**
* Adds a reusable array to the member area of the generated [[Function]].
*/
def addReusableArray(clazz: Class[_], size: Int): String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 7fb8eba..b7e02ae 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -22,7 +22,8 @@ import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
import org.apache.calcite.util.BuiltInMethod
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo._
-import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
import org.apache.flink.table.codegen.CodeGenUtils._
import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
@@ -849,6 +850,41 @@ object ScalarOperators {
generateUnaryArithmeticOperator(operator, nullCheck, operand.resultType, operand)
}
+ def generateRow(
+ codeGenerator: CodeGenerator,
+ resultType: TypeInformation[_],
+ elements: Seq[GeneratedExpression])
+ : GeneratedExpression = {
+ val rowTerm = codeGenerator.addReusableRow(resultType.getArity)
+
+ val boxedElements: Seq[GeneratedExpression] = resultType match {
+ case ct: RowTypeInfo => // should always be RowTypeInfo
+ if (resultType.getArity == elements.size) {
+ elements.zipWithIndex.map {
+ case (e, idx) => codeGenerator.generateNullableOutputBoxing(e,
+ ct.getTypeAt(idx))
+ }
+ } else {
+ throw new CodeGenException(s"Illegal row generation operation. " +
+ s"Expected row arity ${resultType.getArity} but was ${elements.size}.")
+ }
+ case _ => throw new CodeGenException(s"Unsupported row generation operation. " +
+ s"Expected RowTypeInfo but was $resultType.")
+ }
+
+ val code = boxedElements
+ .zipWithIndex
+ .map { case (element, idx) =>
+ s"""
+ |${element.code}
+ |$rowTerm.setField($idx, ${element.resultTerm});
+ |""".stripMargin
+ }
+ .mkString("\n")
+
+ GeneratedExpression(rowTerm, GeneratedExpression.NEVER_NULL, code, resultType)
+ }
+
def generateArray(
codeGenerator: CodeGenerator,
resultType: TypeInformation[_],
@@ -857,21 +893,11 @@ object ScalarOperators {
val arrayTerm = codeGenerator.addReusableArray(resultType.getTypeClass, elements.size)
val boxedElements: Seq[GeneratedExpression] = resultType match {
-
+ // we box the elements to also represent null values
case oati: ObjectArrayTypeInfo[_, _] =>
- // we box the elements to also represent null values
- val boxedTypeTerm = boxedTypeTermForTypeInfo(oati.getComponentInfo)
-
elements.map { e =>
- val boxedExpr = codeGenerator.generateOutputFieldBoxing(e)
- val exprOrNull: String = if (codeGenerator.nullCheck) {
- s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
- } else {
- boxedExpr.resultTerm
- }
- boxedExpr.copy(resultTerm = exprOrNull)
+ codeGenerator.generateNullableOutputBoxing(e, oati.getComponentInfo)
}
-
// no boxing necessary
case _: PrimitiveArrayTypeInfo[_] => elements
}
@@ -1085,22 +1111,9 @@ object ScalarOperators {
val boxedElements: Seq[GeneratedExpression] = resultType match {
case mti: MapTypeInfo[_, _] =>
- // we box the elements to also represent null values
- val boxedKeyTypeTerm = boxedTypeTermForTypeInfo(mti.getKeyTypeInfo)
- val boxedValueTypeTerm = boxedTypeTermForTypeInfo(mti.getValueTypeInfo)
-
- elements.zipWithIndex.map { case (element, idx) =>
- val boxedExpr = codeGenerator.generateOutputFieldBoxing(element)
- val exprOrNull: String = if (codeGenerator.nullCheck) {
- if (idx % 2 == 0) {
- s"${boxedExpr.nullTerm} ? null : ($boxedKeyTypeTerm) ${boxedExpr.resultTerm}"
- } else {
- s"${boxedExpr.nullTerm} ? null : ($boxedValueTypeTerm) ${boxedExpr.resultTerm}"
- }
- } else {
- boxedExpr.resultTerm
- }
- boxedExpr.copy(resultTerm = exprOrNull)
+ elements.zipWithIndex.map { case (e, idx) =>
+ codeGenerator.generateNullableOutputBoxing(e,
+ if (idx % 2 == 0) mti.getKeyTypeInfo else mti.getValueTypeInfo)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala
index 3b65ee4..951ae27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala
@@ -23,13 +23,42 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo, ObjectArrayTypeInfo}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
import org.apache.flink.table.calcite.FlinkRelBuilder
import org.apache.flink.table.typeutils.TypeCheckUtils.{isArray, isMap}
import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
import scala.collection.JavaConverters._
+case class RowConstructor(elements: Seq[Expression]) extends Expression {
+
+ override private[flink] def children: Seq[Expression] = elements
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ val relDataType = relBuilder
+ .asInstanceOf[FlinkRelBuilder]
+ .getTypeFactory
+ .createTypeFromTypeInfo(resultType, isNullable = false)
+ val values = elements.map(_.toRexNode).toList.asJava
+ relBuilder
+ .getRexBuilder
+ .makeCall(relDataType, SqlStdOperatorTable.ROW, values)
+ }
+
+ override def toString = s"row(${elements.mkString(", ")})"
+
+ override private[flink] def resultType: TypeInformation[_] = new RowTypeInfo(
+ elements.map(e => e.resultType):_*
+ )
+
+ override private[flink] def validateInput(): ValidationResult = {
+ if (elements.isEmpty) {
+ return ValidationFailure("Empty rows are not supported yet.")
+ }
+ ValidationSuccess
+ }
+}
+
case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
override private[flink] def children: Seq[Expression] = elements
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 120bf54..281633e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -250,6 +250,9 @@ object FunctionCatalog {
// map
"map" -> classOf[MapConstructor],
+ // row
+ "row" -> classOf[RowConstructor],
+
// window properties
"start" -> classOf[WindowStart],
"end" -> classOf[WindowEnd],
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala
new file mode 100644
index 0000000..abe3ae2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import java.sql.Date
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.RowTypeTestBase
+import org.junit.Test
+
+class RowTypeTest extends RowTypeTestBase {
+
+ @Test
+ def testRowLiteral(): Unit = {
+
+ // primitive literal
+ testAllApis(
+ row(1, "foo", true),
+ "row(1, 'foo', true)",
+ "ROW(1, 'foo', true)",
+ "1,foo,true")
+
+ // special literal
+ testAllApis(
+ row(Date.valueOf("1985-04-11"),
+ BigDecimal("0.1").bigDecimal,
+ array(1, 2, 3),
+ map("foo", "bar"),
+ row(1, true)
+ ),
+ "row('1985-04-11'.toDate, 0.1p, Array(1, 2, 3), " +
+ "Map('foo', 'bar'), row(1, true))",
+ "ROW(DATE '1985-04-11', CAST(0.1 AS DECIMAL), ARRAY[1, 2, 3], " +
+ "MAP['foo', 'bar'], row(1, true))",
+ "1985-04-11,0.1,[1, 2, 3],{foo=bar},1,true") // string faltten
+
+ testAllApis(
+ row(1 + 1, 2 * 3, Null(Types.STRING)),
+ "row(1 + 1, 2 * 3, Null(STRING))",
+ "ROW(1 + 1, 2 * 3, NULLIF(1,1))",
+ "2,6,null"
+ )
+
+ testSqlApi("(1, 'foo', true)", "1,foo,true")
+ }
+
+ @Test
+ def testRowField(): Unit = {
+ testAllApis(
+ row('f0, 'f1),
+ "row(f0, f1)",
+ "(f0, f1)",
+ "null,1"
+ )
+
+ testAllApis(
+ 'f2,
+ "f2",
+ "f2",
+ "2,foo,true"
+ )
+
+ testAllApis(
+ row('f2, 'f5),
+ "row(f2, f5)",
+ "(f2, f5)",
+ "2,foo,true,foo,null"
+ )
+
+ testAllApis(
+ 'f4,
+ "f4",
+ "f4",
+ "1984-03-12,0E-8,[1, 2, 3]"
+ )
+
+ testAllApis(
+ row('f1, "foo", true),
+ "row(f1, 'foo', true)",
+ "(f1, 'foo',true)",
+ "1,foo,true"
+ )
+ }
+
+ @Test
+ def testRowOperations(): Unit = {
+ testAllApis(
+ 'f5.get("f0"),
+ "f5.get('f0')",
+ "f5.f0",
+ "foo"
+ )
+
+ testAllApis(
+ 'f3.get("f1").get("f2"),
+ "f3.get('f1').get('f2')",
+ "f3.f1.f2",
+ "true"
+ )
+
+ // SQL API for row value constructor follow by field access is not supported
+ testTableApi(
+ row('f1, 'f6, 'f2).get("f1").get("f1"),
+ "row(f1, f6, f2).get('f1').get('f1')",
+ "null"
+ )
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index 7bdfee0..e6bfcdc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -151,9 +151,9 @@ class SqlExpressionTest extends ExpressionTestBase {
@Test
def testValueConstructorFunctions(): Unit = {
- // TODO we need a special code path that flattens ROW types
- // testSqlApi("ROW('hello world', 12)", "hello world") // test base only returns field 0
- // testSqlApi("('hello world', 12)", "hello world") // test base only returns field 0
+ testSqlApi("ROW('hello world', 12)", "hello world,12")
+ testSqlApi("('hello world', 12)", "hello world,12")
+ testSqlApi("('foo', ('bar', 12))", "foo,bar,12")
testSqlApi("ARRAY[TRUE, FALSE][2]", "false")
testSqlApi("ARRAY[TRUE, TRUE]", "[true, true]")
testSqlApi("MAP['k1', 'v1', 'k2', 'v2']['k2']", "v2")
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/RowTypeTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/RowTypeTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/RowTypeTestBase.scala
new file mode 100644
index 0000000..f8f10a6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/RowTypeTestBase.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.utils
+
+import java.sql.Date
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo}
+import org.apache.flink.table.api.Types
+import org.apache.flink.types.Row
+
+class RowTypeTestBase extends ExpressionTestBase {
+
+ override def testData: Any = {
+ val row = new Row(3)
+ row.setField(0, 2)
+ row.setField(1, "foo")
+ row.setField(2, true)
+ val nestedRow = new Row(2)
+ nestedRow.setField(0, 3)
+ nestedRow.setField(1, row)
+ val specialTypeRow = new Row(3)
+ specialTypeRow.setField(0, Date.valueOf("1984-03-12"))
+ specialTypeRow.setField(1, BigDecimal("0.00000000").bigDecimal)
+ specialTypeRow.setField(2, Array(1, 2, 3))
+ val testData = new Row(7)
+ testData.setField(0, null)
+ testData.setField(1, 1)
+ testData.setField(2, row)
+ testData.setField(3, nestedRow)
+ testData.setField(4, specialTypeRow)
+ testData.setField(5, Row.of("foo", null))
+ testData.setField(6, Row.of(null, null))
+ testData
+ }
+
+ override def typeInfo: TypeInformation[Any] = {
+ new RowTypeInfo(
+ Types.STRING,
+ Types.INT,
+ Types.ROW(Types.INT, Types.STRING, Types.BOOLEAN),
+ Types.ROW(Types.INT, Types.ROW(Types.INT, Types.STRING, Types.BOOLEAN)),
+ Types.ROW(Types.SQL_DATE, Types.DECIMAL, ObjectArrayTypeInfo.getInfoFor(Types.INT)),
+ Types.ROW(Types.STRING, Types.BOOLEAN),
+ Types.ROW(Types.STRING, Types.STRING)
+ ).asInstanceOf[TypeInformation[Any]]
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/RowTypeValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/RowTypeValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/RowTypeValidationTest.scala
new file mode 100644
index 0000000..94e8394
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/RowTypeValidationTest.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.validation
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.RowTypeTestBase
+import org.junit.Test
+
+class RowTypeValidationTest extends RowTypeTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testEmptyRowType(): Unit = {
+ testAllApis("FAIL", "row()", "Row()", "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testNullRowType(): Unit = {
+ testAllApis("FAIL", "row(null)", "Row(NULL)", "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testSqlRowIllegalAccess(): Unit = {
+ testAllApis('f5.get("f2"), "f5.get('f2')", "f5.f2", "FAIL")
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 71df4e6..3529b5f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -32,6 +32,7 @@ import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, Ta
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
import org.junit._
+import org.junit.Assert.assertEquals
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -309,6 +310,32 @@ class CalcITCase(
}
@Test
+ def testValueConstructor(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "SELECT (a, b, c), ARRAY[12, b], MAP[a, c] FROM MyTable " +
+ "WHERE (a, b, c) = ('foo', 12, TIMESTAMP '1984-07-12 14:34:24')"
+ val rowValue = ("foo", 12, Timestamp.valueOf("1984-07-12 14:34:24"))
+
+ val ds = env.fromElements(rowValue)
+ tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
+
+ val result = tEnv.sqlQuery(sqlQuery)
+
+ val expected = "foo,12,1984-07-12 14:34:24.0,[12, 12],{foo=1984-07-12 14:34:24.0}"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+
+ // Compare actual object to avoid undetected Calcite flattening
+ val resultRow = results.asJava.get(0)
+ assertEquals(rowValue._1, resultRow.getField(0).asInstanceOf[Row].getField(0))
+ assertEquals(rowValue._2, resultRow.getField(1).asInstanceOf[Array[Integer]](1))
+ assertEquals(rowValue._3,
+ resultRow.getField(2).asInstanceOf[util.Map[String, Timestamp]].get(rowValue._1))
+ }
+
+ @Test
def testUserDefinedScalarFunction(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 22373d2..5ef7e31 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -36,6 +36,7 @@ import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.test.util.TestBaseUtils.compareResultAsText
import org.apache.flink.types.Row
import org.junit._
+import org.junit.Assert.assertEquals
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -471,6 +472,29 @@ class CalcITCase(
}
@Test
+ def testValueConstructor(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val rowValue = ("foo", 12, Timestamp.valueOf("1984-07-12 14:34:24"))
+
+ val table = env.fromElements(rowValue).toTable(tEnv, 'a, 'b, 'c)
+
+ val result = table.select(row('a, 'b, 'c), array(12, 'b), map('a, 'c))
+
+ val expected = "foo,12,1984-07-12 14:34:24.0,[12, 12],{foo=1984-07-12 14:34:24.0}"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+
+ // Compare actual object to avoid undetected Calcite flattening
+ val resultRow = results.asJava.get(0)
+ assertEquals(rowValue._1, resultRow.getField(0).asInstanceOf[Row].getField(0))
+ assertEquals(rowValue._2, resultRow.getField(1).asInstanceOf[Array[Integer]](1))
+ assertEquals(rowValue._3,
+ resultRow.getField(2).asInstanceOf[util.Map[String, Timestamp]].get(rowValue._1))
+ }
+
+ @Test
def testMultipleUserDefinedScalarFunctions(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)