You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/01/05 17:39:16 UTC
flink git commit: [FLINK-8203] [FLINK-7681] [table] Make schema
definition of DataStream/DataSet to Table conversion more flexible
Repository: flink
Updated Branches:
refs/heads/master f88da4d04 -> fb29898cd
[FLINK-8203] [FLINK-7681] [table] Make schema definition of DataStream/DataSet to Table conversion more flexible
This closes #5132.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb29898c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb29898c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb29898c
Branch: refs/heads/master
Commit: fb29898cd3507b2b94dd8bbf3dbfd2132b643a1d
Parents: f88da4d
Author: twalthr <tw...@apache.org>
Authored: Thu Dec 7 11:52:28 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Fri Jan 5 18:32:37 2018 +0100
----------------------------------------------------------------------
.../flink/table/api/BatchTableEnvironment.scala | 4 +-
.../table/api/StreamTableEnvironment.scala | 96 ++-
.../flink/table/api/TableEnvironment.scala | 163 +++--
.../flink/table/codegen/CodeGenerator.scala | 19 +-
.../plan/schema/FlinkTableFunctionImpl.scala | 16 +-
.../flink/table/plan/schema/InlineTable.scala | 16 +-
.../table/runtime/aggregate/SortUtil.scala | 3 +-
.../flink/table/sources/TableSourceUtil.scala | 15 +-
.../batch/table/JavaTableEnvironmentITCase.java | 74 +-
.../flink/table/api/TableEnvironmentTest.scala | 687 ++++++++++++++-----
.../StreamTableEnvironmentValidationTest.scala | 102 ++-
.../TableEnvironmentValidationTest.scala | 84 +--
.../batch/table/TableEnvironmentITCase.scala | 22 +-
.../flink/table/utils/TableTestBase.scala | 8 +-
14 files changed, 923 insertions(+), 386 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index c25007b..c920d23 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -290,8 +290,10 @@ abstract class BatchTableEnvironment(
protected def registerDataSetInternal[T](
name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {
+ val inputType = dataSet.getType
+
val (fieldNames, fieldIndexes) = getFieldInfo[T](
- dataSet.getType,
+ inputType,
fields)
if (fields.exists(_.isInstanceOf[TimeAttribute])) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 920da2e..9d94f54 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -28,10 +28,10 @@ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelDataType
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
@@ -42,9 +42,9 @@ import org.apache.flink.table.expressions._
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait}
import org.apache.flink.table.plan.rules.FlinkRuleSets
+import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable, TableSinkTable}
import org.apache.flink.table.plan.util.UpdatingPlanChecker
import org.apache.flink.table.runtime.conversion._
-import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable, TableSinkTable}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.table.runtime.{CRowMapRunner, OutputRowtimeProcessFunction}
import org.apache.flink.table.sinks._
@@ -450,37 +450,57 @@ abstract class StreamTableEnvironment(
exprs: Array[Expression])
: (Option[(Int, String)], Option[(Int, String)]) = {
- val fieldTypes: Array[TypeInformation[_]] = streamType match {
- case c: CompositeType[_] => (0 until c.getArity).map(i => c.getTypeAt(i)).toArray
- case a: AtomicType[_] => Array(a)
+ val (isRefByPos, fieldTypes) = streamType match {
+ case c: CompositeType[_] =>
+ // determine schema definition mode (by position or by name)
+ (isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => c.getTypeAt(i)).toArray)
+ case t: TypeInformation[_] =>
+ (false, Array(t))
}
var fieldNames: List[String] = Nil
var rowtime: Option[(Int, String)] = None
var proctime: Option[(Int, String)] = None
+ def checkRowtimeType(t: TypeInformation[_]): Unit = {
+ if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
+ throw new TableException(
+ s"The rowtime attribute can only replace a field with a valid time type, " +
+ s"such as Timestamp or Long. But was: $t")
+ }
+ }
+
def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit = {
if (rowtime.isDefined) {
throw new TableException(
"The rowtime attribute can only be defined once in a table schema.")
} else {
- val mappedIdx = streamType match {
- case pti: PojoTypeInfo[_] =>
- pti.getFieldIndex(origName.getOrElse(name))
- case _ => idx;
+ // if the fields are referenced by position,
+ // it is possible to replace an existing field or append the time attribute at the end
+ if (isRefByPos) {
+ // aliases are not permitted
+ if (origName.isDefined) {
+ throw new TableException(
+ s"Invalid alias '${origName.get}' because fields are referenced by position.")
+ }
+ // check type of field that is replaced
+ if (idx < fieldTypes.length) {
+ checkRowtimeType(fieldTypes(idx))
+ }
}
- // check type of field that is replaced
- if (mappedIdx < 0) {
- throw new TableException(
- s"The rowtime attribute can only replace a valid field. " +
- s"${origName.getOrElse(name)} is not a field of type $streamType.")
- }
- else if (mappedIdx < fieldTypes.length &&
- !(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
- TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
- throw new TableException(
- s"The rowtime attribute can only replace a field with a valid time type, " +
- s"such as Timestamp or Long. But was: ${fieldTypes(mappedIdx)}")
+ // check reference-by-name
+ else {
+ val aliasOrName = origName.getOrElse(name)
+ streamType match {
+ // both alias and reference must have a valid type if they replace a field
+ case ct: CompositeType[_] if ct.hasField(aliasOrName) =>
+ val t = ct.getTypeAt(ct.getFieldIndex(aliasOrName))
+ checkRowtimeType(t)
+ // alias could not be found
+ case _ if origName.isDefined =>
+ throw new TableException(s"Alias '${origName.get}' must reference an existing field.")
+ case _ => // ok
+ }
}
rowtime = Some(idx, name)
@@ -492,11 +512,26 @@ abstract class StreamTableEnvironment(
throw new TableException(
"The proctime attribute can only be defined once in a table schema.")
} else {
- // check that proctime is only appended
- if (idx < fieldTypes.length) {
- throw new TableException(
- "The proctime attribute can only be appended to the table schema and not replace " +
- "an existing field. Please move it to the end of the schema.")
+ // if the fields are referenced by position,
+ // it is only possible to append the time attribute at the end
+ if (isRefByPos) {
+
+ // check that proctime is only appended
+ if (idx < fieldTypes.length) {
+ throw new TableException(
+ "The proctime attribute can only be appended to the table schema and not replace " +
+ s"an existing field. Please move '$name' to the end of the schema.")
+ }
+ }
+ // check reference-by-name
+ else {
+ streamType match {
+ // proctime attribute must not replace a field
+ case ct: CompositeType[_] if ct.hasField(name) =>
+ throw new TableException(
+ s"The proctime attribute '$name' must not replace an existing field.")
+ case _ => // ok
+ }
}
proctime = Some(idx, name)
}
@@ -512,16 +547,15 @@ abstract class StreamTableEnvironment(
case (ProctimeAttribute(UnresolvedFieldReference(name)), idx) =>
extractProctime(idx, name)
- case (ProctimeAttribute(Alias(UnresolvedFieldReference(_), name, _)), idx) =>
- extractProctime(idx, name)
-
case (UnresolvedFieldReference(name), _) => fieldNames = name :: fieldNames
case (Alias(UnresolvedFieldReference(_), name, _), _) => fieldNames = name :: fieldNames
case (e, _) =>
throw new TableException(s"Time attributes can only be defined on field references or " +
- s"aliases of field references. But was: $e")
+ s"aliases of valid field references. Rowtime attributes can replace existing fields, " +
+ s"proctime attributes can not. " +
+ s"But was: $e")
}
if (rowtime.isDefined && fieldNames.contains(rowtime.get._2)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index c3cab13..6170fa1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -36,7 +36,7 @@ import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.util.ChainedSqlOperatorTable
import org.apache.calcite.tools._
import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{RowTypeInfo, _}
import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
@@ -769,6 +769,38 @@ abstract class TableEnvironment(val config: TableConfig) {
}
/**
+ * Reference input fields by name:
+ * All fields in the schema definition are referenced by name
+ * (and possibly renamed using an alias (as)). In this mode, fields can be reordered and
+ * projected out. Moreover, we can define proctime and rowtime attributes at arbitrary
+ * positions using arbitrary names (except those that exist in the result schema). This mode
+ * can be used for any input type, including POJOs.
+ *
+ * Reference input fields by position:
+ * In this mode, fields are simply renamed; aliases (as) are not permitted. Event-time
+ * attributes can replace the field on their position in the input data (if it is of correct
+ * type) or be appended at the end. Proctime attributes must be appended at the end. This mode
+ * can only be used if the input type has a defined field order (tuple, case class, Row) and
+ * none of the field references names an existing field of the input type.
+ *
+ */
+ protected def isReferenceByPosition(ct: CompositeType[_], fields: Array[Expression]): Boolean = {
+ if (!ct.isInstanceOf[TupleTypeInfoBase[_]]) {
+ return false
+ }
+
+ val inputNames = ct.getFieldNames
+
+ // Use the by-position mode if none of the fields exists in the input.
+ // This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed
+ // by position but the user might assume reordering instead of renaming.
+ fields.forall {
+ case UnresolvedFieldReference(name) => !inputNames.contains(name)
+ case _ => true
+ }
+ }
+
+ /**
* Returns field names and field positions for a given [[TypeInformation]].
*
* @param inputType The TypeInformation extract the field names and positions from.
@@ -796,97 +828,83 @@ abstract class TableEnvironment(val config: TableConfig) {
* @tparam A The type of the TypeInformation.
* @return A tuple of two arrays holding the field names and corresponding field positions.
*/
- protected[flink] def getFieldInfo[A](
+ protected def getFieldInfo[A](
inputType: TypeInformation[A],
exprs: Array[Expression])
: (Array[String], Array[Int]) = {
TableEnvironment.validateType(inputType)
+ def referenceByName(name: String, ct: CompositeType[_]): Option[Int] = {
+ val inputIdx = ct.getFieldIndex(name)
+ if (inputIdx < 0) {
+ throw new TableException(s"$name is not a field of type $ct. " +
+ s"Expected: ${ct.getFieldNames.mkString(", ")}")
+ } else {
+ Some(inputIdx)
+ }
+ }
+
val indexedNames: Array[(Int, String)] = inputType match {
+
case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
throw new TableException(
"An input of GenericTypeInfo<Row> cannot be converted to Table. " +
"Please specify the type of the input with a RowTypeInfo.")
- case a: AtomicType[_] =>
- exprs.zipWithIndex flatMap {
- case (_: TimeAttribute, _) =>
- None
- case (UnresolvedFieldReference(name), idx) if idx > 0 =>
- // only accept the first field for an atomic type
- throw new TableException("Only the first field can reference an atomic type.")
- case (UnresolvedFieldReference(name), idx) =>
- // first field reference is mapped to atomic type
- Some((0, name))
- case _ => throw new TableException("Field reference expression requested.")
- }
- case t: TupleTypeInfo[A] =>
+
+ case t: TupleTypeInfoBase[A] if t.isInstanceOf[TupleTypeInfo[A]] ||
+ t.isInstanceOf[CaseClassTypeInfo[A]] || t.isInstanceOf[RowTypeInfo] =>
+
+ // determine schema definition mode (by position or by name)
+ val isRefByPos = isReferenceByPosition(t, exprs)
+
exprs.zipWithIndex flatMap {
- case (UnresolvedFieldReference(name), idx) =>
- Some((idx, name))
- case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
- val idx = t.getFieldIndex(origName)
- if (idx < 0) {
- throw new TableException(s"$origName is not a field of type $t")
+ case (UnresolvedFieldReference(name: String), idx) =>
+ if (isRefByPos) {
+ Some((idx, name))
+ } else {
+ referenceByName(name, t).map((_, name))
}
- Some((idx, name))
- case (_: TimeAttribute, _) =>
- None
- case _ => throw new TableException(
- "Field reference expression or alias on field expression expected.")
- }
- case c: CaseClassTypeInfo[A] =>
- exprs.zipWithIndex flatMap {
- case (UnresolvedFieldReference(name), idx) =>
- Some((idx, name))
- case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
- val idx = c.getFieldIndex(origName)
- if (idx < 0) {
- throw new TableException(s"$origName is not a field of type $c")
+ case (Alias(UnresolvedFieldReference(origName), name: String, _), _) =>
+ if (isRefByPos) {
+ throw new TableException(
+ s"Alias '$name' is not allowed if other fields are referenced by position.")
+ } else {
+ referenceByName(origName, t).map((_, name))
}
- Some((idx, name))
case (_: TimeAttribute, _) =>
None
case _ => throw new TableException(
"Field reference expression or alias on field expression expected.")
}
+
case p: PojoTypeInfo[A] =>
exprs flatMap {
- case (UnresolvedFieldReference(name)) =>
- val idx = p.getFieldIndex(name)
- if (idx < 0) {
- throw new TableException(s"$name is not a field of type $p")
- }
- Some((idx, name))
- case Alias(UnresolvedFieldReference(origName), name, _) =>
- val idx = p.getFieldIndex(origName)
- if (idx < 0) {
- throw new TableException(s"$origName is not a field of type $p")
- }
- Some((idx, name))
+ case (UnresolvedFieldReference(name: String)) =>
+ referenceByName(name, p).map((_, name))
+ case Alias(UnresolvedFieldReference(origName), name: String, _) =>
+ referenceByName(origName, p).map((_, name))
case _: TimeAttribute =>
None
case _ => throw new TableException(
"Field reference expression or alias on field expression expected.")
}
- case r: RowTypeInfo =>
- exprs.zipWithIndex flatMap {
- case (UnresolvedFieldReference(name), idx) =>
- Some((idx, name))
- case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
- val idx = r.getFieldIndex(origName)
- if (idx < 0) {
- throw new TableException(s"$origName is not a field of type $r")
- }
- Some((idx, name))
- case (_: TimeAttribute, _) =>
+
+ case _: TypeInformation[_] => // atomic or other custom type information
+ var referenced = false
+ exprs flatMap {
+ case _: TimeAttribute =>
None
+ case UnresolvedFieldReference(_) if referenced =>
+ // only accept the first field for an atomic type
+ throw new TableException("Only the first field can reference an atomic type.")
+ case UnresolvedFieldReference(name: String) =>
+ referenced = true
+ // first field reference is mapped to atomic type
+ Some((0, name))
case _ => throw new TableException(
- "Field reference expression or alias on field expression expected.")
+ "Field reference expression expected.")
}
-
- case tpe => throw new TableException(
- s"Source of type $tpe cannot be converted into Table.")
}
val (fieldIndexes, fieldNames) = indexedNames.unzip
@@ -967,17 +985,17 @@ abstract class TableEnvironment(val config: TableConfig) {
}
}
- // Atomic type requested
- case at: AtomicType[_] =>
+ // atomic type requested
+ case t: TypeInformation[_] =>
if (fieldTypes.size != 1) {
throw new TableException(s"Requested result type is an atomic type but " +
s"result[$fieldTypes] has more or less than a single field.")
}
val requestedTypeInfo = fieldTypes.head
validateFieldType(requestedTypeInfo)
- if (requestedTypeInfo != at) {
+ if (requestedTypeInfo != t) {
throw new TableException(s"Result field does not match requested type. " +
- s"Requested: $at; Actual: $requestedTypeInfo")
+ s"Requested: $t; Actual: $requestedTypeInfo")
}
case _ =>
@@ -1119,10 +1137,7 @@ object TableEnvironment {
val fieldNames: Array[String] = inputType match {
case t: CompositeType[_] => t.getFieldNames
- case a: AtomicType[_] => Array("f0")
- case tpe =>
- throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
- s"Type $tpe lacks explicit field naming")
+ case _: TypeInformation[_] => Array("f0")
}
if (fieldNames.contains("*")) {
@@ -1167,10 +1182,8 @@ object TableEnvironment {
validateType(inputType)
inputType match {
- case t: CompositeType[_] => 0.until(t.getArity).map(i => t.getTypeAt(i)).toArray
- case a: AtomicType[_] => Array(a.asInstanceOf[TypeInformation[_]])
- case tpe =>
- throw new TableException(s"Currently only CompositeType and AtomicType are supported.")
+ case ct: CompositeType[_] => 0.until(ct.getArity).map(i => ct.getTypeAt(i)).toArray
+ case t: TypeInformation[_] => Array(t.asInstanceOf[TypeInformation[_]])
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 40ea5b2..df7ef57 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -370,10 +370,10 @@ abstract class CodeGenerator(
case _ => // ok
}
- case at: AtomicType[_] if at != fieldExprs.head.resultType =>
+ case t: TypeInformation[_] if t != fieldExprs.head.resultType =>
throw new CodeGenException(
s"Incompatible types of expression and result type. Expression [${fieldExprs.head}] " +
- s"type is [${fieldExprs.head.resultType}], result type is [$at]")
+ s"type is [${fieldExprs.head.resultType}], result type is [$t]")
case _ => // ok
}
@@ -531,7 +531,7 @@ abstract class CodeGenerator(
GeneratedExpression(resultTerm, "false", resultCode, returnType)
- case a: AtomicType[_] =>
+ case t: TypeInformation[_] =>
val fieldExpr = boxedFieldExprs.head
val nullCheckCode = if (nullCheck) {
s"""
@@ -1079,8 +1079,7 @@ abstract class CodeGenerator(
val fieldType = inputType match {
case ct: CompositeType[_] => ct.getTypeAt(index)
- case at: AtomicType[_] => at
- case _ => throw new CodeGenException("Unsupported type for input field access.")
+ case t: TypeInformation[_] => t
}
val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldType)
val defaultValue = primitiveDefaultValue(fieldType)
@@ -1110,6 +1109,7 @@ abstract class CodeGenerator(
index: Int)
: GeneratedExpression = {
inputType match {
+
case ct: CompositeType[_] =>
val accessor = fieldAccessorFor(ct, index)
val fieldType: TypeInformation[Any] = ct.getTypeAt(index)
@@ -1156,13 +1156,10 @@ abstract class CodeGenerator(
}
}
- case at: AtomicType[_] =>
- val fieldTypeTerm = boxedTypeTermForTypeInfo(at)
+ case t: TypeInformation[_] =>
+ val fieldTypeTerm = boxedTypeTermForTypeInfo(t)
val inputCode = s"($fieldTypeTerm) $inputTerm"
- generateInputFieldUnboxing(at, inputCode)
-
- case _ =>
- throw new CodeGenException("Unsupported type for input field access.")
+ generateInputFieldUnboxing(t, inputCode)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
index cab8ea9..df6fd29 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
@@ -23,7 +23,7 @@ import java.util.Collections
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.schema.{FunctionParameter, TableFunction}
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.api.TableException
import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -52,19 +52,21 @@ class FlinkTableFunctionImpl[T](
val fieldTypes: Array[TypeInformation[_]] =
typeInfo match {
- case cType: CompositeType[T] =>
- if (fieldNames.length != cType.getArity) {
+
+ case ct: CompositeType[T] =>
+ if (fieldNames.length != ct.getArity) {
throw new TableException(
- s"Arity of type (" + cType.getFieldNames.deep + ") " +
+ s"Arity of type (" + ct.getFieldNames.deep + ") " +
"not equal to number of field names " + fieldNames.deep + ".")
}
- fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
- case aType: AtomicType[T] =>
+ fieldIndexes.map(ct.getTypeAt(_).asInstanceOf[TypeInformation[_]])
+
+ case t: TypeInformation[T] =>
if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
throw new TableException(
"Non-composite input type may have only a single field and its index must be 0.")
}
- Array(aType)
+ Array(t)
}
override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]]
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala
index 22d6151..7e61fdf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.schema
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.schema.Statistic
import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.api.{TableException, Types}
import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -59,11 +59,12 @@ abstract class InlineTable[T](
val fieldTypes: Array[TypeInformation[_]] =
typeInfo match {
- case cType: CompositeType[_] =>
+
+ case ct: CompositeType[_] =>
// it is ok to leave out fields
- if (fieldIndexes.count(_ >= 0) > cType.getArity) {
+ if (fieldIndexes.count(_ >= 0) > ct.getArity) {
throw new TableException(
- s"Arity of type (" + cType.getFieldNames.deep + ") " +
+ s"Arity of type (" + ct.getFieldNames.deep + ") " +
"must not be greater than number of field names " + fieldNames.deep + ".")
}
fieldIndexes.map {
@@ -75,8 +76,9 @@ abstract class InlineTable[T](
Types.SQL_TIMESTAMP
case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER =>
Types.SQL_TIMESTAMP
- case i => cType.getTypeAt(i).asInstanceOf[TypeInformation[_]]}
- case aType: AtomicType[_] =>
+ case i => ct.getTypeAt(i).asInstanceOf[TypeInformation[_]]}
+
+ case t: TypeInformation[_] =>
var cnt = 0
val types = fieldIndexes.map {
case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER =>
@@ -89,7 +91,7 @@ abstract class InlineTable[T](
Types.SQL_TIMESTAMP
case _ =>
cnt += 1
- aType.asInstanceOf[TypeInformation[_]]
+ t.asInstanceOf[TypeInformation[_]]
}
// ensure that the atomic type is matched at most once.
if (cnt > 1) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
index d62c7b9..2bafed9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
@@ -135,7 +135,8 @@ object SortUtil {
val fieldComps = for ((k, o) <- sortFields.zip(sortDirections)) yield {
FlinkTypeFactory.toTypeInfo(inputType.getFieldList.get(k).getType) match {
- case a: AtomicType[AnyRef] => a.createComparator(o, execConfig)
+ case a: AtomicType[_] =>
+ a.createComparator(o, execConfig).asInstanceOf[TypeComparator[AnyRef]]
case x: TypeInformation[_] =>
throw new TableException(s"Unsupported field type $x to sort on.")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
index 48ab3de..6895419 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.logical.LogicalValues
import org.apache.calcite.rex.{RexLiteral, RexNode}
import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.api.{TableException, Types, ValidationException}
import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -95,7 +95,7 @@ object TableSourceUtil {
mappedFieldCnt += 1
}
// ensure that only one field is mapped to an atomic type
- if (tableSource.getReturnType.isInstanceOf[AtomicType[_]] && mappedFieldCnt > 1) {
+ if (!tableSource.getReturnType.isInstanceOf[CompositeType[_]] && mappedFieldCnt > 1) {
throw ValidationException(
s"More than one table field matched to atomic input type ${tableSource.getReturnType}.")
}
@@ -231,7 +231,7 @@ object TableSourceUtil {
}
// ensure that only one field is mapped to an atomic type
- if (inputType.isInstanceOf[AtomicType[_]] && mapping.count(_ >= 0) > 1) {
+ if (!inputType.isInstanceOf[CompositeType[_]] && mapping.count(_ >= 0) > 1) {
throw ValidationException(
s"More than one table field matched to atomic input type $inputType.")
}
@@ -466,9 +466,7 @@ object TableSourceUtil {
/** Look up a field by name in a type information */
def lookupField(fieldName: String, failMsg: String): (String, Int, TypeInformation[_]) = {
returnType match {
- case a: AtomicType[_] =>
- // no composite type, we return the full atomic type as field
- (fieldName, 0, a)
+
case c: CompositeType[_] =>
// get and check field index
val idx = c.getFieldIndex(fieldName)
@@ -477,7 +475,10 @@ object TableSourceUtil {
}
// return field name, index, and field type
(fieldName, idx, c.getTypeAt(idx))
- case _ => throw TableException("Unexpected type information.")
+
+ case t: TypeInformation[_] =>
+ // no composite type, we return the full atomic type as field
+ (fieldName, 0, t)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
index be7879a..4dc58b2 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.batch.table;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -36,6 +37,7 @@ import org.apache.flink.table.calcite.CalciteConfigBuilder;
import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase;
import org.apache.flink.table.runtime.utils.TableProgramsTestBase;
import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.types.Either;
import org.apache.flink.types.Row;
import org.apache.calcite.tools.RuleSets;
@@ -177,7 +179,7 @@ public class JavaTableEnvironmentITCase extends TableProgramsCollectionTestBase
}
@Test
- public void testAsFromTuple() throws Exception {
+ public void testAsFromTupleByPosition() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -198,6 +200,25 @@ public class JavaTableEnvironmentITCase extends TableProgramsCollectionTestBase
}
@Test
+ public void testAsFromTupleByName() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "f2");
+
+ DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "Hi\n" + "Hello\n" + "Hello world\n" +
+ "Hello world, how are you?\n" + "I am fine.\n" + "Luke Skywalker\n" +
+ "Comment#1\n" + "Comment#2\n" + "Comment#3\n" + "Comment#4\n" +
+ "Comment#5\n" + "Comment#6\n" + "Comment#7\n" +
+ "Comment#8\n" + "Comment#9\n" + "Comment#10\n" +
+ "Comment#11\n" + "Comment#12\n" + "Comment#13\n" +
+ "Comment#14\n" + "Comment#15\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
public void testAsFromAndToTuple() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -273,6 +294,57 @@ public class JavaTableEnvironmentITCase extends TableProgramsCollectionTestBase
}
@Test
+ public void testFromNonAtomicAndNonComposite() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ List<Either<String, Integer>> data = new ArrayList<>();
+ data.add(new Either.Left<>("Hello"));
+ data.add(new Either.Right<>(42));
+ data.add(new Either.Left<>("World"));
+
+ Table table = tableEnv
+ .fromDataSet(
+ env.fromCollection(
+ data,
+ TypeInformation.of(new TypeHint<Either<String, Integer>>() { })
+ ),
+ "either")
+ .select("either");
+
+ DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+ List<Row> results = ds.collect();
+ String expected =
+ "Left(Hello)\n" +
+ "Left(World)\n" +
+ "Right(42)\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testAsFromPojoProjected() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ List<SmallPojo> data = new ArrayList<>();
+ data.add(new SmallPojo("Peter", 28, 4000.00, "Sales", new Integer[] {42}));
+ data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering", new Integer[] {}));
+ data.add(new SmallPojo("Lucy", 42, 6000.00, "HR", new Integer[] {1, 2, 3}));
+
+ Table table = tableEnv
+ .fromDataSet(env.fromCollection(data), "name AS d")
+ .select("d");
+
+ DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+ List<Row> results = ds.collect();
+ String expected =
+ "Peter\n" +
+ "Anna\n" +
+ "Lucy\n";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
public void testAsFromPrivateFieldsPojo() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index ad84a03..1c097d3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -18,231 +18,556 @@
package org.apache.flink.table.api
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
import org.apache.flink.api.scala._
-import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
-import org.apache.flink.table.runtime.types.CRowTypeInfo
-import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase}
+import org.apache.flink.table.api.TableEnvironmentTest._
+import org.apache.flink.table.api.Types._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{PROCTIME_INDICATOR => PROCTIME}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{ROWTIME_INDICATOR => ROWTIME}
+import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.types.Row
-import org.junit.Assert.assertEquals
+import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.api.scala.typeutils.UnitTypeInfo
import org.junit.Test
class TableEnvironmentTest extends TableTestBase {
- val tEnv = new MockTableEnvironment
-
- val tupleType = new TupleTypeInfo(
- INT_TYPE_INFO,
- STRING_TYPE_INFO,
- DOUBLE_TYPE_INFO)
-
- val rowType = new RowTypeInfo(INT_TYPE_INFO, STRING_TYPE_INFO,DOUBLE_TYPE_INFO)
-
- val cRowType = new CRowTypeInfo(rowType)
-
- val caseClassType: TypeInformation[CClass] = implicitly[TypeInformation[CClass]]
-
- val pojoType: TypeInformation[PojoClass] = TypeExtractor.createTypeInfo(classOf[PojoClass])
-
- val atomicType = INT_TYPE_INFO
-
- val genericRowType = new GenericTypeInfo[Row](classOf[Row])
+ // ----------------------------------------------------------------------------------------------
+ // schema definition by position
+ // ----------------------------------------------------------------------------------------------
@Test
- def testGetFieldInfoRow(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(rowType)
-
- fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ def testProjectByPosition(): Unit = {
+ val utils = Seq(streamTestUtil(), batchTestUtil())
+
+ utils.foreach { util =>
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClass]('a, 'b, 'c),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[CClass]('a, 'b),
+ Seq("a" -> INT, "b" -> STRING))
+
+ util.verifySchema(
+ util.addTable[CClass]('a),
+ Seq("a" -> INT))
+
+ // row
+ util.verifySchema(
+ util.addTable('a, 'b, 'c)(TEST_ROW),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable('a, 'b)(TEST_ROW),
+ Seq("a" -> INT, "b" -> STRING))
+
+ util.verifySchema(
+ util.addTable('a)(TEST_ROW),
+ Seq("a" -> INT))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a, 'b),
+ Seq("a" -> INT, "b" -> STRING))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a),
+ Seq("a" -> INT))
+ }
}
@Test
- def testGetFieldInfoRowNames(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- rowType,
- Array(
- UnresolvedFieldReference("name1"),
- UnresolvedFieldReference("name2"),
- UnresolvedFieldReference("name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ def testStreamProjectWithAddingTimeAttributesByPosition(): Unit = {
+ val util = streamTestUtil()
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClass]('a, 'b, 'c , 'proctime.proctime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable[CClass]('a, 'b, 'c, 'rowtime.rowtime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
+
+ util.verifySchema(
+ util.addTable[CClass]('a, 'b, 'c, 'rowtime.rowtime, 'proctime.proctime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME, "proctime" -> PROCTIME))
+
+ // row
+ util.verifySchema(
+ util.addTable('a, 'b, 'c, 'proctime.proctime)(TEST_ROW),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable('a, 'b, 'c, 'rowtime.rowtime)(TEST_ROW),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
+
+ util.verifySchema(
+ util.addTable('a, 'b, 'c, 'rowtime.rowtime, 'proctime.proctime)(TEST_ROW),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME, "proctime" -> PROCTIME))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c, 'proctime.proctime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c, 'rowtime.rowtime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c, 'rowtime.rowtime, 'proctime.proctime),
+ Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME, "proctime" -> PROCTIME))
}
@Test
- def testGetFieldInfoTuple(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(tupleType)
-
- fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ def testStreamAliasWithReplacingTimeAttributesByPosition(): Unit = {
+ val util = streamTestUtil()
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClassWithTime]('a, 'b.rowtime, 'c),
+ Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
+
+ util.verifySchema(
+ util.addTable[CClassWithTime]('a, 'b.rowtime, 'c),
+ Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
+
+ // row
+ util.verifySchema(
+ util.addTable('a, 'b.rowtime, 'c)(TEST_ROW_WITH_TIME),
+ Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
+
+ util.verifySchema(
+ util.addTable('a, 'b.rowtime, 'c)(TEST_ROW_WITH_TIME),
+ Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, Long, String]]('a, 'b.rowtime, 'c),
+ Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, Long, String]]('a, 'b.rowtime, 'c),
+ Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
}
- @Test
- def testGetFieldInfoCClass(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(caseClassType)
+ // ----------------------------------------------------------------------------------------------
+ // schema definition by name
+ // ----------------------------------------------------------------------------------------------
- fieldInfo._1.zip(Array("cf1", "cf2", "cf3")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ @Test
+ def testProjectByName(): Unit = {
+ val utils = Seq(streamTestUtil(), batchTestUtil())
+
+ utils.foreach { util =>
+
+ // atomic
+ util.verifySchema(
+ util.addTable[Int](),
+ Seq("f0" -> INT))
+
+ util.verifySchema(
+ util.addTable[Int]('myint),
+ Seq("myint" -> INT))
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClass](),
+ Seq("cf1" -> INT, "cf2" -> STRING, "cf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[CClass]('cf1, 'cf2),
+ Seq("cf1" -> INT, "cf2" -> STRING))
+
+ util.verifySchema(
+ util.addTable[CClass]('cf1, 'cf3),
+ Seq("cf1" -> INT, "cf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[CClass]('cf3, 'cf1),
+ Seq("cf3" -> DOUBLE, "cf1" -> INT))
+
+ // row
+ util.verifySchema(
+ util.addTable()(TEST_ROW),
+ Seq("rf1" -> INT, "rf2" -> STRING, "rf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable('rf1, 'rf2)(TEST_ROW),
+ Seq("rf1" -> INT, "rf2" -> STRING))
+
+ util.verifySchema(
+ util.addTable('rf1, 'rf3)(TEST_ROW),
+ Seq("rf1" -> INT, "rf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable('rf3, 'rf1)(TEST_ROW),
+ Seq("rf3" -> DOUBLE, "rf1" -> INT))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]](),
+ Seq("f0" -> INT, "f1" -> STRING, "f2" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f0, 'f1),
+ Seq("f0" -> INT, "f1" -> STRING))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f0, 'f2),
+ Seq("f0" -> INT, "f2" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f2, 'f0),
+ Seq("f2" -> DOUBLE, "f0" -> INT))
+
+ // pojo
+ util.verifySchema(
+ util.addTable[PojoClass](),
+ Seq("pf1" -> INT, "pf2" -> STRING, "pf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('pf1, 'pf2),
+ Seq("pf1" -> INT, "pf2" -> STRING))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('pf1, 'pf3),
+ Seq("pf1" -> INT, "pf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('pf3, 'pf1),
+ Seq("pf3" -> DOUBLE, "pf1" -> INT))
+
+ // generic
+ util.verifySchema(
+ util.addTable[Class[_]]('mygeneric),
+ Seq("mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]])))
+
+ util.verifySchema(
+ util.addTable[Class[_]](),
+ Seq("f0" -> new GenericTypeInfo[Class[_]](classOf[Class[_]])))
+
+ // any type info
+ util.verifySchema(
+ util.addTable[Unit](),
+ Seq("f0" -> new UnitTypeInfo()))
+
+ util.verifySchema(
+ util.addTable[Unit]('unit),
+ Seq("unit" -> new UnitTypeInfo()))
+ }
}
@Test
- def testGetFieldInfoPojo(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(pojoType)
-
- fieldInfo._1.zip(Array("pf1", "pf2", "pf3")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ def testStreamProjectWithAddingTimeAttributesByName(): Unit = {
+ val util = streamTestUtil()
+
+ // atomic
+ util.verifySchema(
+ util.addTable[Int]('proctime.proctime, 'myint),
+ Seq("proctime" -> PROCTIME, "myint" -> INT))
+
+ util.verifySchema(
+ util.addTable[Int]('rowtime.rowtime, 'myint),
+ Seq("rowtime" -> ROWTIME, "myint" -> INT))
+
+ util.verifySchema(
+ util.addTable[Int]('myint, 'proctime.proctime),
+ Seq("myint" -> INT, "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable[Int]('myint, 'rowtime.rowtime),
+ Seq("myint" -> INT, "rowtime" -> ROWTIME))
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClass]('proctime.proctime, 'cf1, 'cf3),
+ Seq("proctime" -> PROCTIME, "cf1" -> INT, "cf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[CClass]('rowtime.rowtime, 'cf3, 'cf1),
+ Seq("rowtime" -> ROWTIME, "cf3" -> DOUBLE, "cf1" -> INT))
+
+ util.verifySchema(
+ util.addTable[CClass]('cf1, 'proctime.proctime, 'cf3),
+ Seq("cf1" -> INT, "proctime" -> PROCTIME, "cf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[CClass]('cf3, 'rowtime.rowtime, 'cf1),
+ Seq("cf3" -> DOUBLE, "rowtime" -> ROWTIME, "cf1" -> INT))
+
+ util.verifySchema(
+ util.addTable[CClass]('cf1, 'cf3, 'proctime.proctime),
+ Seq("cf1" -> INT, "cf3" -> DOUBLE, "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable[CClass]('cf3, 'cf1, 'rowtime.rowtime),
+ Seq("cf3" -> DOUBLE, "cf1" -> INT, "rowtime" -> ROWTIME))
+
+ // row
+ util.verifySchema(
+ util.addTable('proctime.proctime, 'rf1, 'rf3)(TEST_ROW),
+ Seq("proctime" -> PROCTIME, "rf1" -> INT, "rf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable('rowtime.rowtime, 'rf3, 'rf1)(TEST_ROW),
+ Seq("rowtime" -> ROWTIME, "rf3" -> DOUBLE, "rf1" -> INT))
+
+ util.verifySchema(
+ util.addTable('rf3, 'proctime.proctime, 'rf1)(TEST_ROW),
+ Seq("rf3" -> DOUBLE, "proctime" -> PROCTIME, "rf1" -> INT))
+
+ util.verifySchema(
+ util.addTable('rf3, 'rowtime.rowtime, 'rf1)(TEST_ROW),
+ Seq("rf3" -> DOUBLE, "rowtime" -> ROWTIME, "rf1" -> INT))
+
+ util.verifySchema(
+ util.addTable('rf3, 'rf1, 'proctime.proctime)(TEST_ROW),
+ Seq("rf3" -> DOUBLE, "rf1" -> INT, "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable('rf3, 'rf1, 'rowtime.rowtime)(TEST_ROW),
+ Seq("rf3" -> DOUBLE, "rf1" -> INT, "rowtime" -> ROWTIME))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('proctime.proctime, 'f0, 'f2),
+ Seq("proctime" -> PROCTIME, "f0" -> INT, "f2" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('rowtime.rowtime, 'f2, 'f0),
+ Seq("rowtime" -> ROWTIME, "f2" -> DOUBLE, "f0" -> INT))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f0, 'proctime.proctime, 'f2),
+ Seq("f0" -> INT, "proctime" -> PROCTIME, "f2" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f2, 'rowtime.rowtime, 'f0),
+ Seq("f2" -> DOUBLE, "rowtime" -> ROWTIME, "f0" -> INT))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f0, 'f2, 'proctime.proctime),
+ Seq("f0" -> INT, "f2" -> DOUBLE, "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f2, 'f0, 'rowtime.rowtime),
+ Seq("f2" -> DOUBLE, "f0" -> INT, "rowtime" -> ROWTIME))
+
+ // pojo
+ util.verifySchema(
+ util.addTable[PojoClass]('proctime.proctime, 'pf1, 'pf3),
+ Seq("proctime" -> PROCTIME, "pf1" -> INT, "pf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('rowtime.rowtime, 'pf3, 'pf1),
+ Seq("rowtime" -> ROWTIME, "pf3" -> DOUBLE, "pf1" -> INT))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('pf1, 'proctime.proctime, 'pf3),
+ Seq("pf1" -> INT, "proctime" -> PROCTIME, "pf3" -> DOUBLE))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('pf3, 'rowtime.rowtime, 'pf1),
+ Seq("pf3" -> DOUBLE, "rowtime" -> ROWTIME, "pf1" -> INT))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('pf1, 'pf3, 'proctime.proctime),
+ Seq("pf1" -> INT, "pf3" -> DOUBLE, "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable[PojoClass]('pf3, 'pf1, 'rowtime.rowtime),
+ Seq("pf3" -> DOUBLE, "pf1" -> INT, "rowtime" -> ROWTIME))
+
+ // generic
+ util.verifySchema(
+ util.addTable[Class[_]]('proctime.proctime, 'mygeneric),
+ Seq("proctime" -> PROCTIME, "mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]])))
+
+ util.verifySchema(
+ util.addTable[Class[_]]('rowtime.rowtime, 'mygeneric),
+ Seq("rowtime" -> ROWTIME, "mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]])))
+
+ util.verifySchema(
+ util.addTable[Class[_]]('mygeneric, 'proctime.proctime),
+ Seq("mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]]), "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable[Class[_]]('mygeneric, 'rowtime.rowtime),
+ Seq("mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]]), "rowtime" -> ROWTIME))
+
+ // any type info
+ util.verifySchema(
+ util.addTable[Unit]('proctime.proctime, 'unit),
+ Seq("proctime" -> PROCTIME, "unit" -> new UnitTypeInfo()))
+
+ util.verifySchema(
+ util.addTable[Unit]('rowtime.rowtime, 'unit),
+ Seq("rowtime" -> ROWTIME, "unit" -> new UnitTypeInfo()))
+
+ util.verifySchema(
+ util.addTable[Unit]('unit, 'proctime.proctime),
+ Seq("unit" -> new UnitTypeInfo(), "proctime" -> PROCTIME))
+
+ util.verifySchema(
+ util.addTable[Unit]('unit, 'rowtime.rowtime),
+ Seq("unit" -> new UnitTypeInfo(), "rowtime" -> ROWTIME))
}
@Test
- def testGetFieldInfoAtomic(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(atomicType)
-
- fieldInfo._1.zip(Array("f0")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
+ def testStreamProjectWithReplacingTimeAttributesByName(): Unit = {
+ val util = streamTestUtil()
+
+ // atomic
+ util.verifySchema(
+ util.addTable[Long]('new.rowtime),
+ Seq("new" -> ROWTIME))
+
+ util.verifySchema(
+ util.addTable[Int]('new.proctime),
+ Seq("new" -> PROCTIME))
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClassWithTime]('cf1, 'xxx.proctime, 'cf3),
+ Seq("cf1" -> INT, "xxx" -> PROCTIME, "cf3" -> STRING))
+
+ util.verifySchema(
+ util.addTable[CClassWithTime]('cf1, 'cf2.rowtime, 'cf3),
+ Seq("cf1" -> INT, "cf2" -> ROWTIME, "cf3" -> STRING))
+
+ // row
+ util.verifySchema(
+ util.addTable('rf1, 'xxx.proctime, 'rf3)(TEST_ROW_WITH_TIME),
+ Seq("rf1" -> INT, "xxx" -> PROCTIME, "rf3" -> STRING))
+
+ util.verifySchema(
+ util.addTable('rf1, 'rf2.rowtime, 'rf3)(TEST_ROW_WITH_TIME),
+ Seq("rf1" -> INT, "rf2" -> ROWTIME, "rf3" -> STRING))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, Long, String]]('f0, 'xxx.proctime, 'f2),
+ Seq("f0" -> INT, "xxx" -> PROCTIME, "f2" -> STRING))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, Long, String]]('f0, 'f1.rowtime, 'f2),
+ Seq("f0" -> INT, "f1" -> ROWTIME, "f2" -> STRING))
}
@Test
- def testGetFieldInfoTupleNames(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- tupleType,
- Array(
- UnresolvedFieldReference("name1"),
- UnresolvedFieldReference("name2"),
- UnresolvedFieldReference("name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ def testAliasByName(): Unit = {
+ val utils = Seq(streamTestUtil(), batchTestUtil())
+
+ utils.foreach { util =>
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClass]('cf1, 'cf3 as 'new, 'cf2),
+ Seq("cf1" -> INT, "new" -> DOUBLE, "cf2" -> STRING))
+
+ // row
+ util.verifySchema(
+ util.addTable('rf1, 'rf3 as 'new, 'rf2)(TEST_ROW),
+ Seq("rf1" -> INT, "new" -> DOUBLE, "rf2" -> STRING))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, String, Double]]('f0, 'f2 as 'new, 'f1),
+ Seq("f0" -> INT, "new" -> DOUBLE, "f1" -> STRING))
+ }
}
@Test
- def testGetFieldInfoCClassNames(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- caseClassType,
- Array(
- UnresolvedFieldReference("name1"),
- UnresolvedFieldReference("name2"),
- UnresolvedFieldReference("name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ def testStreamAliasWithAddingTimeAttributesByName(): Unit = {
+ val util = streamTestUtil()
+
+ // atomic
+ util.verifySchema(
+ util.addTable[Int]('new.proctime),
+ Seq("new" -> PROCTIME))
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClassWithTime]('cf1, 'new.proctime, 'cf2),
+ Seq("cf1" -> INT, "new" -> PROCTIME, "cf2" -> LONG))
+
+ util.verifySchema(
+ util.addTable[CClassWithTime]('cf1, 'new.rowtime, 'cf2),
+ Seq("cf1" -> INT, "new" -> ROWTIME, "cf2" -> LONG))
+
+ // row
+ util.verifySchema(
+ util.addTable('rf1, 'new.proctime, 'rf2)(TEST_ROW_WITH_TIME),
+ Seq("rf1" -> INT, "new" -> PROCTIME, "rf2" -> LONG))
+
+ util.verifySchema(
+ util.addTable('rf1, 'new.rowtime, 'rf2)(TEST_ROW_WITH_TIME),
+ Seq("rf1" -> INT, "new" -> ROWTIME, "rf2" -> LONG))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, Long, String]]('f0, 'new.proctime, 'f1),
+ Seq("f0" -> INT, "new" -> PROCTIME, "f1" -> LONG))
+
+ util.verifySchema(
+ util.addTable[JTuple3[Int, Long, String]]('f0, 'new.rowtime, 'f1),
+ Seq("f0" -> INT, "new" -> ROWTIME, "f1" -> LONG))
}
@Test
- def testGetFieldInfoPojoNames2(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- pojoType,
- Array(
- UnresolvedFieldReference("pf3"),
- UnresolvedFieldReference("pf1"),
- UnresolvedFieldReference("pf2")
- ))
-
- fieldInfo._1.zip(Array("pf3", "pf1", "pf2")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+ def testStreamAliasWithReplacingTimeAttributesByName(): Unit = {
+ val util = streamTestUtil()
+
+ // case class
+ util.verifySchema(
+ util.addTable[CClassWithTime]('cf1, ('cf2 as 'new).rowtime, 'cf3),
+ Seq("cf1" -> INT, "new" -> ROWTIME, "cf3" -> STRING))
+
+ // row
+ util.verifySchema(
+ util.addTable('rf1, ('rf2 as 'new).rowtime, 'rf3)(TEST_ROW_WITH_TIME),
+ Seq("rf1" -> INT, "new" -> ROWTIME, "rf3" -> STRING))
+
+ // tuple
+ util.verifySchema(
+ util.addTable[JTuple3[Int, Long, String]]('f0, ('f1 as 'new).rowtime, 'f2),
+ Seq("f0" -> INT, "new" -> ROWTIME, "f2" -> STRING))
}
+}
- @Test
- def testGetFieldInfoAtomicName1(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- atomicType,
- Array(UnresolvedFieldReference("name")))
+object TableEnvironmentTest {
- fieldInfo._1.zip(Array("name")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
- }
+ case class CClass(cf1: Int, cf2: String, cf3: Double)
- @Test
- def testGetFieldInfoTupleAlias1(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- tupleType,
- Array(
- Alias(UnresolvedFieldReference("f0"), "name1"),
- Alias(UnresolvedFieldReference("f1"), "name2"),
- Alias(UnresolvedFieldReference("f2"), "name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
- }
+ case class CClassWithTime(cf1: Int, cf2: Long, cf3: String)
- @Test
- def testGetFieldInfoTupleAlias2(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- tupleType,
- Array(
- Alias(UnresolvedFieldReference("f2"), "name1"),
- Alias(UnresolvedFieldReference("f0"), "name2"),
- Alias(UnresolvedFieldReference("f1"), "name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+ class PojoClass(var pf2: String, var pf1: Int, var pf3: Double) {
+ def this() = this("", 0, 0.0)
}
- @Test
- def testGetFieldInfoCClassAlias1(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- caseClassType,
- Array(
- Alias(UnresolvedFieldReference("cf1"), "name1"),
- Alias(UnresolvedFieldReference("cf2"), "name2"),
- Alias(UnresolvedFieldReference("cf3"), "name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ class PojoClassWithTime(var pf2: String, var pf1: Int, var pf3: Long) {
+ def this() = this("", 0, 0L)
}
- @Test
- def testGetFieldInfoCClassAlias2(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- caseClassType,
- Array(
- Alias(UnresolvedFieldReference("cf3"), "name1"),
- Alias(UnresolvedFieldReference("cf1"), "name2"),
- Alias(UnresolvedFieldReference("cf2"), "name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
- }
+ val TEST_ROW: TypeInformation[Row] = ROW(
+ Array("rf1", "rf2", "rf3"),
+ Array[TypeInformation[_]](INT, STRING, DOUBLE))
- @Test
- def testGetFieldInfoPojoAlias1(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- pojoType,
- Array(
- Alias(UnresolvedFieldReference("pf1"), "name1"),
- Alias(UnresolvedFieldReference("pf2"), "name2"),
- Alias(UnresolvedFieldReference("pf3"), "name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
- }
+ val TEST_ROW_WITH_TIME: TypeInformation[Row] = ROW(
+ Array("rf1", "rf2", "rf3"),
+ Array[TypeInformation[_]](INT, LONG, STRING))
- @Test
- def testGetFieldInfoPojoAlias2(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- pojoType,
- Array(
- Alias(UnresolvedFieldReference("pf3"), "name1"),
- Alias(UnresolvedFieldReference("pf1"), "name2"),
- Alias(UnresolvedFieldReference("pf2"), "name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
- }
}
-case class CClass(cf1: Int, cf2: String, cf3: Double)
-class PojoClass(var pf1: Int, var pf2: String, var pf3: Double) {
- def this() = this(0, "", 0.0)
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
index 1fd58cc..bfa7bfa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
@@ -30,77 +30,102 @@ import org.junit.Test
class StreamTableEnvironmentValidationTest extends TableTestBase {
+ // ----------------------------------------------------------------------------------------------
+ // schema definition by position
+ // ----------------------------------------------------------------------------------------------
+
+ @Test(expected = classOf[TableException])
+ def testInvalidRowtimeAliasByPosition(): Unit = {
+ val util = streamTestUtil()
+ // don't allow aliasing by position
+ util.addTable[(Long, Int, String, Int, Long)](('a as 'b).rowtime, 'b, 'c, 'd, 'e)
+ }
+
@Test(expected = classOf[TableException])
- def testInvalidTimeAttributes(): Unit = {
+ def testInvalidRowtimeAttributesByPosition(): Unit = {
val util = streamTestUtil()
// table definition makes no sense
util.addTable[(Long, Int, String, Int, Long)]('a.rowtime.rowtime, 'b, 'c, 'd, 'e)
}
@Test(expected = classOf[TableException])
- def testInvalidProctimeAttribute(): Unit = {
+ def testInvalidProctimeAttributesByPosition(): Unit = {
+ val util = streamTestUtil()
+ // table definition makes no sense
+ util.addTable[(Long, Int, String, Int, Long)]('a.proctime.proctime, 'b, 'c, 'd, 'e)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testInvalidTimeAttributesByPosition(): Unit = {
+ val util = streamTestUtil()
+ // table definition makes no sense
+ util.addTable[(Long, Int, String, Int, Long)]('a.rowtime.rowtime, 'b, 'c, 'd, 'e)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testInvalidProctimeAttributeByPosition(): Unit = {
val util = streamTestUtil()
// cannot replace an attribute with proctime
util.addTable[(Long, Int, String, Int, Long)]('a, 'b.proctime, 'c, 'd, 'e)
}
@Test(expected = classOf[TableException])
- def testRowtimeAttributeReplaceFieldOfInvalidType(): Unit = {
+ def testRowtimeAttributeReplaceFieldOfInvalidTypeByPosition(): Unit = {
val util = streamTestUtil()
// cannot replace a non-time attribute with rowtime
util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c.rowtime, 'd, 'e)
}
@Test(expected = classOf[TableException])
- def testRowtimeAndInvalidProctimeAttribute(): Unit = {
+ def testRowtimeAndInvalidProctimeAttributeByPosition(): Unit = {
val util = streamTestUtil()
util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 'pt.proctime)
}
@Test(expected = classOf[TableException])
- def testOnlyOneRowtimeAttribute1(): Unit = {
+ def testOnlyOneRowtimeAttribute1ByPosition(): Unit = {
val util = streamTestUtil()
util.addTable[(Long, Int, String, Int, Long)]('a.rowtime, 'b, 'c, 'd, 'e, 'rt.rowtime)
}
@Test(expected = classOf[TableException])
- def testOnlyOneProctimeAttribute1(): Unit = {
+ def testOnlyOneProctimeAttribute1ByPosition(): Unit = {
val util = streamTestUtil()
util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'pt1.proctime, 'pt2.proctime)
}
@Test(expected = classOf[TableException])
- def testRowtimeAttributeUsedName(): Unit = {
+ def testRowtimeAttributeUsedNameByPosition(): Unit = {
val util = streamTestUtil()
util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'a.rowtime)
}
@Test(expected = classOf[TableException])
- def testProctimeAttributeUsedName(): Unit = {
+ def testProctimeAttributeUsedNameByPosition(): Unit = { // proctime name 'b is already taken
val util = streamTestUtil()
util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'b.proctime)
}
@Test(expected = classOf[TableException])
- def testAsWithToManyFields(): Unit = {
+ def testAsWithTooManyFieldsByPosition(): Unit = { // four names for a three-field tuple
val util = streamTestUtil()
util.addTable[(Int, Long, String)]('a, 'b, 'c, 'd)
}
@Test(expected = classOf[TableException])
- def testAsWithAmbiguousFields(): Unit = {
+ def testAsWithAmbiguousFieldsByPosition(): Unit = { // name 'b is used twice
val util = streamTestUtil()
util.addTable[(Int, Long, String)]('a, 'b, 'b)
}
@Test(expected = classOf[TableException])
- def testOnlyFieldRefInAs(): Unit = {
+ def testOnlyFieldRefInAsByPosition(): Unit = { // 'b as 'c: alias among positional names
val util = streamTestUtil()
util.addTable[(Int, Long, String)]('a, 'b as 'c, 'd)
}
@Test(expected = classOf[TableException])
- def testInvalidTimeCharacteristic(): Unit = {
+ def testInvalidTimeCharacteristicByPosition(): Unit = {
val data = List((1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -109,4 +134,57 @@ class StreamTableEnvironmentValidationTest extends TableTestBase {
.assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
}
+
+ // ----------------------------------------------------------------------------------------------
+ // schema definition by name
+ // ----------------------------------------------------------------------------------------------
+
+ @Test(expected = classOf[TableException])
+ def testInvalidAliasByName(): Unit = {
+ // the aliased source field 'x is not part of the tuple type
+ val testUtil = streamTestUtil()
+ testUtil.addTable[(Long, Int, String, Int, Long)]('x as 'r)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testInvalidFieldByName(): Unit = {
+ val util = streamTestUtil()
+ // '_1 and '_2 exist, so fields are referenced by name, but 'x is not a field of the type
+ util.addTable[(Long, Int, String, Int, Long)]('_1, '_2, 'x)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testInvalidField2ByName(): Unit = {
+ // mixes a positional-style name ('x) with a by-name reference ('_1)
+ val testUtil = streamTestUtil()
+ testUtil.addTable[(Long, Int, String, Int, Long)]('x, '_1)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testInvalidAliasWithProctimeAttributeByName(): Unit = {
+ val util = streamTestUtil()
+ // a proctime attribute must not be declared as an alias of an existing field
+ util.addTable[(Int, Long, String)]('_1, ('_2 as 'new).proctime, '_3)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testInvalidReplacingProctimeAttributeByName(): Unit = {
+ val util = streamTestUtil()
+ // proctime must not replace the existing field '_2
+ util.addTable[(Int, Long, String)]('_1, '_2.proctime, '_3)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testInvalidAliasWithRowtimeAttributeByName(): Unit = {
+ val util = streamTestUtil()
+ // the aliased source field 'newnew does not exist in the tuple type
+ util.addTable[(Int, Long, String)]('_1, ('newnew as 'new).rowtime, '_3)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testInvalidAliasWithRowtimeAttribute2ByName(): Unit = {
+ val util = streamTestUtil()
+ // the aliased field '_3 is a String and cannot become a rowtime attribute
+ util.addTable[(Int, Long, String)]('_1, ('_3 as 'new).rowtime, '_2)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
index 1e7ad61..156dd8a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
@@ -23,15 +23,16 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.TableEnvironmentTest.{CClass, PojoClass}
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{CClass, PojoClass, TableEnvironment, TableException}
-import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
+import org.apache.flink.table.api.{TableEnvironment, TableException}
import org.apache.flink.table.runtime.types.CRowTypeInfo
+import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.types.Row
import org.junit.Assert.assertTrue
import org.junit._
-class TableEnvironmentValidationTest {
+class TableEnvironmentValidationTest extends TableTestBase {
private val env = ExecutionEnvironment.getExecutionEnvironment
private val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -53,75 +54,60 @@ class TableEnvironmentValidationTest {
val genericRowType = new GenericTypeInfo[Row](classOf[Row])
+ @Test(expected = classOf[TableException])
+ def testInvalidAliasInRefByPosMode(): Unit = {
+ // in position-based mode every reference must be positional; an alias is rejected
+ val batchUtil = batchTestUtil()
+ batchUtil.addTable('a, 'b, 'f2 as 'c)(tupleType)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testInvalidAliasOnAtomicType(): Unit = {
+ // an atomic type may be named, but not aliased
+ val batchUtil = batchTestUtil()
+ batchUtil.addTable('g as 'c)(atomicType)
+ }
@Test(expected = classOf[TableException])
def testGetFieldInfoPojoNames1(): Unit = {
- tEnv.getFieldInfo(
- pojoType,
- Array(
- UnresolvedFieldReference("name1"),
- UnresolvedFieldReference("name2"),
- UnresolvedFieldReference("name3")
- ))
+ val util = batchTestUtil()
+ // 'name1 is declared twice; field names must be unique
+ util.addTable('name1, 'name1, 'name3)(pojoType)
}
@Test(expected = classOf[TableException])
def testGetFieldInfoAtomicName2(): Unit = {
- tEnv.getFieldInfo(
- atomicType,
- Array(
- UnresolvedFieldReference("name1"),
- UnresolvedFieldReference("name2")
- ))
+ val util = batchTestUtil()
+ // an atomic type accepts exactly one field name, but two are given
+ util.addTable('name1, 'name2)(atomicType)
}
@Test(expected = classOf[TableException])
def testGetFieldInfoTupleAlias3(): Unit = {
- tEnv.getFieldInfo(
- tupleType,
- Array(
- Alias(UnresolvedFieldReference("xxx"), "name1"),
- Alias(UnresolvedFieldReference("yyy"), "name2"),
- Alias(UnresolvedFieldReference("zzz"), "name3")
- ))
+ val util = batchTestUtil()
+ // the aliased sources 'xxx, 'yyy and 'zzz are not fields of the tuple type
+ util.addTable('xxx as 'name1, 'yyy as 'name2, 'zzz as 'name3)(tupleType)
}
@Test(expected = classOf[TableException])
def testGetFieldInfoCClassAlias3(): Unit = {
- tEnv.getFieldInfo(
- caseClassType,
- Array(
- Alias(UnresolvedFieldReference("xxx"), "name1"),
- Alias(UnresolvedFieldReference("yyy"), "name2"),
- Alias(UnresolvedFieldReference("zzz"), "name3")
- ))
+ val util = batchTestUtil()
+ // the aliased sources 'xxx, 'yyy and 'zzz are not fields of the case class type
+ util.addTable('xxx as 'name1, 'yyy as 'name2, 'zzz as 'name3)(caseClassType)
}
@Test(expected = classOf[TableException])
def testGetFieldInfoPojoAlias3(): Unit = {
- tEnv.getFieldInfo(
- pojoType,
- Array(
- Alias(UnresolvedFieldReference("xxx"), "name1"),
- Alias(UnresolvedFieldReference("yyy"), "name2"),
- Alias(UnresolvedFieldReference("zzz"), "name3")
- ))
- }
-
- @Test(expected = classOf[TableException])
- def testGetFieldInfoAtomicAlias(): Unit = {
- tEnv.getFieldInfo(
- atomicType,
- Array(
- Alias(UnresolvedFieldReference("name1"), "name2")
- ))
+ val util = batchTestUtil()
+ // the aliased sources 'xxx, 'yyy and 'zzz are not fields of the POJO type
+ util.addTable('xxx as 'name1, 'yyy as 'name2, 'zzz as 'name3)(pojoType)
}
@Test(expected = classOf[TableException])
def testGetFieldInfoGenericRowAlias(): Unit = {
- tEnv.getFieldInfo(
- genericRowType,
- Array(UnresolvedFieldReference("first")))
+ val util = batchTestUtil()
+ // Row wrapped in a GenericTypeInfo (not a RowTypeInfo) does not support field names
+ util.addTable('first)(genericRowType)
}
@Test(expected = classOf[TableException])
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
index 725c580..5295e7c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
@@ -63,14 +63,14 @@ class TableEnvironmentITCase(
}
@Test
- def testRegisterWithFields(): Unit = {
+ def testRegisterWithFieldsByPosition(): Unit = {
val tableName = "MyTable"
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
val ds = CollectionDataSets.get3TupleDataSet(env)
- tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c)
+ tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c) // new alias
val t = tEnv.scan(tableName).select('a, 'b)
val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" +
@@ -81,6 +81,24 @@ class TableEnvironmentITCase(
}
@Test
+ def testRegisterWithFieldsByName(): Unit = {
+ // '_1, '_2 and '_3 name existing tuple fields, so they are referenced by name (reordered here)
+ val tableName = "MyTable"
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val dataSet = CollectionDataSets.get3TupleDataSet(env)
+ tEnv.registerDataSet(tableName, dataSet, '_3, '_1, '_2) // new order
+ val projected = tEnv.scan(tableName).select('_1, '_2)
+
+ val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" +
+ "7,4\n" + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
+ "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+ val rows = projected.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(rows.asJava, expected)
+ }
+
+ @Test
def testTableRegister(): Unit = {
val tableName = "MyTable"
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 804fad8..30b67e7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{Table, TableEnvironment}
+import org.apache.flink.table.api.{Table, TableEnvironment, TableSchema}
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
import org.junit.Assert.assertEquals
@@ -82,6 +82,12 @@ abstract class TableTestUtil {
def verifyTable(resultTable: Table, expected: String): Unit
+ def verifySchema(resultTable: Table, fields: Seq[(String, TypeInformation[_])]): Unit = {
+ // build the expected schema from the (name, type) pairs and compare it with the table's schema
+ val (names, types) = (fields.map(_._1).toArray, fields.map(_._2).toArray)
+ assertEquals(new TableSchema(names, types), resultTable.getSchema)
+ }
+
// the print methods are for debugging purposes only
def printTable(resultTable: Table): Unit