Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:58 UTC
[29/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TimeIntervalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TimeIntervalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TimeIntervalTypeInfo.scala
deleted file mode 100644
index b516745..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TimeIntervalTypeInfo.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import java.util.Objects
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.base.{IntComparator, IntSerializer, LongComparator, LongSerializer}
-import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
-import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo.instantiateComparator
-import org.apache.flink.util.Preconditions._
-
-/**
- * TypeInformation for SQL INTERVAL types.
- */
-@SerialVersionUID(-1816179424364825258L)
-class TimeIntervalTypeInfo[T](
- val clazz: Class[T],
- val serializer: TypeSerializer[T],
- val comparatorClass: Class[_ <: TypeComparator[T]])
- extends TypeInformation[T]
- with AtomicType[T] {
-
- checkNotNull(clazz)
- checkNotNull(serializer)
- checkNotNull(comparatorClass)
-
- override def isBasicType: Boolean = false
-
- override def isTupleType: Boolean = false
-
- override def getArity: Int = 1
-
- override def getTotalFields: Int = 1
-
- override def getTypeClass: Class[T] = clazz
-
- override def isKeyType: Boolean = true
-
- override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = serializer
-
- override def createComparator(
- sortOrderAscending: Boolean,
- executionConfig: ExecutionConfig)
- : TypeComparator[T] = instantiateComparator(comparatorClass, sortOrderAscending)
-
- // ----------------------------------------------------------------------------------------------
-
- override def hashCode: Int = Objects.hash(clazz, serializer, comparatorClass)
-
- def canEqual(obj: Any): Boolean = obj.isInstanceOf[TimeIntervalTypeInfo[_]]
-
- override def equals(obj: Any): Boolean = {
- obj match {
- case other: TimeIntervalTypeInfo[_] =>
- other.canEqual(this) &&
- (this.clazz eq other.clazz) &&
- serializer == other.serializer &&
- (this.comparatorClass eq other.comparatorClass)
- case _ =>
- false
- }
- }
-
- override def toString: String = s"TimeIntervalTypeInfo(${clazz.getSimpleName})"
-}
-
-object TimeIntervalTypeInfo {
-
- val INTERVAL_MONTHS = new TimeIntervalTypeInfo(
- classOf[java.lang.Integer],
- IntSerializer.INSTANCE,
- classOf[IntComparator])
-
- val INTERVAL_MILLIS = new TimeIntervalTypeInfo(
- classOf[java.lang.Long],
- LongSerializer.INSTANCE,
- classOf[LongComparator])
-
- // ----------------------------------------------------------------------------------------------
-
- private def instantiateComparator[X](
- comparatorClass: Class[_ <: TypeComparator[X]],
- ascendingOrder: java.lang.Boolean)
- : TypeComparator[X] = {
- try {
- val constructor = comparatorClass.getConstructor(java.lang.Boolean.TYPE)
- constructor.newInstance(ascendingOrder)
- } catch {
- case e: Exception =>
- throw new RuntimeException(
- s"Could not initialize comparator ${comparatorClass.getName}", e)
- }
- }
-
-}
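
A minimal usage sketch for the type above, using only classes already imported in that file (the comparator is created reflectively through the single-boolean constructor, exactly as instantiateComparator does):

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo

    val months = TimeIntervalTypeInfo.INTERVAL_MONTHS
    // The serializer is the instance passed at construction (IntSerializer.INSTANCE).
    val serializer = months.createSerializer(new ExecutionConfig)
    // The comparator is instantiated reflectively via IntComparator(boolean ascending).
    val comparator = months.createComparator(true, new ExecutionConfig)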
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
deleted file mode 100644
index e30e273..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO}
-import org.apache.flink.api.common.typeinfo._
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
-import org.apache.flink.api.table.validate._
-
-object TypeCheckUtils {
-
- /**
- * Checks if type information is an advanced type that can be converted to a
- * SQL type but NOT vice versa.
- */
- def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match {
- case _: BasicTypeInfo[_] => false
- case _: SqlTimeTypeInfo[_] => false
- case _: TimeIntervalTypeInfo[_] => false
- case _ => true
- }
-
- /**
- * Checks if type information is a simple type that can be converted to a
- * SQL type and vice versa.
- */
- def isSimple(dataType: TypeInformation[_]): Boolean = !isAdvanced(dataType)
-
- def isNumeric(dataType: TypeInformation[_]): Boolean = dataType match {
- case _: NumericTypeInfo[_] => true
- case BIG_DEC_TYPE_INFO => true
- case _ => false
- }
-
- def isTemporal(dataType: TypeInformation[_]): Boolean =
- isTimePoint(dataType) || isTimeInterval(dataType)
-
- def isTimePoint(dataType: TypeInformation[_]): Boolean =
- dataType.isInstanceOf[SqlTimeTypeInfo[_]]
-
- def isTimeInterval(dataType: TypeInformation[_]): Boolean =
- dataType.isInstanceOf[TimeIntervalTypeInfo[_]]
-
- def isString(dataType: TypeInformation[_]): Boolean = dataType == STRING_TYPE_INFO
-
- def isBoolean(dataType: TypeInformation[_]): Boolean = dataType == BOOLEAN_TYPE_INFO
-
- def isDecimal(dataType: TypeInformation[_]): Boolean = dataType == BIG_DEC_TYPE_INFO
-
- def isInteger(dataType: TypeInformation[_]): Boolean = dataType == INT_TYPE_INFO
-
- def isArray(dataType: TypeInformation[_]): Boolean = dataType match {
- case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => true
- case _ => false
- }
-
- def isComparable(dataType: TypeInformation[_]): Boolean =
- classOf[Comparable[_]].isAssignableFrom(dataType.getTypeClass) && !isArray(dataType)
-
- def assertNumericExpr(
- dataType: TypeInformation[_],
- caller: String)
- : ValidationResult = dataType match {
- case _: NumericTypeInfo[_] =>
- ValidationSuccess
- case BIG_DEC_TYPE_INFO =>
- ValidationSuccess
- case _ =>
- ValidationFailure(s"$caller requires numeric types, get $dataType here")
- }
-
- def assertOrderableExpr(dataType: TypeInformation[_], caller: String): ValidationResult = {
- if (dataType.isSortKeyType) {
- ValidationSuccess
- } else {
- ValidationFailure(s"$caller requires orderable types, get $dataType here")
- }
- }
-}
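
For orientation, a small sketch of how these predicates behave on common types; the results follow directly from the definitions above (assuming Flink's BasicTypeInfo constants):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    import org.apache.flink.api.table.typeutils.TypeCheckUtils

    TypeCheckUtils.isNumeric(INT_TYPE_INFO)      // true: NumericTypeInfo
    TypeCheckUtils.isNumeric(BIG_DEC_TYPE_INFO)  // true: explicit BigDecimal case
    TypeCheckUtils.isNumeric(STRING_TYPE_INFO)   // false
    TypeCheckUtils.isSimple(STRING_TYPE_INFO)    // true: basic type, not "advanced"
    // Returns roughly: ValidationFailure("sum requires numeric types, got String here")
    TypeCheckUtils.assertNumericExpr(STRING_TYPE_INFO, "sum")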
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
deleted file mode 100644
index 23154a5..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, NumericTypeInfo, TypeInformation}
-
-/**
- * Utilities for type conversions.
- */
-object TypeCoercion {
-
- val numericWideningPrecedence: IndexedSeq[TypeInformation[_]] =
- IndexedSeq(
- BYTE_TYPE_INFO,
- SHORT_TYPE_INFO,
- INT_TYPE_INFO,
- LONG_TYPE_INFO,
- FLOAT_TYPE_INFO,
- DOUBLE_TYPE_INFO)
-
- def widerTypeOf(tp1: TypeInformation[_], tp2: TypeInformation[_]): Option[TypeInformation[_]] = {
- (tp1, tp2) match {
- case (ti1, ti2) if ti1 == ti2 => Some(ti1)
-
- case (_, STRING_TYPE_INFO) => Some(STRING_TYPE_INFO)
- case (STRING_TYPE_INFO, _) => Some(STRING_TYPE_INFO)
-
- case (_, BIG_DEC_TYPE_INFO) => Some(BIG_DEC_TYPE_INFO)
- case (BIG_DEC_TYPE_INFO, _) => Some(BIG_DEC_TYPE_INFO)
-
- case (stti: SqlTimeTypeInfo[_], _: TimeIntervalTypeInfo[_]) => Some(stti)
- case (_: TimeIntervalTypeInfo[_], stti: SqlTimeTypeInfo[_]) => Some(stti)
-
- case tuple if tuple.productIterator.forall(numericWideningPrecedence.contains) =>
- val higherIndex = numericWideningPrecedence.lastIndexWhere(t => t == tp1 || t == tp2)
- Some(numericWideningPrecedence(higherIndex))
-
- case _ => None
- }
- }
-
- /**
- * Tests whether a cast can be performed safely, without loss of information.
- */
- def canSafelyCast(from: TypeInformation[_], to: TypeInformation[_]): Boolean = (from, to) match {
- case (_, STRING_TYPE_INFO) => true
-
- case (_: NumericTypeInfo[_], BIG_DEC_TYPE_INFO) => true
-
- case tuple if tuple.productIterator.forall(numericWideningPrecedence.contains) =>
- if (numericWideningPrecedence.indexOf(from) < numericWideningPrecedence.indexOf(to)) {
- true
- } else {
- false
- }
-
- case _ => false
- }
-
- /**
- * Checks whether a cast between the given types is supported in flink-table.
- * Note: the cast may lose information.
- */
- def canCast(from: TypeInformation[_], to: TypeInformation[_]): Boolean = (from, to) match {
- case (fromTp, toTp) if fromTp == toTp => true
-
- case (_, STRING_TYPE_INFO) => true
-
- case (_, CHAR_TYPE_INFO) => false // Character type not supported.
-
- case (STRING_TYPE_INFO, _: NumericTypeInfo[_]) => true
- case (STRING_TYPE_INFO, BOOLEAN_TYPE_INFO) => true
- case (STRING_TYPE_INFO, BIG_DEC_TYPE_INFO) => true
- case (STRING_TYPE_INFO, SqlTimeTypeInfo.DATE) => true
- case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIME) => true
- case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) => true
-
- case (BOOLEAN_TYPE_INFO, _: NumericTypeInfo[_]) => true
- case (BOOLEAN_TYPE_INFO, BIG_DEC_TYPE_INFO) => true
- case (_: NumericTypeInfo[_], BOOLEAN_TYPE_INFO) => true
- case (BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO) => true
-
- case (_: NumericTypeInfo[_], _: NumericTypeInfo[_]) => true
- case (BIG_DEC_TYPE_INFO, _: NumericTypeInfo[_]) => true
- case (_: NumericTypeInfo[_], BIG_DEC_TYPE_INFO) => true
- case (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) => true
- case (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) => true
- case (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) => true
- case (INT_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MONTHS) => true
- case (LONG_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MILLIS) => true
-
- case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIME) => false
- case (SqlTimeTypeInfo.TIME, SqlTimeTypeInfo.DATE) => false
- case (_: SqlTimeTypeInfo[_], _: SqlTimeTypeInfo[_]) => true
- case (SqlTimeTypeInfo.DATE, INT_TYPE_INFO) => true
- case (SqlTimeTypeInfo.TIME, INT_TYPE_INFO) => true
- case (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) => true
-
- case (TimeIntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO) => true
- case (TimeIntervalTypeInfo.INTERVAL_MILLIS, LONG_TYPE_INFO) => true
-
- case _ => false
- }
-}
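
A short sketch of the coercion rules above (assuming BasicTypeInfo constants; the numeric cases follow the widening precedence list BYTE < SHORT < INT < LONG < FLOAT < DOUBLE):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    import org.apache.flink.api.table.typeutils.TypeCoercion

    TypeCoercion.widerTypeOf(INT_TYPE_INFO, DOUBLE_TYPE_INFO) // Some(DOUBLE_TYPE_INFO)
    TypeCoercion.canSafelyCast(INT_TYPE_INFO, LONG_TYPE_INFO) // true: widening
    TypeCoercion.canSafelyCast(LONG_TYPE_INFO, INT_TYPE_INFO) // false: would narrow
    TypeCoercion.canCast(LONG_TYPE_INFO, INT_TYPE_INFO)       // true: allowed, may lose information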
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
deleted file mode 100644
index a81577c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rel.core.JoinRelType._
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.table.{FlinkTypeFactory, TableException}
-import org.apache.flink.types.Row
-
-import scala.collection.JavaConversions._
-
-object TypeConverter {
-
- val DEFAULT_ROW_TYPE = new RowTypeInfo().asInstanceOf[TypeInformation[Any]]
-
- /**
- * Determines the return type of Flink operators based on the logical fields, the expected
- * physical type and configuration parameters.
- *
- * For example:
- * - No physical type expected, only 3 non-null fields and efficient type usage enabled
- * -> return Tuple3
- * - No physical type expected, efficient type usage enabled, but 3 nullable fields
- * -> return Row because Tuple does not support null values
- * - Physical type expected
- * -> check if physical type is compatible and return it
- *
- * @param logicalRowType logical row information
- * @param expectedPhysicalType expected physical type
- * @param nullable fields can be nullable
- * @param useEfficientTypes use the most efficient types (e.g. Tuples and value types)
- * @return suitable return type
- */
- def determineReturnType(
- logicalRowType: RelDataType,
- expectedPhysicalType: Option[TypeInformation[Any]],
- nullable: Boolean,
- useEfficientTypes: Boolean)
- : TypeInformation[Any] = {
- // convert to type information
- val logicalFieldTypes = logicalRowType.getFieldList map { relDataType =>
- FlinkTypeFactory.toTypeInfo(relDataType.getType)
- }
- // field names
- val logicalFieldNames = logicalRowType.getFieldNames.toList
-
- val returnType = expectedPhysicalType match {
- // a certain physical type is expected (but not Row)
- // check if expected physical type is compatible with logical field type
- case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
- if (typeInfo.getArity != logicalFieldTypes.length) {
- throw new TableException("Arity of result does not match expected type.")
- }
- typeInfo match {
-
- // POJO type expected
- case pt: PojoTypeInfo[_] =>
- logicalFieldNames.zip(logicalFieldTypes) foreach {
- case (fName, fType) =>
- val pojoIdx = pt.getFieldIndex(fName)
- if (pojoIdx < 0) {
- throw new TableException(s"POJO does not define field name: $fName")
- }
- val expectedTypeInfo = pt.getTypeAt(pojoIdx)
- if (fType != expectedTypeInfo) {
- throw new TableException(s"Result field does not match expected type. " +
- s"Expected: $expectedTypeInfo; Actual: $fType")
- }
- }
-
- // Tuple/Case class type expected
- case ct: CompositeType[_] =>
- logicalFieldTypes.zipWithIndex foreach {
- case (fieldTypeInfo, i) =>
- val expectedTypeInfo = ct.getTypeAt(i)
- if (fieldTypeInfo != expectedTypeInfo) {
- throw new TableException(s"Result field does not match expected type. " +
- s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo")
- }
- }
-
- // Atomic type expected
- case at: AtomicType[_] =>
- val fieldTypeInfo = logicalFieldTypes.head
- if (fieldTypeInfo != at) {
- throw new TableException(s"Result field does not match expected type. " +
- s"Expected: $at; Actual: $fieldTypeInfo")
- }
-
- case _ =>
- throw new TableException("Unsupported result type.")
- }
- typeInfo
-
- // Row is expected, create the arity for it
- case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] =>
- new RowTypeInfo(logicalFieldTypes: _*)
-
- // no physical type
- // determine type based on logical fields and configuration parameters
- case None =>
- // no need for efficient types -> use Row
- // we cannot use efficient types if row arity > tuple arity or nullable
- if (!useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY || nullable) {
- new RowTypeInfo(logicalFieldTypes: _*)
- }
- // use efficient type tuple or atomic type
- else {
- if (logicalFieldTypes.length == 1) {
- logicalFieldTypes.head
- }
- else {
- new TupleTypeInfo[Tuple](logicalFieldTypes.toArray:_*)
- }
- }
- }
- returnType.asInstanceOf[TypeInformation[Any]]
- }
-
- def sqlJoinTypeToFlinkJoinType(sqlJoinType: JoinRelType): JoinType = sqlJoinType match {
- case INNER => JoinType.INNER
- case LEFT => JoinType.LEFT_OUTER
- case RIGHT => JoinType.RIGHT_OUTER
- case FULL => JoinType.FULL_OUTER
- }
-
- def flinkJoinTypeToRelType(joinType: JoinType) = joinType match {
- case JoinType.INNER => JoinRelType.INNER
- case JoinType.LEFT_OUTER => JoinRelType.LEFT
- case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
- case JoinType.FULL_OUTER => JoinRelType.FULL
- }
-}
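
Two properties of the converter above, as a sketch (imports as in the file): the join-type mappings are mutual inverses, and with no expected physical type determineReturnType falls back to Row whenever fields are nullable, because Tuples cannot hold nulls:

    import org.apache.calcite.rel.core.JoinRelType
    import org.apache.flink.api.java.operators.join.JoinType
    import org.apache.flink.api.table.typeutils.TypeConverter

    TypeConverter.sqlJoinTypeToFlinkJoinType(JoinRelType.LEFT) // JoinType.LEFT_OUTER
    TypeConverter.flinkJoinTypeToRelType(JoinType.LEFT_OUTER)  // JoinRelType.LEFT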
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
deleted file mode 100644
index 8e409cc..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.validate
-
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable, ReflectiveSqlOperatorTable}
-import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
-import org.apache.flink.api.table.ValidationException
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.functions.{ScalarFunction, TableFunction}
-import org.apache.flink.api.table.functions.utils.{TableSqlFunction, UserDefinedFunctionUtils}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.util.{Failure, Success, Try}
-
-/**
- * A catalog for looking up (user-defined) functions, used during validation phases
- * of both Table API and SQL API.
- */
-class FunctionCatalog {
-
- private val functionBuilders = mutable.HashMap.empty[String, Class[_]]
- private val sqlFunctions = mutable.ListBuffer[SqlFunction]()
-
- def registerFunction(name: String, builder: Class[_]): Unit =
- functionBuilders.put(name.toLowerCase, builder)
-
- def registerSqlFunction(sqlFunction: SqlFunction): Unit = {
- sqlFunctions --= sqlFunctions.filter(_.getName == sqlFunction.getName)
- sqlFunctions += sqlFunction
- }
-
- /**
- * Registers multiple SQL functions at the same time. The functions must all have the same name.
- */
- def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
- if (functions.nonEmpty) {
- val name = functions.head.getName
- // check that all functions have the same name
- if (functions.forall(_.getName == name)) {
- sqlFunctions --= sqlFunctions.filter(_.getName == name)
- sqlFunctions ++= functions
- } else {
- throw ValidationException("The SQL functions to be registered have different names.")
- }
- }
- }
-
- def getSqlOperatorTable: SqlOperatorTable =
- ChainedSqlOperatorTable.of(
- new BasicOperatorTable(),
- new ListSqlOperatorTable(sqlFunctions)
- )
-
- /**
- * Looks up a function by name and creates the corresponding expression if a match is found.
- */
- def lookupFunction(name: String, children: Seq[Expression]): Expression = {
- val funcClass = functionBuilders
- .getOrElse(name.toLowerCase, throw ValidationException(s"Undefined function: $name"))
-
- // Instantiate a function using the provided `children`
- funcClass match {
-
- // user-defined scalar function call
- case sf if classOf[ScalarFunction].isAssignableFrom(sf) =>
- Try(UserDefinedFunctionUtils.instantiate(sf.asInstanceOf[Class[ScalarFunction]])) match {
- case Success(scalarFunction) => ScalarFunctionCall(scalarFunction, children)
- case Failure(e) => throw ValidationException(e.getMessage)
- }
-
- // user-defined table function call
- case tf if classOf[TableFunction[_]].isAssignableFrom(tf) =>
- val tableSqlFunction = sqlFunctions
- .find(f => f.getName.equalsIgnoreCase(name) && f.isInstanceOf[TableSqlFunction])
- .getOrElse(throw ValidationException(s"Undefined table function: $name"))
- .asInstanceOf[TableSqlFunction]
- val typeInfo = tableSqlFunction.getRowTypeInfo
- val function = tableSqlFunction.getTableFunction
- TableFunctionCall(name, function, children, typeInfo)
-
- // general expression call
- case expression if classOf[Expression].isAssignableFrom(expression) =>
- // try to find a constructor that accepts `Seq[Expression]`
- Try(funcClass.getDeclaredConstructor(classOf[Seq[_]])) match {
- case Success(seqCtor) =>
- Try(seqCtor.newInstance(children).asInstanceOf[Expression]) match {
- case Success(expr) => expr
- case Failure(e) => throw new ValidationException(e.getMessage)
- }
- case Failure(e) =>
- val childrenClass = Seq.fill(children.length)(classOf[Expression])
- // try to find a constructor matching the exact number of children
- Try(funcClass.getDeclaredConstructor(childrenClass: _*)) match {
- case Success(ctor) =>
- Try(ctor.newInstance(children: _*).asInstanceOf[Expression]) match {
- case Success(expr) => expr
- case Failure(exception) => throw ValidationException(exception.getMessage)
- }
- case Failure(exception) =>
- throw ValidationException(s"Invalid number of arguments for function $funcClass")
- }
- }
-
- case _ =>
- throw ValidationException("Unsupported function.")
- }
- }
-
- /**
- * Drops a function and returns whether the function existed.
- */
- def dropFunction(name: String): Boolean =
- functionBuilders.remove(name.toLowerCase).isDefined
-
- /**
- * Drop all registered functions.
- */
- def clear(): Unit = functionBuilders.clear()
-}
-
-object FunctionCatalog {
-
- val builtInFunctions: Map[String, Class[_]] = Map(
- // logic
- "isNull" -> classOf[IsNull],
- "isNotNull" -> classOf[IsNotNull],
- "isTrue" -> classOf[IsTrue],
- "isFalse" -> classOf[IsFalse],
- "isNotTrue" -> classOf[IsNotTrue],
- "isNotFalse" -> classOf[IsNotFalse],
-
- // aggregate functions
- "avg" -> classOf[Avg],
- "count" -> classOf[Count],
- "max" -> classOf[Max],
- "min" -> classOf[Min],
- "sum" -> classOf[Sum],
-
- // string functions
- "charLength" -> classOf[CharLength],
- "initCap" -> classOf[InitCap],
- "like" -> classOf[Like],
- "lowerCase" -> classOf[Lower],
- "similar" -> classOf[Similar],
- "substring" -> classOf[Substring],
- "trim" -> classOf[Trim],
- "upperCase" -> classOf[Upper],
- "position" -> classOf[Position],
- "overlay" -> classOf[Overlay],
-
- // math functions
- "abs" -> classOf[Abs],
- "ceil" -> classOf[Ceil],
- "exp" -> classOf[Exp],
- "floor" -> classOf[Floor],
- "log10" -> classOf[Log10],
- "ln" -> classOf[Ln],
- "power" -> classOf[Power],
- "mod" -> classOf[Mod],
- "sqrt" -> classOf[Sqrt],
-
- // temporal functions
- "extract" -> classOf[Extract],
- "currentDate" -> classOf[CurrentDate],
- "currentTime" -> classOf[CurrentTime],
- "currentTimestamp" -> classOf[CurrentTimestamp],
- "localTime" -> classOf[LocalTime],
- "localTimestamp" -> classOf[LocalTimestamp],
- "quarter" -> classOf[Quarter],
- "temporalOverlaps" -> classOf[TemporalOverlaps],
-
- // array
- "cardinality" -> classOf[ArrayCardinality],
- "at" -> classOf[ArrayElementAt],
- "element" -> classOf[ArrayElement]
-
- // TODO implement function overloading here
- // "floor" -> classOf[TemporalFloor]
- // "ceil" -> classOf[TemporalCeil]
- )
-
- /**
- * Create a new function catalog with built-in functions.
- */
- def withBuiltIns: FunctionCatalog = {
- val catalog = new FunctionCatalog()
- builtInFunctions.foreach { case (n, c) => catalog.registerFunction(n, c) }
- catalog
- }
-}
-
-class BasicOperatorTable extends ReflectiveSqlOperatorTable {
-
- /**
- * List of supported SQL operators / functions.
- *
- * This list should be kept in sync with [[SqlStdOperatorTable]].
- */
- private val builtInSqlOperators: Seq[SqlOperator] = Seq(
- // SET OPERATORS
- SqlStdOperatorTable.UNION,
- SqlStdOperatorTable.UNION_ALL,
- SqlStdOperatorTable.EXCEPT,
- SqlStdOperatorTable.EXCEPT_ALL,
- SqlStdOperatorTable.INTERSECT,
- SqlStdOperatorTable.INTERSECT_ALL,
- // BINARY OPERATORS
- SqlStdOperatorTable.AND,
- SqlStdOperatorTable.AS,
- SqlStdOperatorTable.CONCAT,
- SqlStdOperatorTable.DIVIDE,
- SqlStdOperatorTable.DIVIDE_INTEGER,
- SqlStdOperatorTable.DOT,
- SqlStdOperatorTable.EQUALS,
- SqlStdOperatorTable.GREATER_THAN,
- SqlStdOperatorTable.IS_DISTINCT_FROM,
- SqlStdOperatorTable.IS_NOT_DISTINCT_FROM,
- SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
- SqlStdOperatorTable.LESS_THAN,
- SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
- SqlStdOperatorTable.MINUS,
- SqlStdOperatorTable.MULTIPLY,
- SqlStdOperatorTable.NOT_EQUALS,
- SqlStdOperatorTable.OR,
- SqlStdOperatorTable.PLUS,
- SqlStdOperatorTable.DATETIME_PLUS,
- // POSTFIX OPERATORS
- SqlStdOperatorTable.DESC,
- SqlStdOperatorTable.NULLS_FIRST,
- SqlStdOperatorTable.IS_NOT_NULL,
- SqlStdOperatorTable.IS_NULL,
- SqlStdOperatorTable.IS_NOT_TRUE,
- SqlStdOperatorTable.IS_TRUE,
- SqlStdOperatorTable.IS_NOT_FALSE,
- SqlStdOperatorTable.IS_FALSE,
- SqlStdOperatorTable.IS_NOT_UNKNOWN,
- SqlStdOperatorTable.IS_UNKNOWN,
- // PREFIX OPERATORS
- SqlStdOperatorTable.NOT,
- SqlStdOperatorTable.UNARY_MINUS,
- SqlStdOperatorTable.UNARY_PLUS,
- // AGGREGATE OPERATORS
- SqlStdOperatorTable.SUM,
- SqlStdOperatorTable.COUNT,
- SqlStdOperatorTable.MIN,
- SqlStdOperatorTable.MAX,
- SqlStdOperatorTable.AVG,
- // ARRAY OPERATORS
- SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR,
- SqlStdOperatorTable.ITEM,
- SqlStdOperatorTable.CARDINALITY,
- SqlStdOperatorTable.ELEMENT,
- // SPECIAL OPERATORS
- SqlStdOperatorTable.ROW,
- SqlStdOperatorTable.OVERLAPS,
- SqlStdOperatorTable.LITERAL_CHAIN,
- SqlStdOperatorTable.BETWEEN,
- SqlStdOperatorTable.SYMMETRIC_BETWEEN,
- SqlStdOperatorTable.NOT_BETWEEN,
- SqlStdOperatorTable.SYMMETRIC_NOT_BETWEEN,
- SqlStdOperatorTable.NOT_LIKE,
- SqlStdOperatorTable.LIKE,
- SqlStdOperatorTable.NOT_SIMILAR_TO,
- SqlStdOperatorTable.SIMILAR_TO,
- SqlStdOperatorTable.CASE,
- SqlStdOperatorTable.REINTERPRET,
- SqlStdOperatorTable.EXTRACT_DATE,
- // FUNCTIONS
- SqlStdOperatorTable.SUBSTRING,
- SqlStdOperatorTable.OVERLAY,
- SqlStdOperatorTable.TRIM,
- SqlStdOperatorTable.POSITION,
- SqlStdOperatorTable.CHAR_LENGTH,
- SqlStdOperatorTable.CHARACTER_LENGTH,
- SqlStdOperatorTable.UPPER,
- SqlStdOperatorTable.LOWER,
- SqlStdOperatorTable.INITCAP,
- SqlStdOperatorTable.POWER,
- SqlStdOperatorTable.SQRT,
- SqlStdOperatorTable.MOD,
- SqlStdOperatorTable.LN,
- SqlStdOperatorTable.LOG10,
- SqlStdOperatorTable.ABS,
- SqlStdOperatorTable.EXP,
- SqlStdOperatorTable.NULLIF,
- SqlStdOperatorTable.COALESCE,
- SqlStdOperatorTable.FLOOR,
- SqlStdOperatorTable.CEIL,
- SqlStdOperatorTable.LOCALTIME,
- SqlStdOperatorTable.LOCALTIMESTAMP,
- SqlStdOperatorTable.CURRENT_TIME,
- SqlStdOperatorTable.CURRENT_TIMESTAMP,
- SqlStdOperatorTable.CURRENT_DATE,
- SqlStdOperatorTable.CAST,
- SqlStdOperatorTable.EXTRACT,
- SqlStdOperatorTable.QUARTER,
- SqlStdOperatorTable.SCALAR_QUERY,
- SqlStdOperatorTable.EXISTS
- )
-
- builtInSqlOperators.foreach(register)
-}
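
A minimal sketch of the catalog above, using only members defined in this file (registration lower-cases the name, so lookups and drops are case-insensitive):

    import org.apache.flink.api.table.validate.FunctionCatalog

    val catalog = FunctionCatalog.withBuiltIns
    catalog.dropFunction("ABS")  // true: the built-in "abs" was registered
    catalog.dropFunction("abs")  // false: already removed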
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ValidationResult.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ValidationResult.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ValidationResult.scala
deleted file mode 100644
index 5cc7d03..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ValidationResult.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.validate
-
-/**
- * Represents the result of a validation.
- */
-sealed trait ValidationResult {
- def isFailure: Boolean = !isSuccess
- def isSuccess: Boolean
-
- /**
- * Allows constructing a cascade of validation results.
- * The first failure result will be returned.
- */
- def orElse(other: ValidationResult): ValidationResult = {
- if (isSuccess) {
- other
- } else {
- this
- }
- }
-}
-
-/**
- * Represents the successful result of a validation.
- */
-object ValidationSuccess extends ValidationResult {
- val isSuccess: Boolean = true
-}
-
-/**
- * Represents the failing result of a validation,
- * with an error message describing the reason for the failure.
- */
-case class ValidationFailure(message: String) extends ValidationResult {
- val isSuccess: Boolean = false
-}
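
The orElse cascade above in action: a success defers to the next check, while the first failure short-circuits the chain.

    import org.apache.flink.api.table.validate.{ValidationFailure, ValidationSuccess}

    ValidationSuccess.orElse(ValidationFailure("b"))      // ValidationFailure("b")
    ValidationFailure("a").orElse(ValidationFailure("b")) // ValidationFailure("a")
    ValidationSuccess.orElse(ValidationSuccess)           // ValidationSuccess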
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
deleted file mode 100644
index 5637d7a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
+++ /dev/null
@@ -1,390 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
-import org.apache.flink.api.table.plan.logical._
-
-/**
- * A group-window specification.
- *
- * Group-windows group rows based on time or row-count intervals and are therefore essentially a
- * special type of groupBy. Just like groupBy, group-windows make it possible to compute
- * aggregates on groups of elements.
- *
- * Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping
- * is required to apply aggregations on streaming tables.
- *
- * For finite batch tables, group-windows provide shortcuts for time-based groupBy.
- *
- */
-trait GroupWindow {
-
- /**
- * Converts an API class to a logical window for planning.
- */
- private[flink] def toLogicalWindow: LogicalWindow
-}
-
-/**
- * A group-window operating on event-time.
- *
- * @param timeField defines the time mode for streaming tables. For batch tables it defines the
- * time attribute on which the grouping is applied.
- */
-abstract class EventTimeWindow(val timeField: Expression) extends GroupWindow {
-
- protected var name: Option[Expression] = None
-
- /**
- * Assigns an alias for this window that the following `select()` clause can refer to in order
- * to access window properties such as window start or end time.
- *
- * @param alias alias for this window
- * @return this window
- */
- def as(alias: Expression): EventTimeWindow = {
- this.name = Some(alias)
- this
- }
-
- /**
- * Assigns an alias for this window that the following `select()` clause can refer to in order
- * to access window properties such as window start or end time.
- *
- * @param alias alias for this window
- * @return this window
- */
- def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias))
-}
-
-// ------------------------------------------------------------------------------------------------
-// Tumbling group-windows
-// ------------------------------------------------------------------------------------------------
-
-/**
- * Tumbling group-window.
- *
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
- * grouped by processing-time.
- *
- * @param size the size of the window either as time or row-count interval.
- */
-class TumblingWindow(size: Expression) extends GroupWindow {
-
- /**
- * Tumbling group-window.
- *
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
- * are grouped by processing-time.
- *
- * @param size the size of the window either as time or row-count interval.
- */
- def this(size: String) = this(ExpressionParser.parseExpression(size))
-
- private var alias: Option[Expression] = None
-
- /**
- * Specifies the time attribute on which rows are grouped.
- *
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
- * are grouped by processing-time.
- *
- * For batch tables, refer to a timestamp or long attribute.
- *
- * @param timeField time mode for streaming tables and time attribute for batch tables
- * @return a tumbling group-window on event-time
- */
- def on(timeField: Expression): TumblingEventTimeWindow =
- new TumblingEventTimeWindow(alias, timeField, size)
-
- /**
- * Specifies the time attribute on which rows are grouped.
- *
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
- * are grouped by processing-time.
- *
- * For batch tables, refer to a timestamp or long attribute.
- *
- * @param timeField time mode for streaming tables and time attribute for batch tables
- * @return a tumbling group-window on event-time
- */
- def on(timeField: String): TumblingEventTimeWindow =
- on(ExpressionParser.parseExpression(timeField))
-
- /**
- * Assigns an alias for this window that the following `select()` clause can refer to in order
- * to access window properties such as window start or end time.
- *
- * @param alias alias for this window
- * @return this window
- */
- def as(alias: Expression): TumblingWindow = {
- this.alias = Some(alias)
- this
- }
-
- /**
- * Assigns an alias for this window that the following `select()` clause can refer to in order
- * to access window properties such as window start or end time.
- *
- * @param alias alias for this window
- * @return this window
- */
- def as(alias: String): TumblingWindow = as(ExpressionParser.parseExpression(alias))
-
- override private[flink] def toLogicalWindow: LogicalWindow =
- ProcessingTimeTumblingGroupWindow(alias, size)
-}
-
-/**
- * Tumbling group-window on event-time.
- */
-class TumblingEventTimeWindow(
- alias: Option[Expression],
- time: Expression,
- size: Expression)
- extends EventTimeWindow(time) {
-
- override private[flink] def toLogicalWindow: LogicalWindow =
- EventTimeTumblingGroupWindow(name.orElse(alias), time, size)
-}
-
-// ------------------------------------------------------------------------------------------------
-// Sliding group windows
-// ------------------------------------------------------------------------------------------------
-
-/**
- * Partially specified sliding window.
- *
- * @param size the size of the window either as time or row-count interval.
- */
-class SlideWithSize(size: Expression) {
-
- /**
- * Partially specified sliding window.
- *
- * @param size the size of the window either as time or row-count interval.
- */
- def this(size: String) = this(ExpressionParser.parseExpression(size))
-
- /**
- * Specifies the window's slide as time or row-count interval.
- *
- * The slide determines the interval in which windows are started. Hence, sliding windows can
- * overlap if the slide is smaller than the size of the window.
- *
- * For example, you could have windows of size 15 minutes that slide by 3 minutes. With this,
- * 15 minutes' worth of elements are grouped every 3 minutes, and each row contributes to 5
- * windows.
- *
- * @param slide the slide of the window either as time or row-count interval.
- * @return a sliding group-window
- */
- def every(slide: Expression): SlidingWindow = new SlidingWindow(size, slide)
-
- /**
- * Specifies the window's slide as time or row-count interval.
- *
- * The slide determines the interval in which windows are started. Hence, sliding windows can
- * overlap if the slide is smaller than the size of the window.
- *
- * For example, you could have windows of size 15 minutes that slide by 3 minutes. With this,
- * 15 minutes' worth of elements are grouped every 3 minutes, and each row contributes to 5
- * windows.
- *
- * @param slide the slide of the window either as time or row-count interval.
- * @return a sliding group-window
- */
- def every(slide: String): SlidingWindow = every(ExpressionParser.parseExpression(slide))
-}
-
-/**
- * Sliding group-window.
- *
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
- * grouped by processing-time.
- *
- * @param size the size of the window either as time or row-count interval.
- */
-class SlidingWindow(
- size: Expression,
- slide: Expression)
- extends GroupWindow {
-
- private var alias: Option[Expression] = None
-
- /**
- * Specifies the time attribute on which rows are grouped.
- *
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
- * are grouped by processing-time.
- *
- * For batch tables, refer to a timestamp or long attribute.
- *
- * @param timeField time mode for streaming tables and time attribute for batch tables
- * @return a sliding group-window on event-time
- */
- def on(timeField: Expression): SlidingEventTimeWindow =
- new SlidingEventTimeWindow(alias, timeField, size, slide)
-
- /**
- * Specifies the time attribute on which rows are grouped.
- *
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
- * are grouped by processing-time.
- *
- * For batch tables, refer to a timestamp or long attribute.
- *
- * @param timeField time mode for streaming tables and time attribute for batch tables
- * @return a sliding group-window on event-time
- */
- def on(timeField: String): SlidingEventTimeWindow =
- on(ExpressionParser.parseExpression(timeField))
-
- /**
- * Assigns an alias for this window that the following `select()` clause can refer to in order
- * to access window properties such as window start or end time.
- *
- * @param alias alias for this window
- * @return this window
- */
- def as(alias: Expression): SlidingWindow = {
- this.alias = Some(alias)
- this
- }
-
- /**
- * Assigns an alias for this window that the following `select()` clause can refer to in order
- * to access window properties such as window start or end time.
- *
- * @param alias alias for this window
- * @return this window
- */
- def as(alias: String): SlidingWindow = as(ExpressionParser.parseExpression(alias))
-
- override private[flink] def toLogicalWindow: LogicalWindow =
- ProcessingTimeSlidingGroupWindow(alias, size, slide)
-}
-
-/**
- * Sliding group-window on event-time.
- */
-class SlidingEventTimeWindow(
- alias: Option[Expression],
- timeField: Expression,
- size: Expression,
- slide: Expression)
- extends EventTimeWindow(timeField) {
-
- override private[flink] def toLogicalWindow: LogicalWindow =
- EventTimeSlidingGroupWindow(name.orElse(alias), timeField, size, slide)
-}
-
-// ------------------------------------------------------------------------------------------------
-// Session group windows
-// ------------------------------------------------------------------------------------------------
-
-/**
- * Session group-window.
- *
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
- * grouped by processing-time.
- *
- * @param gap the time interval of inactivity before a window is closed.
- */
-class SessionWindow(gap: Expression) extends GroupWindow {
-
- /**
- * Session group-window.
- *
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
- * are grouped by processing-time.
- *
- * @param gap the time interval of inactivity before a window is closed.
- */
- def this(gap: String) = this(ExpressionParser.parseExpression(gap))
-
- private var alias: Option[Expression] = None
-
- /**
- * Specifies the time attribute on which rows are grouped.
- *
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
- * are grouped by processing-time.
- *
- * For batch tables, refer to a timestamp or long attribute.
- *
- * @param timeField time mode for streaming tables and time attribute for batch tables
- * @return a session group-window on event-time
- */
- def on(timeField: Expression): SessionEventTimeWindow =
- new SessionEventTimeWindow(alias, timeField, gap)
-
- /**
- * Specifies the time attribute on which rows are grouped.
- *
- * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
- * are grouped by processing-time.
- *
- * For batch tables, refer to a timestamp or long attribute.
- *
- * @param timeField time mode for streaming tables and time attribute for batch tables
- * @return a session group-window on event-time
- */
- def on(timeField: String): SessionEventTimeWindow =
- on(ExpressionParser.parseExpression(timeField))
-
- /**
- * Assigns an alias for this window that the following `select()` clause can refer to in order
- * to access window properties such as window start or end time.
- *
- * @param alias alias for this window
- * @return this window
- */
- def as(alias: Expression): SessionWindow = {
- this.alias = Some(alias)
- this
- }
-
- /**
- * Assigns an alias for this window that the following `select()` clause can refer to in order
- * to access window properties such as window start or end time.
- *
- * @param alias alias for this window
- * @return this window
- */
- def as(alias: String): SessionWindow = as(ExpressionParser.parseExpression(alias))
-
- override private[flink] def toLogicalWindow: LogicalWindow =
- ProcessingTimeSessionGroupWindow(alias, gap)
-}
-
-/**
- * Session group-window on event-time.
- */
-class SessionEventTimeWindow(
- alias: Option[Expression],
- timeField: Expression,
- gap: Expression)
- extends EventTimeWindow(timeField) {
-
- override private[flink] def toLogicalWindow: LogicalWindow =
- EventTimeSessionGroupWindow(name.orElse(alias), timeField, gap)
-}
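
A sketch of the fluent API defined above. The string arguments are parsed by ExpressionParser; the interval literals ("10.minutes" and so on) are assumed Table API expression syntax:

    import org.apache.flink.api.table._

    // Tumbling: 10-minute windows on event-time, aliased for use in select():
    val tumble = new TumblingWindow("10.minutes").on("rowtime").as("w")
    // Sliding: 15-minute windows started every 3 minutes:
    val slide = new SlideWithSize("15.minutes").every("3.minutes").on("rowtime").as("w")
    // Session: windows that close after 30 minutes of inactivity:
    val session = new SessionWindow("30.minutes").on("rowtime").as("w")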
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
deleted file mode 100644
index 63a5413..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-
-/**
- * Simple example for demonstrating the use of SQL on a Stream Table.
- *
- * This example shows how to:
- * - Convert DataStreams to Tables
- * - Register a Table under a name
- * - Run a StreamSQL query on the registered Table
- *
- */
-object StreamSQLExample {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- def main(args: Array[String]): Unit = {
-
- // set up execution environment
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val orderA: DataStream[Order] = env.fromCollection(Seq(
- Order(1L, "beer", 3),
- Order(1L, "diaper", 4),
- Order(3L, "rubber", 2)))
-
- val orderB: DataStream[Order] = env.fromCollection(Seq(
- Order(2L, "pen", 3),
- Order(2L, "rubber", 3),
- Order(4L, "beer", 1)))
-
- // register the DataStreams under the name "OrderA" and "OrderB"
- tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
- tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
-
- // union the two tables
- val result = tEnv.sql(
- "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
- "SELECT * FROM OrderB WHERE amount < 2")
-
- result.toDataStream[Order].print()
-
- env.execute()
- }
-
- // *************************************************************************
- // USER DATA TYPES
- // *************************************************************************
-
- case class Order(user: Long, product: String, amount: Int)
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
deleted file mode 100644
index 2ce2684..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-
-/**
- * Simple example for demonstrating the use of Table API on a Stream Table.
- *
- * This example shows how to:
- * - Convert DataStreams to Tables
- * - Apply union, select, and filter operations
- *
- */
-object StreamTableExample {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- def main(args: Array[String]): Unit = {
-
- // set up execution environment
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val orderA = env.fromCollection(Seq(
- Order(1L, "beer", 3),
- Order(1L, "diaper", 4),
- Order(3L, "rubber", 2))).toTable(tEnv)
-
- val orderB = env.fromCollection(Seq(
- Order(2L, "pen", 3),
- Order(2L, "rubber", 3),
- Order(4L, "beer", 1))).toTable(tEnv)
-
- // union the two tables
- val result: DataStream[Order] = orderA.unionAll(orderB)
- .select('user, 'product, 'amount)
- .where('amount > 2)
- .toDataStream[Order]
-
- result.print()
-
- env.execute()
- }
-
- // *************************************************************************
- // USER DATA TYPES
- // *************************************************************************
-
- case class Order(user: Long, product: String, amount: Int)
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
deleted file mode 100644
index a950988..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-
-/**
- * This program implements a modified version of TPC-H query 3. The
- * example demonstrates how to assign names to fields using case classes.
- * The original query can be found at
- * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
- * (page 29).
- *
- * This program implements the following SQL equivalent:
- *
- * {{{
- * SELECT
- * l_orderkey,
- * SUM(l_extendedprice*(1-l_discount)) AS revenue,
- * o_orderdate,
- * o_shippriority
- * FROM customer,
- * orders,
- * lineitem
- * WHERE
- * c_mktsegment = '[SEGMENT]'
- * AND c_custkey = o_custkey
- * AND l_orderkey = o_orderkey
- * AND o_orderdate < date '[DATE]'
- * AND l_shipdate > date '[DATE]'
- * GROUP BY
- * l_orderkey,
- * o_orderdate,
- * o_shippriority
- * ORDER BY
- * revenue desc,
- * o_orderdate;
- * }}}
- *
- * Input files are plain text CSV files that use the pipe character ('|') as the field
- * separator, as generated by the TPC-H data generator, which is available at
- * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
- *
- * Usage:
- * {{{
- * TPCHQuery3Table <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
- * }}}
- *
- * This example shows how to:
- * - Convert DataSets to Tables
- * - Use Table API expressions
- *
- */
-object TPCHQuery3Table {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- def main(args: Array[String]): Unit = {
- if (!parseParameters(args)) {
- return
- }
-
- // set filter date
- val date = "1995-03-12".toDate
-
- // get execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val lineitems = getLineitemDataSet(env)
- .toTable(tEnv, 'id, 'extdPrice, 'discount, 'shipDate)
- .filter('shipDate.toDate > date)
-
- val customers = getCustomerDataSet(env)
- .toTable(tEnv, 'id, 'mktSegment)
- .filter('mktSegment === "AUTOMOBILE")
-
- val orders = getOrdersDataSet(env)
- .toTable(tEnv, 'orderId, 'custId, 'orderDate, 'shipPrio)
- .filter('orderDate.toDate < date)
-
- val items =
- orders.join(customers)
- .where('custId === 'id)
- .select('orderId, 'orderDate, 'shipPrio)
- .join(lineitems)
- .where('orderId === 'id)
- .select(
- 'orderId,
- 'extdPrice * (1.0f.toExpr - 'discount) as 'revenue,
- 'orderDate,
- 'shipPrio)
-
- val result = items
- .groupBy('orderId, 'orderDate, 'shipPrio)
- .select('orderId, 'revenue.sum as 'revenue, 'orderDate, 'shipPrio)
- .orderBy('revenue.desc, 'orderDate.asc)
-
- // emit result
- result.writeAsCsv(outputPath, "\n", "|")
-
- // execute program
- env.execute("Scala TPCH Query 3 (Table API Expression) Example")
- }
-
- // *************************************************************************
- // USER DATA TYPES
- // *************************************************************************
-
- case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
- case class Customer(id: Long, mktSegment: String)
- case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private var lineitemPath: String = _
- private var customerPath: String = _
- private var ordersPath: String = _
- private var outputPath: String = _
-
- private def parseParameters(args: Array[String]): Boolean = {
- if (args.length == 4) {
- lineitemPath = args(0)
- customerPath = args(1)
- ordersPath = args(2)
- outputPath = args(3)
- true
- } else {
- System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
- " Due to legal restrictions, we can not ship generated data.\n" +
- " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
- " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " +
- "<orders-csv path> <result path>")
- false
- }
- }
-
- private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
- env.readCsvFile[Lineitem](
- lineitemPath,
- fieldDelimiter = "|",
- includedFields = Array(0, 5, 6, 10) )
- }
-
- private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
- env.readCsvFile[Customer](
- customerPath,
- fieldDelimiter = "|",
- includedFields = Array(0, 6) )
- }
-
- private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
- env.readCsvFile[Order](
- ordersPath,
- fieldDelimiter = "|",
- includedFields = Array(0, 1, 4, 7) )
- }
-
-}
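
Because the example above needs generated TPC-H input files, here is a
minimal, self-contained sketch of the same join-and-aggregate pattern with
inline data. It uses the pre-refactor imports of the deleted file; the object
name and the data values are made up for illustration:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableEnvironment

    object TPCHQuery3TableMini {
      case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
      case class Customer(cId: Long, mktSegment: String)
      case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)

      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // tiny inline datasets instead of the TPC-H CSV files
        val lineitems = env.fromElements(
            Lineitem(1L, 100.0, 0.1, "1995-03-15"),
            Lineitem(1L, 50.0, 0.0, "1995-04-01"))
          .toTable(tEnv, 'id, 'extdPrice, 'discount, 'shipDate)

        val customers = env.fromElements(Customer(10L, "AUTOMOBILE"))
          .toTable(tEnv, 'cId, 'mktSegment)

        val orders = env.fromElements(Order(1L, 10L, "1995-03-01", 0L))
          .toTable(tEnv, 'orderId, 'custId, 'orderDate, 'shipPrio)

        // join orders with customers, then with line items,
        // and compute the discounted revenue per order
        val result = orders
          .join(customers).where('custId === 'cId)
          .select('orderId, 'orderDate, 'shipPrio)
          .join(lineitems).where('orderId === 'id)
          .select('orderId, 'extdPrice * (1.0f.toExpr - 'discount) as 'revenue,
            'orderDate, 'shipPrio)
          .groupBy('orderId, 'orderDate, 'shipPrio)
          .select('orderId, 'revenue.sum as 'revenue)

        result.toDataSet[(Long, Double)].print()
      }
    }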
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
deleted file mode 100644
index 96a603e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-
-/**
- * Simple example that shows how the Batch SQL API is used in Scala.
- *
- * This example shows how to:
- * - Convert DataSets to Tables
- * - Register a Table under a name
- * - Run a SQL query on the registered Table
- *
- */
-object WordCountSQL {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- def main(args: Array[String]): Unit = {
-
- // set up execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
-
- // register the DataSet as table "WordCount"
- tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
-
- // run a SQL query on the Table and retrieve the result as a new Table
- val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
-
- table.toDataSet[WC].print()
- }
-
- // *************************************************************************
- // USER DATA TYPES
- // *************************************************************************
-
- case class WC(word: String, frequency: Long)
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
deleted file mode 100644
index 587a716..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-
-/**
- * Simple example for demonstrating the use of the Table API for a Word Count in Scala.
- *
- * This example shows how to:
- * - Convert DataSets to Tables
- * - Apply group, aggregate, select, and filter operations
- *
- */
-object WordCountTable {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- def main(args: Array[String]): Unit = {
-
- // set up execution environment
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
- val expr = input.toTable(tEnv)
- val result = expr
- .groupBy('word)
- .select('word, 'frequency.sum as 'frequency)
- .filter('frequency === 2)
- .toDataSet[WC]
-
- result.print()
- }
-
- // *************************************************************************
- // USER DATA TYPES
- // *************************************************************************
-
- case class WC(word: String, frequency: Long)
-
-}
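
The groupBy/select/filter pipeline above maps directly onto the SQL form of
the preceding WordCountSQL example; a filter applied after the aggregation
becomes a HAVING clause. A hedged sketch, assuming the input was registered
as "WordCount" exactly as in WordCountSQL:

    // SQL counterpart of the Table API word count above;
    // filter('frequency === 2) turns into HAVING
    val table = tEnv.sql(
      "SELECT word, SUM(frequency) AS frequency FROM WordCount " +
      "GROUP BY word HAVING SUM(frequency) = 2")
    table.toDataSet[WC].print()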
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
new file mode 100644
index 0000000..59cad80
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import _root_.java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.sql2rel.RelDecorrelator
+import org.apache.calcite.tools.{Programs, RuleSet}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.io.DiscardingOutputFormat
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.table.calcite.FlinkPlannerImpl
+import org.apache.flink.table.explain.PlanJsonParser
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
+import org.apache.flink.table.plan.rules.FlinkRuleSets
+import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable}
+import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
+import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.types.Row
+
+/**
+ * The abstract base class for batch TableEnvironments.
+ *
+ * A TableEnvironment can be used to:
+ * - convert a [[DataSet]] to a [[Table]]
+ * - register a [[DataSet]] in the [[TableEnvironment]]'s catalog
+ * - register a [[Table]] in the [[TableEnvironment]]'s catalog
+ * - scan a registered table to obtain a [[Table]]
+ * - specify a SQL query on registered tables to obtain a [[Table]]
+ * - convert a [[Table]] into a [[DataSet]]
+ * - explain the AST and execution plan of a [[Table]]
+ *
+ * @param execEnv The [[ExecutionEnvironment]] which is wrapped in this [[BatchTableEnvironment]].
+ * @param config The [[TableConfig]] of this [[BatchTableEnvironment]].
+ */
+abstract class BatchTableEnvironment(
+ private[flink] val execEnv: ExecutionEnvironment,
+ config: TableConfig)
+ extends TableEnvironment(config) {
+
+ // a counter for unique table names.
+ private val nameCntr: AtomicInteger = new AtomicInteger(0)
+
+ // the naming pattern for internally registered tables.
+ private val internalNamePattern = "^_DataSetTable_[0-9]+$".r
+
+ /**
+ * Checks if the chosen table name is valid.
+ *
+ * @param name The table name to check.
+ */
+ override protected def checkValidTableName(name: String): Unit = {
+ val m = internalNamePattern.findFirstIn(name)
+ m match {
+ case Some(_) =>
+ throw new TableException(s"Illegal Table name. " +
+ s"Please choose a name that does not contain the pattern $internalNamePattern")
+ case None =>
+ }
+ }
+
+ /** Returns a unique table name according to the internal naming pattern. */
+ protected def createUniqueTableName(): String = "_DataSetTable_" + nameCntr.getAndIncrement()
+
+ /**
+ * Scans a registered table and returns the resulting [[Table]].
+ *
+ * The table to scan must be registered in the [[TableEnvironment]]'s catalog.
+ *
+ * @param tableName The name of the table to scan.
+ * @throws ValidationException if no table is registered under the given name.
+ * @return The scanned table.
+ */
+ @throws[ValidationException]
+ def scan(tableName: String): Table = {
+ if (isRegistered(tableName)) {
+ new Table(this, CatalogNode(tableName, getRowType(tableName)))
+ } else {
+ throw new TableException(s"Table \'$tableName\' was not found in the registry.")
+ }
+ }
+
+ /**
+ * Registers an external [[BatchTableSource]] in this [[TableEnvironment]]'s catalog.
+ * Registered tables can be referenced in SQL queries.
+ *
+ * @param name The name under which the [[BatchTableSource]] is registered.
+ * @param tableSource The [[BatchTableSource]] to register.
+ */
+ def registerTableSource(name: String, tableSource: BatchTableSource[_]): Unit = {
+
+ checkValidTableName(name)
+ registerTableInternal(name, new TableSourceTable(tableSource))
+ }
+
+ /**
+ * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
+ *
+ * All tables referenced by the query must be registered in the TableEnvironment.
+ *
+ * @param query The SQL query to evaluate.
+ * @return The result of the query as Table.
+ */
+ override def sql(query: String): Table = {
+
+ val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
+ // parse the sql query
+ val parsed = planner.parse(query)
+ // validate the sql query
+ val validated = planner.validate(parsed)
+ // transform to a relational tree
+ val relational = planner.rel(validated)
+
+ new Table(this, LogicalRelNode(relational.rel))
+ }
+
+ /**
+ * Writes a [[Table]] to a [[TableSink]].
+ *
+ * Internally, the [[Table]] is translated into a [[DataSet]] and handed over to the
+ * [[TableSink]] to write it.
+ *
+ * @param table The [[Table]] to write.
+ * @param sink The [[TableSink]] to write the [[Table]] to.
+ * @tparam T The expected type of the [[DataSet]] which represents the [[Table]].
+ */
+ override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
+
+ sink match {
+ case batchSink: BatchTableSink[T] =>
+ val outputType = sink.getOutputType
+ // translate the Table into a DataSet and provide the type that the TableSink expects.
+ val result: DataSet[T] = translate(table)(outputType)
+ // Give the DataSet to the TableSink to emit it.
+ batchSink.emitDataSet(result)
+ case _ =>
+ throw new TableException("BatchTableSink required to emit batch Table")
+ }
+ }
+
+ /**
+ * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
+ * the result of the given [[Table]].
+ *
+ * @param table The table for which the AST and execution plan will be returned.
+ * @param extended Flag to include detailed optimizer estimates.
+ */
+ private[flink] def explain(table: Table, extended: Boolean): String = {
+ val ast = table.getRelNode
+ val optimizedPlan = optimize(ast)
+    val dataSet = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
+    dataSet.output(new DiscardingOutputFormat[Row])
+    val env = dataSet.getExecutionEnvironment
+    val jsonSqlPlan = env.getExecutionPlan
+    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, extended)
+
+ s"== Abstract Syntax Tree ==" +
+ System.lineSeparator +
+ s"${RelOptUtil.toString(ast)}" +
+ System.lineSeparator +
+ s"== Optimized Logical Plan ==" +
+ System.lineSeparator +
+ s"${RelOptUtil.toString(optimizedPlan)}" +
+ System.lineSeparator +
+ s"== Physical Execution Plan ==" +
+ System.lineSeparator +
+ s"$sqlPlan"
+ }
+
+ /**
+ * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
+ * the result of the given [[Table]].
+ *
+ * @param table The table for which the AST and execution plan will be returned.
+ */
+  def explain(table: Table): String = explain(table, extended = false)
+
+ /**
+ * Registers a [[DataSet]] as a table under a given name in the [[TableEnvironment]]'s catalog.
+ *
+ * @param name The name under which the table is registered in the catalog.
+ * @param dataSet The [[DataSet]] to register as table in the catalog.
+ * @tparam T the type of the [[DataSet]].
+ */
+ protected def registerDataSetInternal[T](name: String, dataSet: DataSet[T]): Unit = {
+
+ val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType)
+ val dataSetTable = new DataSetTable[T](
+ dataSet,
+ fieldIndexes,
+ fieldNames
+ )
+ registerTableInternal(name, dataSetTable)
+ }
+
+ /**
+ * Registers a [[DataSet]] as a table under a given name with field names as specified by
+ * field expressions in the [[TableEnvironment]]'s catalog.
+ *
+ * @param name The name under which the table is registered in the catalog.
+ * @param dataSet The [[DataSet]] to register as table in the catalog.
+ * @param fields The field expressions to define the field names of the table.
+ * @tparam T The type of the [[DataSet]].
+ */
+ protected def registerDataSetInternal[T](
+ name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {
+
+ val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields)
+ val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames)
+ registerTableInternal(name, dataSetTable)
+ }
+
+ /**
+ * Returns the built-in rules that are defined by the environment.
+ */
+ protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASET_OPT_RULES
+
+ /**
+ * Generates the optimized [[RelNode]] tree from the original relational node tree.
+ *
+ * @param relNode The original [[RelNode]] tree
+ * @return The optimized [[RelNode]] tree
+ */
+ private[flink] def optimize(relNode: RelNode): RelNode = {
+
+ // decorrelate
+ val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
+
+ // optimize the logical Flink plan
+ val optProgram = Programs.ofRules(getRuleSet)
+ val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
+
+ val dataSetPlan = try {
+ optProgram.run(getPlanner, decorPlan, flinkOutputProps)
+ } catch {
+ case e: CannotPlanException =>
+ throw new TableException(
+ s"Cannot generate a valid execution plan for the given query: \n\n" +
+ s"${RelOptUtil.toString(relNode)}\n" +
+ s"This exception indicates that the query uses an unsupported SQL feature.\n" +
+ s"Please check the documentation for the set of currently supported SQL features.")
+ case t: TableException =>
+ throw new TableException(
+ s"Cannot generate a valid execution plan for the given query: \n\n" +
+ s"${RelOptUtil.toString(relNode)}\n" +
+ s"${t.msg}\n" +
+ s"Please check the documentation for the set of currently supported SQL features.")
+ case a: AssertionError =>
+ throw a.getCause
+ }
+ dataSetPlan
+ }
+
+ /**
+ * Translates a [[Table]] into a [[DataSet]].
+ *
+ * The transformation involves optimizing the relational expression tree as defined by
+ * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
+ *
+ * @param table The root node of the relational expression tree.
+ * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
+ * @tparam A The type of the resulting [[DataSet]].
+ * @return The [[DataSet]] that corresponds to the translated [[Table]].
+ */
+ protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+ val dataSetPlan = optimize(table.getRelNode)
+ translate(dataSetPlan)
+ }
+
+ /**
+ * Translates a logical [[RelNode]] into a [[DataSet]].
+ *
+ * @param logicalPlan The root node of the relational expression tree.
+ * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
+ * @tparam A The type of the resulting [[DataSet]].
+ * @return The [[DataSet]] that corresponds to the translated [[Table]].
+ */
+ protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+ validateType(tpe)
+
+ logicalPlan match {
+ case node: DataSetRel =>
+ node.translateToPlan(
+ this,
+ Some(tpe.asInstanceOf[TypeInformation[Any]])
+ ).asInstanceOf[DataSet[A]]
+      case _ =>
+        // the optimizer is expected to produce a DataSetRel as the root of a batch plan
+        throw new TableException("Cannot generate DataSet due to an invalid logical plan. " +
+          "This is a bug and should not happen. Please file an issue.")
+ }
+ }
+}
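
For user code, the visible effect of this refactoring is the package move:
org.apache.flink.api.table.* becomes org.apache.flink.table.api.*. A hedged
usage sketch of the class above, assuming the TableEnvironment companion
factory used by the deleted examples moved along with it and that a table is
already registered under the hypothetical name "Orders" with an "amount"
column:

    import org.apache.flink.api.java.ExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // scan() resolves "Orders" in the catalog; names matching the internal
    // pattern _DataSetTable_<n> are reserved and rejected on registration
    val orders = tEnv.scan("Orders")

    // sql() parses, validates, and translates the query through Calcite
    val bigOrders = tEnv.sql("SELECT * FROM Orders WHERE amount > 2")

    // explain() returns the AST, the optimized logical plan, and the
    // physical execution plan as one string
    println(tEnv.explain(bigOrders))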