You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/11/22 08:19:21 UTC
flink git commit: [FLINK-4294] [table] Allow access of composite type
fields
Repository: flink
Updated Branches:
refs/heads/master fdb134cab -> e45096867
[FLINK-4294] [table] Allow access of composite type fields
This closes #2319.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4509686
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4509686
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4509686
Branch: refs/heads/master
Commit: e45096867a9542a756443310a535a971a6a29695
Parents: fdb134c
Author: twalthr <tw...@apache.org>
Authored: Mon Aug 1 14:15:49 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Nov 22 09:15:09 2016 +0100
----------------------------------------------------------------------
docs/dev/table_api.md | 84 +++++++-
.../flink/api/scala/table/expressionDsl.scala | 23 +++
.../flink/api/table/FlinkPlannerImpl.scala | 4 +-
.../flink/api/table/FlinkTypeFactory.scala | 29 ++-
.../flink/api/table/codegen/CodeGenerator.scala | 39 +++-
.../table/expressions/ExpressionParser.scala | 23 ++-
.../flink/api/table/expressions/composite.scala | 106 ++++++++++
.../api/table/expressions/fieldExpression.scala | 6 +-
.../api/table/plan/ProjectionTranslator.scala | 29 ++-
.../api/table/plan/logical/LogicalNode.scala | 2 +-
.../api/table/plan/logical/operators.scala | 1 +
.../flink/api/table/plan/nodes/FlinkRel.scala | 25 ++-
.../plan/schema/CompositeRelDataType.scala | 83 ++++++++
.../org/apache/flink/api/table/table.scala | 12 +-
.../flink/api/java/batch/sql/SqlITCase.java | 2 +-
.../scala/batch/sql/TableWithSQLITCase.scala | 18 ++
.../api/table/CompositeFlatteningTest.scala | 146 ++++++++++++++
.../table/expressions/CompositeAccessTest.scala | 192 +++++++++++++++++++
.../table/expressions/SqlExpressionTest.scala | 3 +-
.../expressions/utils/ExpressionTestBase.scala | 1 -
20 files changed, 795 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index d017337..bb0e500 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1470,7 +1470,7 @@ The Table API is built on top of Flink's DataSet and DataStream API. Internally,
| `Types.INTERVAL_MONTHS`| `INTERVAL YEAR TO MONTH` | `java.lang.Integer` |
| `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long` |
-Advanced types such as generic types, composite types (e.g. POJOs or Tuples), and arrays can be fields of a row but can not be accessed yet. They are treated like a black box within Table API and SQL.
+Advanced types such as generic types, composite types (e.g. POJOs or Tuples), and arrays can be fields of a row. Generic types and arrays are still treated as a black box within the Table API and SQL. Composite types, however, are fully supported: their fields can be accessed using the `.get()` operator in the Table API and the dot operator (e.g. `MyTable.pojoColumn.myField`) in SQL. Composite types can also be flattened using `.flatten()` in the Table API or `MyTable.pojoColumn.*` in SQL.
{% top %}
@@ -2004,6 +2004,29 @@ NUMERIC.rows
</td>
</tr>
+ <tr>
+ <td>
+ {% highlight java %}
+ANY.flatten()
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes into a flat representation where every subtype is a separate field. In most cases the fields of the flat representation are named similarly to the original fields but with a dollar separator (e.g. <code>mypojo$mytuple$f0</code>).</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
+COMPOSITE.get(STRING)
+COMPOSITE.get(INT)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index or name and returns its value. E.g. <code>pojo.get('myField')</code> or <code>tuple.get(0)</code>.</p>
+ </td>
+ </tr>
+
</tbody>
</table>
@@ -2531,6 +2554,29 @@ NUMERIC.rows
</td>
</tr>
+ <tr>
+ <td>
+ {% highlight scala %}
+ANY.flatten()
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes into a flat representation where every subtype is a separate field. In most cases the fields of the flat representation are named similarly to the original fields but with a dollar separator (e.g. <code>mypojo$mytuple$f0</code>).</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
+COMPOSITE.get(STRING)
+COMPOSITE.get(INT)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index or name and returns its value. E.g. <code>'pojo.get("myField")</code> or <code>'tuple.get(0)</code>.</p>
+ </td>
+ </tr>
+
</tbody>
</table>
</div>
@@ -3298,6 +3344,8 @@ CAST(value AS type)
</tbody>
</table>
+
+<!-- Disabled temporarily in favor of composite type support
<table class="table table-bordered">
<thead>
<tr>
@@ -3330,6 +3378,7 @@ ROW (value [, value]* )
</tr>
</tbody>
</table>
+-->
<table class="table table-bordered">
<thead>
@@ -3551,6 +3600,39 @@ MIN(value)
</tbody>
</table>
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 40%">Value access functions</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+
+ <tbody>
+ <tr>
+ <td>
+ {% highlight text %}
+tableName.compositeType.field
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and returns its value.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight text %}
+tableName.compositeType.*
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes into a flat representation where every subtype is a separate field.</p>
+ </td>
+ </tr>
+ </tbody>
+</table>
+
</div>
</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 836db3e..fee43d8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -431,6 +431,29 @@ trait ImplicitExpressionOperations {
*/
def rows = toRowInterval(expr)
+ /**
+ * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and
+ * returns its value.
+ *
+ * @param name name of the field (similar to Flink's field expressions)
+ * @return value of the field
+ */
+ def get(name: String) = GetCompositeField(expr, name)
+
+ /**
+ * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and
+ * returns its value.
+ *
+ * @param index position of the field
+ * @return value of the field
+ */
+ def get(index: Int) = GetCompositeField(expr, index)
+
+ /**
+ * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes
+ * into a flat representation where every subtype is a separate field.
+ */
+ def flatten() = Flattening(expr)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
index 97e5cf2..131cdc6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
@@ -103,7 +103,9 @@ class FlinkPlannerImpl(
val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable, config)
root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
- root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
+ // we disable automatic flattening in order to let composite types pass without modification
+ // we might enable it again once Calcite has better support for structured types
+ // root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
root
} catch {
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
index 1f607e4..ee71ce9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
@@ -22,14 +22,15 @@ import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.jdbc.JavaTypeFactoryImpl
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
import org.apache.calcite.sql.SqlIntervalQualifier
-import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName, SqlTypeUtil}
+import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.ValueTypeInfo._
import org.apache.flink.api.table.FlinkTypeFactory.typeInfoToSqlTypeName
-import org.apache.flink.api.table.plan.schema.GenericRelDataType
+import org.apache.flink.api.table.plan.schema.{CompositeRelDataType, GenericRelDataType}
import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.api.table.typeutils.TypeCheckUtils.isSimple
@@ -41,6 +42,9 @@ import scala.collection.mutable
*/
class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) {
+ // NOTE: for future data types it might be necessary to
+ // override more methods of RelDataTypeFactoryImpl
+
private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
@@ -79,14 +83,27 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
}
private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
- // TODO add specific RelDataTypes
- // for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo, CompositeType
+ case ct: CompositeType[_] =>
+ new CompositeRelDataType(ct, this)
+
+ // TODO add specific RelDataTypes for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo
case ti: TypeInformation[_] =>
new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
case ti@_ =>
throw TableException(s"Unsupported type information: $ti")
}
+
+ override def createTypeWithNullability(
+ relDataType: RelDataType,
+ nullable: Boolean)
+ : RelDataType = relDataType match {
+ case composite: CompositeRelDataType =>
+ // at the moment we do not care about nullability
+ composite
+ case _ =>
+ super.createTypeWithNullability(relDataType, nullable)
+ }
}
object FlinkTypeFactory {
@@ -147,6 +164,10 @@ object FlinkTypeFactory {
val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
genericRelDataType.typeInfo
+ case ROW if relDataType.isInstanceOf[CompositeRelDataType] =>
+ val compositeRelDataType = relDataType.asInstanceOf[CompositeRelDataType]
+ compositeRelDataType.compositeType
+
case _@t =>
throw TableException(s"Type is not supported: $t")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index bbcd70f..d40e0e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -608,9 +608,40 @@ class CodeGenerator(
generateInputAccess(input._1, input._2, index)
}
- override def visitFieldAccess(rexFieldAccess: RexFieldAccess): GeneratedExpression =
- throw new CodeGenException("Accesses to fields are not supported yet.")
+ override def visitFieldAccess(rexFieldAccess: RexFieldAccess): GeneratedExpression = {
+ val refExpr = rexFieldAccess.getReferenceExpr.accept(this)
+ val index = rexFieldAccess.getField.getIndex
+ val fieldAccessExpr = generateFieldAccess(refExpr.resultType, refExpr.resultTerm, index)
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+ val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldAccessExpr.resultType)
+ val defaultValue = primitiveDefaultValue(fieldAccessExpr.resultType)
+ val resultCode = if (nullCheck) {
+ s"""
+ |${refExpr.code}
+ |$resultTypeTerm $resultTerm;
+ |boolean $nullTerm;
+ |if (${refExpr.nullTerm}) {
+ | $resultTerm = $defaultValue;
+ | $nullTerm = true;
+ |}
+ |else {
+ | ${fieldAccessExpr.code}
+ | $resultTerm = ${fieldAccessExpr.resultTerm};
+ | $nullTerm = ${fieldAccessExpr.nullTerm};
+ |}
+ |""".stripMargin
+ } else {
+ s"""
+ |${refExpr.code}
+ |${fieldAccessExpr.code}
+ |$resultTypeTerm $resultTerm = ${fieldAccessExpr.resultTerm};
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, resultCode, fieldAccessExpr.resultType)
+ }
override def visitLiteral(literal: RexLiteral): GeneratedExpression = {
val resultType = FlinkTypeFactory.toTypeInfo(literal.getType)
@@ -1014,13 +1045,13 @@ class CodeGenerator(
}
private def generateFieldAccess(
- inputType: TypeInformation[Any],
+ inputType: TypeInformation[_],
inputTerm: String,
index: Int)
: GeneratedExpression = {
inputType match {
case ct: CompositeType[_] =>
- val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]]) {
+ val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]] && inputPojoFieldMapping.nonEmpty) {
inputPojoFieldMapping.get(index)
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index a438c1c..6b6c129 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -84,6 +84,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val MILLI: Keyword = Keyword("milli")
lazy val ROWS: Keyword = Keyword("rows")
lazy val STAR: Keyword = Keyword("*")
+ lazy val GET: Keyword = Keyword("get")
+ lazy val FLATTEN: Keyword = Keyword("flatten")
def functionIdent: ExpressionParser.Parser[String] =
not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
@@ -277,11 +279,21 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val suffixRowInterval : PackratParser[Expression] =
composite <~ "." ~ ROWS ^^ { e => ExpressionUtils.toRowInterval(e) }
+ lazy val suffixGet: PackratParser[Expression] =
+ composite ~ "." ~ GET ~ "(" ~ literalExpr ~ ")" ^^ {
+ case e ~ _ ~ _ ~ _ ~ index ~ _ =>
+ GetCompositeField(e, index.asInstanceOf[Literal].value)
+ }
+
+ lazy val suffixFlattening: PackratParser[Expression] =
+ composite <~ "." ~ FLATTEN ~ opt("()") ^^ { e => Flattening(e) }
+
lazy val suffixed: PackratParser[Expression] =
suffixTimeInterval | suffixRowInterval | suffixSum | suffixMin | suffixMax | suffixStart |
suffixEnd | suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim |
suffixTrimWithoutArgs | suffixIf | suffixAsc | suffixDesc | suffixToDate |
suffixToTimestamp | suffixToTime | suffixExtract | suffixFloor | suffixCeil |
+ suffixGet | suffixFlattening |
suffixFunctionCall | suffixFunctionCallOneArg // function call must always be at the end
// prefix operators
@@ -350,10 +362,19 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
}
+ lazy val prefixGet: PackratParser[Expression] =
+ GET ~ "(" ~ composite ~ "," ~ literalExpr ~ ")" ^^ {
+ case _ ~ _ ~ e ~ _ ~ index ~ _ =>
+ GetCompositeField(e, index.asInstanceOf[Literal].value)
+ }
+
+ lazy val prefixFlattening: PackratParser[Expression] =
+ FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) }
+
lazy val prefixed: PackratParser[Expression] =
prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg | prefixStart | prefixEnd |
prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract |
- prefixFloor | prefixCeil |
+ prefixFloor | prefixCeil | prefixGet | prefixFlattening |
prefixFunctionCall | prefixFunctionCallOneArg // function call must always be at the end
// suffix/prefix composite
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
new file mode 100644
index 0000000..ee1eb46
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.UnresolvedException
+import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+/**
+ * Flattening of composite types. All flattenings are resolved into
+ * `GetCompositeField` expressions.
+ */
+case class Flattening(child: Expression) extends UnaryExpression {
+
+ override def toString = s"$child.flatten()"
+
+ override private[flink] def resultType: TypeInformation[_] =
+ throw UnresolvedException(s"Invalid call to on ${this.getClass}.")
+
+ override private[flink] def validateInput(): ValidationResult =
+ ValidationFailure(s"Unresolved flattening of $child")
+}
+
+case class GetCompositeField(child: Expression, key: Any) extends UnaryExpression {
+
+ private var fieldIndex: Option[Int] = None
+
+ override def toString = s"$child.get($key)"
+
+ override private[flink] def validateInput(): ValidationResult = {
+ // check for composite type
+ if (!child.resultType.isInstanceOf[CompositeType[_]]) {
+ return ValidationFailure(s"Cannot access field of non-composite type '${child.resultType}'.")
+ }
+ val compositeType = child.resultType.asInstanceOf[CompositeType[_]]
+
+ // check key
+ key match {
+ case name: String =>
+ val index = compositeType.getFieldIndex(name)
+ if (index < 0) {
+ ValidationFailure(s"Field name '$name' could not be found.")
+ } else {
+ fieldIndex = Some(index)
+ ValidationSuccess
+ }
+ case index: Int =>
+ if (index >= compositeType.getArity) {
+ ValidationFailure(s"Field index '$index' exceeds arity.")
+ } else {
+ fieldIndex = Some(index)
+ ValidationSuccess
+ }
+ case _ =>
+ ValidationFailure(s"Invalid key '$key'.")
+ }
+ }
+
+ override private[flink] def resultType: TypeInformation[_] =
+ child.resultType.asInstanceOf[CompositeType[_]].getTypeAt(fieldIndex.get)
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder
+ .getRexBuilder
+ .makeFieldAccess(child.toRexNode, fieldIndex.get)
+ }
+
+ override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+ val child: Expression = anyRefs.head.asInstanceOf[Expression]
+ copy(child, key).asInstanceOf[this.type]
+ }
+
+ /**
+ * Gives a meaningful alias if possible (e.g. a$mypojo$field).
+ */
+ private[flink] def aliasName(): Option[String] = child match {
+ case gcf: GetCompositeField =>
+ val alias = gcf.aliasName()
+ if (alias.isDefined) {
+ Some(s"${alias.get}$$$key")
+ } else {
+ None
+ }
+ case c: ResolvedFieldReference => Some(s"${c.name}$$$key")
+ case _ => None
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
index 91efd08..c7817bf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
@@ -103,13 +103,13 @@ case class Alias(child: Expression, name: String)
case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression {
override private[flink] def name: String =
- throw new UnresolvedException("Invalid call to name on UnresolvedAlias")
+ throw UnresolvedException("Invalid call to name on UnresolvedAlias")
override private[flink] def toAttribute: Attribute =
- throw new UnresolvedException("Invalid call to toAttribute on UnresolvedAlias")
+ throw UnresolvedException("Invalid call to toAttribute on UnresolvedAlias")
override private[flink] def resultType: TypeInformation[_] =
- throw new UnresolvedException("Invalid call to resultType on UnresolvedAlias")
+ throw UnresolvedException("Invalid call to resultType on UnresolvedAlias")
override private[flink] lazy val valid = false
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
index d09b03e..cd22f6a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
@@ -18,9 +18,10 @@
package org.apache.flink.api.table.plan
+import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.plan.logical.LogicalNode
+import org.apache.flink.api.table.plan.logical.{LogicalNode, Project}
import scala.collection.mutable.ListBuffer
@@ -159,11 +160,35 @@ object ProjectionTranslator {
/**
* Expands an UnresolvedFieldReference("*") to parent's full project list.
*/
- def expandProjectList(exprs: Seq[Expression], parent: LogicalNode): Seq[Expression] = {
+ def expandProjectList(
+ exprs: Seq[Expression],
+ parent: LogicalNode,
+ tableEnv: TableEnvironment)
+ : Seq[Expression] = {
+
val projectList = new ListBuffer[Expression]
+
exprs.foreach {
case n: UnresolvedFieldReference if n.name == "*" =>
projectList ++= parent.output.map(a => UnresolvedFieldReference(a.name))
+
+ case Flattening(unresolved) =>
+ // simulate a simple project to resolve fields using current parent
+ val project = Project(Seq(UnresolvedAlias(unresolved)), parent).validate(tableEnv)
+ val resolvedExpr = project
+ .output
+ .headOption
+ .getOrElse(throw new RuntimeException("Could not find resolved composite."))
+ resolvedExpr.validateInput()
+ val newProjects = resolvedExpr.resultType match {
+ case ct: CompositeType[_] =>
+ (0 until ct.getArity).map { idx =>
+ projectList += GetCompositeField(unresolved, ct.getFieldNames()(idx))
+ }
+ case _ =>
+ projectList += unresolved
+ }
+
case e: Expression => projectList += e
}
projectList
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
index 55fba07..21290d4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.plan.logical
import org.apache.calcite.rel.RelNode
import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment, ValidationException}
+import org.apache.flink.api.table.{TableEnvironment, ValidationException}
import org.apache.flink.api.table.expressions._
import org.apache.flink.api.table.trees.TreeNode
import org.apache.flink.api.table.typeutils.TypeCoercion
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index 1d7ed5f..ecf1996 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -45,6 +45,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
case ne: NamedExpression => ne
case expr if !expr.valid => u
case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
+ case gcf: GetCompositeField => Alias(gcf, gcf.aliasName().getOrElse(s"_c$i"))
case other => Alias(other, s"_c$i")
}
case _ =>
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
index a4c7589..7932e11 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
@@ -35,19 +35,32 @@ trait FlinkRel {
localExprsTable: Option[List[RexNode]]): String = {
expr match {
- case i: RexInputRef => inFields.get(i.getIndex)
- case l: RexLiteral => l.toString
+ case i: RexInputRef =>
+ inFields.get(i.getIndex)
+
+ case l: RexLiteral =>
+ l.toString
+
case l: RexLocalRef if localExprsTable.isEmpty =>
- throw new IllegalArgumentException("Encountered RexLocalRef without local expression table")
+ throw new IllegalArgumentException("Encountered RexLocalRef without " +
+ "local expression table")
+
case l: RexLocalRef =>
val lExpr = localExprsTable.get(l.getIndex)
getExpressionString(lExpr, inFields, localExprsTable)
- case c: RexCall => {
+
+ case c: RexCall =>
val op = c.getOperator.toString
val ops = c.getOperands.map(getExpressionString(_, inFields, localExprsTable))
s"$op(${ops.mkString(", ")})"
- }
- case _ => throw new IllegalArgumentException("Unknown expression type: " + expr)
+
+ case fa: RexFieldAccess =>
+ val referenceExpr = getExpressionString(fa.getReferenceExpr, inFields, localExprsTable)
+ val field = fa.getField.getName
+ s"$referenceExpr.$field"
+
+ case _ =>
+ throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr")
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala
new file mode 100644
index 0000000..b9ceff0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import java.util
+
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFieldImpl, RelRecordType}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.FlinkTypeFactory
+import org.apache.flink.api.table.plan.schema.CompositeRelDataType.createFieldList
+
+import scala.collection.JavaConverters._
+
+/**
+ * Record type that wraps one of Flink's [[CompositeType]]s.
+ *
+ * The fields of the record are derived from the fields of the wrapped composite type.
+ *
+ * @param compositeType the composite type wrapped by this record type
+ * @param typeFactory Flink's type factory, used to create the types of the record fields
+ */
+class CompositeRelDataType(
+    val compositeType: CompositeType[_],
+    typeFactory: FlinkTypeFactory)
+  extends RelRecordType(createFieldList(compositeType, typeFactory)) {
+
+  /** Renders the type as `COMPOSITE(...)` around the wrapped composite type. */
+  override def toString: String = s"COMPOSITE($compositeType)"
+
+  /** Only other [[CompositeRelDataType]] instances are candidates for equality. */
+  def canEqual(other: Any): Boolean = other.isInstanceOf[CompositeRelDataType]
+
+  /**
+   * Two instances are equal if the inherited equality holds and both wrap equal
+   * composite types.
+   */
+  override def equals(other: Any): Boolean = other match {
+    case that: CompositeRelDataType if that.canEqual(this) =>
+      super.equals(that) && compositeType == that.compositeType
+    case _ =>
+      false
+  }
+
+  /** Hash code of the wrapped composite type. */
+  override def hashCode(): Int = compositeType.hashCode()
+
+}
+
+object CompositeRelDataType {
+
+  /**
+   * Builds the list of [[RelDataTypeField]]s for a composite type: one field per entry
+   * of `getFieldNames`, in that order, carrying the entry's position as its index.
+   *
+   * @param compositeType composite type whose fields are converted
+   * @param typeFactory factory that creates each field's type from its type information
+   * @return the converted fields as a Java list
+   */
+  private def createFieldList(
+      compositeType: CompositeType[_],
+      typeFactory: FlinkTypeFactory)
+    : util.List[RelDataTypeField] = {
+
+    val namesWithPosition = compositeType.getFieldNames.toList.zipWithIndex
+
+    val fields: List[RelDataTypeField] = namesWithPosition.map { case (fieldName, position) =>
+      val fieldType = typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(position))
+      new RelDataTypeFieldImpl(fieldName, position, fieldType)
+    }
+
+    fields.asJava
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 8528c8a..c45e871 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -55,7 +55,7 @@ import scala.collection.JavaConverters._
* syntax.
*
* @param tableEnv The [[TableEnvironment]] to which the table is bound.
- * @param logicalPlan
+ * @param logicalPlan logical representation
*/
class Table(
private[flink] val tableEnv: TableEnvironment,
@@ -77,7 +77,7 @@ class Table(
*/
def select(fields: Expression*): Table = {
- val expandedFields = expandProjectList(fields, logicalPlan)
+ val expandedFields = expandProjectList(fields, logicalPlan, tableEnv)
val (projection, aggs, props) = extractAggregationsAndProperties(expandedFields, tableEnv)
if (props.nonEmpty) {
@@ -549,11 +549,9 @@ class Table(
* }}}
*/
def orderBy(fields: Expression*): Table = {
- val order: Seq[Ordering] = fields.map { case e =>
- e match {
- case o: Ordering => o
- case _ => Asc(e)
- }
+ val order: Seq[Ordering] = fields.map {
+ case o: Ordering => o
+ case e => Asc(e)
}
new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
index 5f50517..1364cbd 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
@@ -52,7 +52,7 @@ public class SqlITCase extends TableProgramsTestBase {
Table result = tableEnv.sql(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
- resultSet.print();
+
List<Row> results = resultSet.collect();
String expected = "3,World,false,1944-12-24,12.5444444500000000\n" +
"2,Hello,true,1944-02-24,12.6666666650000000\n" +
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala
index 30e44f0..a770a6e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala
@@ -96,4 +96,22 @@ class TableWithSQLITCase(
val results = result2.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+
+ // Selects a top-level column and a field nested inside a tuple column via SQL.
+ @Test
+ def testSelectWithCompositeType(): Unit = {
+   val execEnv = ExecutionEnvironment.getExecutionEnvironment
+   val tableEnv = TableEnvironment.getTableEnvironment(execEnv, config)
+
+   // a single row whose first column is itself a tuple
+   val input = execEnv
+     .fromElements(((12, true), "Hello"))
+     .toTable(tableEnv)
+     .as('a1, 'a2)
+   tableEnv.registerTable("MyTable", input)
+
+   val rows = tableEnv
+     .sql("SELECT MyTable.a2, MyTable.a1._2 FROM MyTable")
+     .toDataSet[Row]
+     .collect()
+
+   TestBaseUtils.compareResultAsText(rows.asJava, "Hello,true\n")
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala
new file mode 100644
index 0000000..f14b9d8
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.CompositeFlatteningTest.{TestCaseClass, giveMeCaseClass}
+import org.apache.flink.api.table.functions.ScalarFunction
+import org.apache.flink.api.table.utils.TableTestBase
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.junit.Test
+
+
+class CompositeFlatteningTest extends TableTestBase {
+
+  /** Flattening the same composite field twice in one projection must fail validation. */
+  @Test(expected = classOf[ValidationException])
+  def testDuplicateFlattening(): Unit = {
+    val testUtil = batchTestUtil()
+    val source = testUtil
+      .addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+    source.select('a.flatten(), 'a.flatten())
+  }
+
+  /** Table API: two flattened tuple fields around a plain field. */
+  @Test
+  def testMultipleFlatteningsTable(): Unit = {
+    val testUtil = batchTestUtil()
+    val source = testUtil
+      .addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+    val flattened = source.select('a.flatten(), 'c, 'b.flatten())
+
+    val input = batchTableNode(0)
+    val projection = term("select",
+      "a._1 AS a$_1",
+      "a._2 AS a$_2",
+      "c",
+      "b._1 AS b$_1",
+      "b._2 AS b$_2"
+    )
+    val expectedPlan = unaryNode("DataSetCalc", input, projection)
+
+    testUtil.verifyTable(flattened, expectedPlan)
+  }
+
+  /** SQL: the same projection as above, written with `.*` on the tuple fields. */
+  @Test
+  def testMultipleFlatteningsSql(): Unit = {
+    val testUtil = batchTestUtil()
+    // the returned table is not needed; the query below refers to it by name
+    testUtil.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+    val input = batchTableNode(0)
+    val projection = term("select",
+      "a._1 AS _1",
+      "a._2 AS _2",
+      "c",
+      "b._1 AS _10",
+      "b._2 AS _20"
+    )
+    val expectedPlan = unaryNode("DataSetCalc", input, projection)
+
+    testUtil.verifySql(
+      "SELECT MyTable.a.*, c, MyTable.b.* FROM MyTable",
+      expectedPlan)
+  }
+
+  /** Flattening a deeply nested tuple and a plain string field. */
+  @Test
+  def testNestedFlattenings(): Unit = {
+    val testUtil = batchTestUtil()
+    val source = testUtil
+      .addTable[((((String, TestCaseClass), Boolean), String), String)]("MyTable", 'a, 'b)
+
+    val flattened = source.select('a.flatten(), 'b.flatten())
+
+    val input = batchTableNode(0)
+    val projection = term("select",
+      "a._1 AS a$_1",
+      "a._2 AS a$_2",
+      "b"
+    )
+    val expectedPlan = unaryNode("DataSetCalc", input, projection)
+
+    testUtil.verifyTable(flattened, expectedPlan)
+  }
+
+  /** Field access and flattening applied to the result of a scalar function. */
+  @Test
+  def testScalarFunctionAccess(): Unit = {
+    val testUtil = batchTestUtil()
+    val source = testUtil.addTable[(String, Int)]("MyTable", 'a, 'b)
+
+    val selected = source.select(
+      giveMeCaseClass().get("my"),
+      giveMeCaseClass().get("clazz"),
+      giveMeCaseClass().flatten())
+
+    val input = batchTableNode(0)
+    val projection = term("select",
+      "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().my AS _c0",
+      "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().clazz AS _c1",
+      "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().my AS _c2",
+      "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().clazz AS _c3"
+    )
+    val expectedPlan = unaryNode("DataSetCalc", input, projection)
+
+    testUtil.verifyTable(selected, expectedPlan)
+  }
+
+}
+
+object CompositeFlatteningTest {
+
+  /** Two-field case class used as a composite type in the tests. */
+  case class TestCaseClass(my: String, clazz: Int)
+
+  /** Scalar function without arguments that always yields the same [[TestCaseClass]]. */
+  object giveMeCaseClass extends ScalarFunction {
+
+    def eval(): TestCaseClass = TestCaseClass(my = "hello", clazz = 42)
+
+    /** Reports [[TestCaseClass]] as the result type, whatever the signature. */
+    override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
+      createTypeInformation[TestCaseClass]
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala
new file mode 100644
index 0000000..3121c58
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.{Row, Types, ValidationException}
+import org.apache.flink.api.table.expressions.CompositeAccessTest.{MyCaseClass, MyCaseClass2, MyPojo}
+import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.junit.Test
+
+
+class CompositeAccessTest extends ExpressionTestBase {
+
+  /**
+   * Accesses fields of case classes, tuples and a POJO by name and by position, returns
+   * whole composite values, and flattens composite as well as atomic expressions.
+   */
+  @Test
+  def testGetField(): Unit = {
+
+    // case class field addressed by name
+    testAllApis('f0.get("intField"), "f0.get('intField')", "testTable.f0.intField", "42")
+    testSqlApi("testTable.f0.stringField", "Bob")
+    testSqlApi("testTable.f0.booleanField", "true")
+
+    // case class field addressed by position
+    testTableApi('f0.get(0), "f0.get(0)", "42")
+
+    // field of a case class nested in another case class
+    testAllApis(
+      'f1.get("objectField").get("intField"),
+      "f1.get('objectField').get('intField')",
+      "testTable.f1.objectField.intField",
+      "25")
+    testSqlApi("testTable.f1.objectField.stringField", "Timo")
+    testSqlApi("testTable.f1.objectField.booleanField", "false")
+
+    // Scala tuple, Java tuple, POJO and atomic field
+    testAllApis('f2.get(0), "f2.get(0)", "testTable.f2._1", "a")
+    testSqlApi("testTable.f3.f1", "b")
+    testSqlApi("testTable.f4.myString", "Hello")
+    testSqlApi("testTable.f5", "13")
+
+    // single-element tuple, using the prefix notation in the string expression
+    testAllApis('f7.get("_1"), "get(f7, '_1')", "testTable.f7._1", "true")
+
+    // expressions whose result is itself a composite value
+    testSqlApi("testTable.f6", "MyCaseClass2(null)")
+    testAllApis(
+      'f1.get("objectField"),
+      "f1.get('objectField')",
+      "testTable.f1.objectField",
+      "MyCaseClass(25,Timo,false)")
+    testAllApis('f0, "f0", "testTable.f0", "MyCaseClass(42,Bob,true)")
+
+    // flattening (test base only returns first column)
+    testAllApis(
+      'f1.get("objectField").flatten(),
+      "f1.get('objectField').flatten()",
+      "testTable.f1.objectField.*",
+      "25")
+    testAllApis('f0.flatten(), "flatten(f0)", "testTable.f0.*", "42")
+
+    // flattening a literal and an atomic field
+    testTableApi(12.flatten(), "12.flatten()", "12")
+    testTableApi('f5.flatten(), "f5.flatten()", "13")
+  }
+
+  /** SQL field access on the atomic field f5 must fail validation. */
+  @Test(expected = classOf[ValidationException])
+  def testWrongSqlField(): Unit = {
+    testSqlApi("testTable.f5.test", "13")
+  }
+
+  /** Position 555 does not exist in f0 (first argument carries the invalid access). */
+  @Test(expected = classOf[ValidationException])
+  def testWrongIntKeyField(): Unit = {
+    testTableApi('f0.get(555), "'fail'", "fail")
+  }
+
+  /** Position 555 does not exist in f0 (second argument carries the invalid access). */
+  @Test(expected = classOf[ValidationException])
+  def testWrongIntKeyField2(): Unit = {
+    testTableApi("fail", "f0.get(555)", "fail")
+  }
+
+  /** Field name "fghj" does not exist in f0 (first argument carries the invalid access). */
+  @Test(expected = classOf[ValidationException])
+  def testWrongStringKeyField(): Unit = {
+    testTableApi('f0.get("fghj"), "'fail'", "fail")
+  }
+
+  /** Field name "fghj" does not exist in f0 (second argument carries the invalid access). */
+  @Test(expected = classOf[ValidationException])
+  def testWrongStringKeyField2(): Unit = {
+    testTableApi("fail", "f0.get('fghj')", "fail")
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /** The single input row; field i holds a value of the i-th type listed in [[typeInfo]]. */
+  def testData: Row = {
+    val row = new Row(8)
+    row.setField(0, MyCaseClass(42, "Bob", booleanField = true))
+    row.setField(1, MyCaseClass2(MyCaseClass(25, "Timo", booleanField = false)))
+    row.setField(2, ("a", "b"))
+    row.setField(3, new org.apache.flink.api.java.tuple.Tuple2[String, String]("a", "b"))
+    row.setField(4, new MyPojo())
+    row.setField(5, 13)
+    row.setField(6, MyCaseClass2(null))
+    row.setField(7, Tuple1(true))
+    row
+  }
+
+  /** Row type of the input: one type information per field of [[testData]]. */
+  def typeInfo: TypeInformation[Any] =
+    new RowTypeInfo(Seq(
+      createTypeInformation[MyCaseClass],
+      createTypeInformation[MyCaseClass2],
+      createTypeInformation[(String, String)],
+      new TupleTypeInfo(Types.STRING, Types.STRING),
+      TypeExtractor.createTypeInfo(classOf[MyPojo]),
+      Types.INT,
+      createTypeInformation[MyCaseClass2],
+      createTypeInformation[Tuple1[Boolean]]
+    )).asInstanceOf[TypeInformation[Any]]
+
+}
+
+object CompositeAccessTest {
+
+  /** Case class with three atomic fields. */
+  case class MyCaseClass(intField: Int, stringField: String, booleanField: Boolean)
+
+  /** Case class that nests a [[MyCaseClass]]. */
+  case class MyCaseClass2(objectField: MyCaseClass)
+
+  /** Mutable bean-style class exposing an int and a string property via getters/setters. */
+  class MyPojo {
+    private var myInt: Int = 0
+    private var myString: String = "Hello"
+
+    def getMyInt: Int = myInt
+
+    def setMyInt(value: Int): Unit = {
+      myInt = value
+    }
+
+    def getMyString: String = myString
+
+    def setMyString(value: String): Unit = {
+      // store the argument; the field was previously assigned to itself,
+      // which made this setter a no-op
+      myString = value
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala
index cae4388..b892cfb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.junit.Test
+import org.junit.{Ignore, Test}
/**
* Tests all SQL expressions that are currently supported according to the documentation.
@@ -135,6 +135,7 @@ class SqlExpressionTest extends ExpressionTestBase {
testSqlApi("CAST(2 AS DOUBLE)", "2.0")
}
+ @Ignore // TODO we need a special code path that flattens ROW types
@Test
def testValueConstructorFunctions(): Unit = {
testSqlApi("ROW('hello world', 12)", "hello world") // test base only returns field 0
http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
index 6720759..d34e335 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
@@ -18,7 +18,6 @@
package org.apache.flink.api.table.expressions.utils
-import org.apache.calcite.rel.logical.LogicalProject
import org.apache.calcite.rex.RexNode
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql2rel.RelDecorrelator