You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:55 UTC
[26/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
new file mode 100644
index 0000000..9c6eea8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import java.util
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.plan.RelOptTable.ViewExpander
+import org.apache.calcite.plan._
+import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rel.RelRoot
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.RexBuilder
+import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.sql.parser.{SqlParser, SqlParseException => CSqlParseException}
+import org.apache.calcite.sql.validate.SqlValidator
+import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter}
+import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
+import org.apache.flink.table.api.{SqlParserException, TableException, ValidationException}
+
+import scala.collection.JavaConversions._
+
+/**
+ * NOTE: this is heavily inspired by Calcite's PlannerImpl.
+ * We need it in order to share the planner between the Table API relational plans
+ * and the SQL relation plans that are created by the Calcite parser.
+ * The main difference is that we do not create a new RelOptPlanner in the ready() method.
+ */
+class FlinkPlannerImpl(
+ config: FrameworkConfig,
+ planner: RelOptPlanner,
+ typeFactory: FlinkTypeFactory) {
+
+ val operatorTable: SqlOperatorTable = config.getOperatorTable
+ /** Holds the trait definitions to be registered with planner. May be null. */
+ val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs
+ val parserConfig: SqlParser.Config = config.getParserConfig
+ val convertletTable: SqlRexConvertletTable = config.getConvertletTable
+ val defaultSchema: SchemaPlus = config.getDefaultSchema
+
+ var validator: FlinkCalciteSqlValidator = _
+ var validatedSqlNode: SqlNode = _
+ var root: RelRoot = _
+
+ private def ready() {
+ if (this.traitDefs != null) {
+ planner.clearRelTraitDefs()
+ for (traitDef <- this.traitDefs) {
+ planner.addRelTraitDef(traitDef)
+ }
+ }
+ }
+
+ def parse(sql: String): SqlNode = {
+ try {
+ ready()
+ val parser: SqlParser = SqlParser.create(sql, parserConfig)
+ val sqlNode: SqlNode = parser.parseStmt
+ sqlNode
+ } catch {
+ case e: CSqlParseException =>
+ throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
+ }
+ }
+
+ def validate(sqlNode: SqlNode): SqlNode = {
+ validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory)
+ validator.setIdentifierExpansion(true)
+ try {
+ validatedSqlNode = validator.validate(sqlNode)
+ }
+ catch {
+ case e: RuntimeException =>
+ throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e)
+ }
+ validatedSqlNode
+ }
+
+ def rel(sql: SqlNode): RelRoot = {
+ try {
+ assert(validatedSqlNode != null)
+ val rexBuilder: RexBuilder = createRexBuilder
+ val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+ val config = SqlToRelConverter.configBuilder()
+ .withTrimUnusedFields(false).withConvertTableAccess(false).build()
+ val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
+ new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable, config)
+ root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
+ // we disable automatic flattening in order to let composite types pass without modification
+ // we might enable it again once Calcite has better support for structured types
+ // root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
+ root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
+ root
+ } catch {
+ case e: RelConversionException => throw TableException(e.getMessage)
+ }
+ }
+
+ /** Implements [[org.apache.calcite.plan.RelOptTable.ViewExpander]]
+ * interface for [[org.apache.calcite.tools.Planner]]. */
+ class ViewExpanderImpl extends ViewExpander {
+
+ override def expandView(
+ rowType: RelDataType,
+ queryString: String,
+ schemaPath: util.List[String],
+ viewPath: util.List[String]): RelRoot = {
+
+ val parser: SqlParser = SqlParser.create(queryString, parserConfig)
+ var sqlNode: SqlNode = null
+ try {
+ sqlNode = parser.parseQuery
+ }
+ catch {
+ case e: CSqlParseException =>
+ throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
+ }
+ val catalogReader: CalciteCatalogReader = createCatalogReader.withSchemaPath(schemaPath)
+ val validator: SqlValidator =
+ new FlinkCalciteSqlValidator(operatorTable, catalogReader, typeFactory)
+ validator.setIdentifierExpansion(true)
+ val validatedSqlNode: SqlNode = validator.validate(sqlNode)
+ val rexBuilder: RexBuilder = createRexBuilder
+ val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+ val config: SqlToRelConverter.Config = SqlToRelConverter.configBuilder
+ .withTrimUnusedFields(false).withConvertTableAccess(false).build
+ val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
+ new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable, config)
+ root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false)
+ root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
+ root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
+ FlinkPlannerImpl.this.root
+ }
+ }
+
+ private def createCatalogReader: CalciteCatalogReader = {
+ val rootSchema: SchemaPlus = FlinkPlannerImpl.rootSchema(defaultSchema)
+ new CalciteCatalogReader(
+ CalciteSchema.from(rootSchema),
+ parserConfig.caseSensitive,
+ CalciteSchema.from(defaultSchema).path(null),
+ typeFactory)
+ }
+
+ private def createRexBuilder: RexBuilder = {
+ new RexBuilder(typeFactory)
+ }
+
+}
+
+object FlinkPlannerImpl {
+ private def rootSchema(schema: SchemaPlus): SchemaPlus = {
+ if (schema.getParentSchema == null) {
+ schema
+ }
+ else {
+ rootSchema(schema.getParentSchema)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
new file mode 100644
index 0000000..8465ec6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import java.util.Collections
+
+import org.apache.calcite.plan.volcano.VolcanoPlanner
+import java.lang.Iterable
+
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.plan._
+import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.calcite.rex.RexBuilder
+import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
+import org.apache.calcite.tools.{FrameworkConfig, RelBuilder}
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.expressions.WindowProperty
+import org.apache.flink.table.plan.logical.LogicalWindow
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+
+/**
+ * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]].
+ */
+class FlinkRelBuilder(
+ context: Context,
+ relOptCluster: RelOptCluster,
+ relOptSchema: RelOptSchema)
+ extends RelBuilder(
+ context,
+ relOptCluster,
+ relOptSchema) {
+
+ def getPlanner: RelOptPlanner = cluster.getPlanner
+
+ def getCluster: RelOptCluster = relOptCluster
+
+ override def getTypeFactory: FlinkTypeFactory =
+ super.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+ def aggregate(
+ window: LogicalWindow,
+ groupKey: GroupKey,
+ namedProperties: Seq[NamedWindowProperty],
+ aggCalls: Iterable[AggCall])
+ : RelBuilder = {
+ // build logical aggregate
+ val aggregate = super.aggregate(groupKey, aggCalls).build().asInstanceOf[LogicalAggregate]
+
+ // build logical window aggregate from it
+ push(LogicalWindowAggregate.create(window, namedProperties, aggregate))
+ this
+ }
+
+}
+
+object FlinkRelBuilder {
+
+ def create(config: FrameworkConfig): FlinkRelBuilder = {
+
+ // create Flink type factory
+ val typeSystem = config.getTypeSystem
+ val typeFactory = new FlinkTypeFactory(typeSystem)
+
+ // create context instances with Flink type factory
+ val planner = new VolcanoPlanner(config.getCostFactory, Contexts.empty())
+ planner.setExecutor(config.getExecutor)
+ planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
+ val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))
+ val calciteSchema = CalciteSchema.from(config.getDefaultSchema)
+ val relOptSchema = new CalciteCatalogReader(
+ calciteSchema,
+ config.getParserConfig.caseSensitive(),
+ Collections.emptyList(),
+ typeFactory)
+
+ new FlinkRelBuilder(config.getContext, cluster, relOptSchema)
+ }
+
+ /**
+ * Information necessary to create a window aggregate.
+ *
+ * Similar to [[RelBuilder.AggCall]] or [[RelBuilder.GroupKey]].
+ */
+ case class NamedWindowProperty(name: String, property: WindowProperty)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
new file mode 100644
index 0000000..f3e2f91
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
+import org.apache.calcite.sql.SqlIntervalQualifier
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.ValueTypeInfo._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.schema.{CompositeRelDataType, GenericRelDataType}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple
+import org.apache.flink.table.plan.schema.ArrayRelDataType
+import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
+
+import scala.collection.mutable
+
+/**
+ * Flink specific type factory that represents the interface between Flink's [[TypeInformation]]
+ * and Calcite's [[RelDataType]].
+ */
+class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) {
+
+ // NOTE: for future data types it might be necessary to
+ // override more methods of RelDataTypeFactoryImpl
+
+ private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
+
+ def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
+ // simple type can be converted to SQL types and vice versa
+ if (isSimple(typeInfo)) {
+ val sqlType = typeInfoToSqlTypeName(typeInfo)
+ sqlType match {
+
+ case INTERVAL_YEAR_MONTH =>
+ createSqlIntervalType(
+ new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
+
+ case INTERVAL_DAY_SECOND =>
+ createSqlIntervalType(
+ new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
+
+ case _ =>
+ createSqlType(sqlType)
+ }
+ }
+ // advanced types require specific RelDataType
+ // for storing the original TypeInformation
+ else {
+ seenTypes.getOrElseUpdate(typeInfo, canonize(createAdvancedType(typeInfo)))
+ }
+ }
+
+ /**
+ * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
+ *
+ * @param fieldNames field names
+ * @param fieldTypes field types, every element is Flink's [[TypeInformation]]
+ * @return a struct type with the input fieldNames and input fieldTypes
+ */
+ def buildRowDataType(
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]])
+ : RelDataType = {
+ val rowDataTypeBuilder = builder
+ fieldNames
+ .zip(fieldTypes)
+ .foreach { f =>
+ rowDataTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2)).nullable(true)
+ }
+ rowDataTypeBuilder.build
+ }
+
+ override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
+ // it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue
+ // always set those to default value
+ if (typeName == VARCHAR && precision < 0) {
+ createSqlType(typeName, getTypeSystem.getDefaultPrecision(typeName))
+ } else {
+ super.createSqlType(typeName, precision)
+ }
+ }
+
+ override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType =
+ new ArrayRelDataType(
+ ObjectArrayTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType)),
+ elementType,
+ true)
+
+ private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
+ case ct: CompositeType[_] =>
+ new CompositeRelDataType(ct, this)
+
+ case pa: PrimitiveArrayTypeInfo[_] =>
+ new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false)
+
+ case oa: ObjectArrayTypeInfo[_, _] =>
+ new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)
+
+ case ti: TypeInformation[_] =>
+ new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
+
+ case ti@_ =>
+ throw TableException(s"Unsupported type information: $ti")
+ }
+
+ override def createTypeWithNullability(
+ relDataType: RelDataType,
+ nullable: Boolean)
+ : RelDataType = relDataType match {
+ case composite: CompositeRelDataType =>
+ // at the moment we do not care about nullability
+ composite
+ case _ =>
+ super.createTypeWithNullability(relDataType, nullable)
+ }
+}
+
+object FlinkTypeFactory {
+
+ private def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match {
+ case BOOLEAN_TYPE_INFO => BOOLEAN
+ case BYTE_TYPE_INFO => TINYINT
+ case SHORT_TYPE_INFO => SMALLINT
+ case INT_TYPE_INFO => INTEGER
+ case LONG_TYPE_INFO => BIGINT
+ case FLOAT_TYPE_INFO => FLOAT
+ case DOUBLE_TYPE_INFO => DOUBLE
+ case STRING_TYPE_INFO => VARCHAR
+ case BIG_DEC_TYPE_INFO => DECIMAL
+
+ // temporal types
+ case SqlTimeTypeInfo.DATE => DATE
+ case SqlTimeTypeInfo.TIME => TIME
+ case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
+ case TimeIntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
+ case TimeIntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND
+
+ case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
+ throw TableException("Character type is not supported.")
+
+ case _@t =>
+ throw TableException(s"Type is not supported: $t")
+ }
+
+ def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match {
+ case BOOLEAN => BOOLEAN_TYPE_INFO
+ case TINYINT => BYTE_TYPE_INFO
+ case SMALLINT => SHORT_TYPE_INFO
+ case INTEGER => INT_TYPE_INFO
+ case BIGINT => LONG_TYPE_INFO
+ case FLOAT => FLOAT_TYPE_INFO
+ case DOUBLE => DOUBLE_TYPE_INFO
+ case VARCHAR | CHAR => STRING_TYPE_INFO
+ case DECIMAL => BIG_DEC_TYPE_INFO
+
+ // temporal types
+ case DATE => SqlTimeTypeInfo.DATE
+ case TIME => SqlTimeTypeInfo.TIME
+ case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
+ case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MONTHS
+ case typeName if DAY_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+ case NULL =>
+ throw TableException("Type NULL is not supported. Null values must have a supported type.")
+
+ // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
+ // are represented as integer
+ case SYMBOL => INT_TYPE_INFO
+
+ // extract encapsulated TypeInformation
+ case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
+ val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
+ genericRelDataType.typeInfo
+
+ case ROW if relDataType.isInstanceOf[CompositeRelDataType] =>
+ val compositeRelDataType = relDataType.asInstanceOf[CompositeRelDataType]
+ compositeRelDataType.compositeType
+
+ // ROW and CURSOR for UDTF case, whose type info will never be used, just a placeholder
+ case ROW | CURSOR => new NothingTypeInfo
+
+ case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
+ val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
+ arrayRelDataType.typeInfo
+
+ case _@t =>
+ throw TableException(s"Type is not supported: $t")
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala
new file mode 100644
index 0000000..5935297
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.calcite.rel.`type`.RelDataTypeSystemImpl
+import org.apache.calcite.sql.`type`.SqlTypeName
+
+/**
+ * Custom type system for Flink.
+ */
+class FlinkTypeSystem extends RelDataTypeSystemImpl {
+
+ // we cannot use Int.MaxValue because of an overflow in Calcites type inference logic
+ // half should be enough for all use cases
+ override def getMaxNumericScale: Int = Int.MaxValue / 2
+
+ // we cannot use Int.MaxValue because of an overflow in Calcites type inference logic
+ // half should be enough for all use cases
+ override def getMaxNumericPrecision: Int = Int.MaxValue / 2
+
+ override def getDefaultPrecision(typeName: SqlTypeName): Int = typeName match {
+
+ // by default all VARCHARs can have the Java default length
+ case SqlTypeName.VARCHAR =>
+ Int.MaxValue
+
+ // we currenty support only timestamps with milliseconds precision
+ case SqlTypeName.TIMESTAMP =>
+ 3
+
+ case _ =>
+ super.getDefaultPrecision(typeName)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala
new file mode 100644
index 0000000..1f2e9a9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+/**
+ * Exception for all errors occurring during code generation.
+ */
+class CodeGenException(msg: String) extends RuntimeException(msg)
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
new file mode 100644
index 0000000..f8885a2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import java.lang.reflect.{Field, Method}
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.calcite.util.BuiltInMethod
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
+import org.apache.flink.api.common.typeinfo.{FractionalTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils}
+
+object CodeGenUtils {
+
+ private val nameCounter = new AtomicInteger
+
+ def newName(name: String): String = {
+ s"$name$$${nameCounter.getAndIncrement}"
+ }
+
+ // when casting we first need to unbox Primitives, for example,
+ // float a = 1.0f;
+ // byte b = (byte) a;
+ // works, but for boxed types we need this:
+ // Float a = 1.0f;
+ // Byte b = (byte)(float) a;
+ def primitiveTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+ case INT_TYPE_INFO => "int"
+ case LONG_TYPE_INFO => "long"
+ case SHORT_TYPE_INFO => "short"
+ case BYTE_TYPE_INFO => "byte"
+ case FLOAT_TYPE_INFO => "float"
+ case DOUBLE_TYPE_INFO => "double"
+ case BOOLEAN_TYPE_INFO => "boolean"
+ case CHAR_TYPE_INFO => "char"
+
+ // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
+ // does not seem to like this, so we manually give the correct type here.
+ case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+ case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+ case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+ case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+ case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+ case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+ case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+ case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+ // internal primitive representation of time points
+ case SqlTimeTypeInfo.DATE => "int"
+ case SqlTimeTypeInfo.TIME => "int"
+ case SqlTimeTypeInfo.TIMESTAMP => "long"
+
+ // internal primitive representation of time intervals
+ case TimeIntervalTypeInfo.INTERVAL_MONTHS => "int"
+ case TimeIntervalTypeInfo.INTERVAL_MILLIS => "long"
+
+ case _ =>
+ tpe.getTypeClass.getCanonicalName
+ }
+
+ def boxedTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+ // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
+ // does not seem to like this, so we manually give the correct type here.
+ case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+ case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+ case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+ case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+ case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+ case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+ case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+ case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+ case _ =>
+ tpe.getTypeClass.getCanonicalName
+ }
+
+ def primitiveDefaultValue(tpe: TypeInformation[_]): String = tpe match {
+ case INT_TYPE_INFO => "-1"
+ case LONG_TYPE_INFO => "-1L"
+ case SHORT_TYPE_INFO => "-1"
+ case BYTE_TYPE_INFO => "-1"
+ case FLOAT_TYPE_INFO => "-1.0f"
+ case DOUBLE_TYPE_INFO => "-1.0d"
+ case BOOLEAN_TYPE_INFO => "false"
+ case STRING_TYPE_INFO => "\"\""
+ case CHAR_TYPE_INFO => "'\\0'"
+ case SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME => "-1"
+ case SqlTimeTypeInfo.TIMESTAMP => "-1L"
+ case TimeIntervalTypeInfo.INTERVAL_MONTHS => "-1"
+ case TimeIntervalTypeInfo.INTERVAL_MILLIS => "-1L"
+
+ case _ => "null"
+ }
+
+ def superPrimitive(typeInfo: TypeInformation[_]): String = typeInfo match {
+ case _: FractionalTypeInfo[_] => "double"
+ case _ => "long"
+ }
+
+ def qualifyMethod(method: Method): String =
+ method.getDeclaringClass.getCanonicalName + "." + method.getName
+
+ def qualifyEnum(enum: Enum[_]): String =
+ enum.getClass.getCanonicalName + "." + enum.name()
+
+ def internalToTimePointCode(resultType: TypeInformation[_], resultTerm: String) =
+ resultType match {
+ case SqlTimeTypeInfo.DATE =>
+ s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_DATE.method)}($resultTerm)"
+ case SqlTimeTypeInfo.TIME =>
+ s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_TIME.method)}($resultTerm)"
+ case SqlTimeTypeInfo.TIMESTAMP =>
+ s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_TIMESTAMP.method)}($resultTerm)"
+ }
+
+ def timePointToInternalCode(resultType: TypeInformation[_], resultTerm: String) =
+ resultType match {
+ case SqlTimeTypeInfo.DATE =>
+ s"${qualifyMethod(BuiltInMethod.DATE_TO_INT.method)}($resultTerm)"
+ case SqlTimeTypeInfo.TIME =>
+ s"${qualifyMethod(BuiltInMethod.TIME_TO_INT.method)}($resultTerm)"
+ case SqlTimeTypeInfo.TIMESTAMP =>
+ s"${qualifyMethod(BuiltInMethod.TIMESTAMP_TO_LONG.method)}($resultTerm)"
+ }
+
+ def compareEnum(term: String, enum: Enum[_]): Boolean = term == qualifyEnum(enum)
+
+ def getEnum(genExpr: GeneratedExpression): Enum[_] = {
+ val split = genExpr.resultTerm.split('.')
+ val value = split.last
+ val clazz = genExpr.resultType.getTypeClass
+ enumValueOf(clazz, value)
+ }
+
+ def enumValueOf[T <: Enum[T]](cls: Class[_], stringValue: String): Enum[_] =
+ Enum.valueOf(cls.asInstanceOf[Class[T]], stringValue).asInstanceOf[Enum[_]]
+
+ // ----------------------------------------------------------------------------------------------
+
+ def requireNumeric(genExpr: GeneratedExpression) =
+ if (!TypeCheckUtils.isNumeric(genExpr.resultType)) {
+ throw new CodeGenException("Numeric expression type expected, but was " +
+ s"'${genExpr.resultType}'.")
+ }
+
+ def requireComparable(genExpr: GeneratedExpression) =
+ if (!TypeCheckUtils.isComparable(genExpr.resultType)) {
+ throw new CodeGenException(s"Comparable type expected, but was '${genExpr.resultType}'.")
+ }
+
+ def requireString(genExpr: GeneratedExpression) =
+ if (!TypeCheckUtils.isString(genExpr.resultType)) {
+ throw new CodeGenException("String expression type expected.")
+ }
+
+ def requireBoolean(genExpr: GeneratedExpression) =
+ if (!TypeCheckUtils.isBoolean(genExpr.resultType)) {
+ throw new CodeGenException("Boolean expression type expected.")
+ }
+
+ def requireTemporal(genExpr: GeneratedExpression) =
+ if (!TypeCheckUtils.isTemporal(genExpr.resultType)) {
+ throw new CodeGenException("Temporal expression type expected.")
+ }
+
+ def requireTimeInterval(genExpr: GeneratedExpression) =
+ if (!TypeCheckUtils.isTimeInterval(genExpr.resultType)) {
+ throw new CodeGenException("Interval expression type expected.")
+ }
+
+ def requireArray(genExpr: GeneratedExpression) =
+ if (!TypeCheckUtils.isArray(genExpr.resultType)) {
+ throw new CodeGenException("Array expression type expected.")
+ }
+
+ def requireInteger(genExpr: GeneratedExpression) =
+ if (!TypeCheckUtils.isInteger(genExpr.resultType)) {
+ throw new CodeGenException("Integer expression type expected.")
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ def isReference(genExpr: GeneratedExpression): Boolean = isReference(genExpr.resultType)
+
+ def isReference(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+ case INT_TYPE_INFO
+ | LONG_TYPE_INFO
+ | SHORT_TYPE_INFO
+ | BYTE_TYPE_INFO
+ | FLOAT_TYPE_INFO
+ | DOUBLE_TYPE_INFO
+ | BOOLEAN_TYPE_INFO
+ | CHAR_TYPE_INFO => false
+ case _ => true
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ sealed abstract class FieldAccessor
+
+ case class ObjectFieldAccessor(field: Field) extends FieldAccessor
+
+ case class ObjectGenericFieldAccessor(name: String) extends FieldAccessor
+
+ case class ObjectPrivateFieldAccessor(field: Field) extends FieldAccessor
+
+ case class ObjectMethodAccessor(methodName: String) extends FieldAccessor
+
+ case class ProductAccessor(i: Int) extends FieldAccessor
+
+ def fieldAccessorFor(compType: CompositeType[_], index: Int): FieldAccessor = {
+ compType match {
+ case ri: RowTypeInfo =>
+ ProductAccessor(index)
+
+ case cc: CaseClassTypeInfo[_] =>
+ ObjectMethodAccessor(cc.getFieldNames()(index))
+
+ case javaTup: TupleTypeInfo[_] =>
+ ObjectGenericFieldAccessor("f" + index)
+
+ case pt: PojoTypeInfo[_] =>
+ val fieldName = pt.getFieldNames()(index)
+ getFieldAccessor(pt.getTypeClass, fieldName)
+
+ case _ => throw new CodeGenException("Unsupported composite type.")
+ }
+ }
+
+ def getFieldAccessor(clazz: Class[_], fieldName: String): FieldAccessor = {
+ val field = TypeExtractor.getDeclaredField(clazz, fieldName)
+ if (field.isAccessible) {
+ ObjectFieldAccessor(field)
+ }
+ else {
+ ObjectPrivateFieldAccessor(field)
+ }
+ }
+
+ def isFieldPrimitive(field: Field): Boolean = field.getType.isPrimitive
+
+ def reflectiveFieldReadAccess(fieldTerm: String, field: Field, objectTerm: String): String =
+ field.getType match {
+ case java.lang.Integer.TYPE => s"$fieldTerm.getInt($objectTerm)"
+ case java.lang.Long.TYPE => s"$fieldTerm.getLong($objectTerm)"
+ case java.lang.Short.TYPE => s"$fieldTerm.getShort($objectTerm)"
+ case java.lang.Byte.TYPE => s"$fieldTerm.getByte($objectTerm)"
+ case java.lang.Float.TYPE => s"$fieldTerm.getFloat($objectTerm)"
+ case java.lang.Double.TYPE => s"$fieldTerm.getDouble($objectTerm)"
+ case java.lang.Boolean.TYPE => s"$fieldTerm.getBoolean($objectTerm)"
+ case java.lang.Character.TYPE => s"$fieldTerm.getChar($objectTerm)"
+ case _ => s"(${field.getType.getCanonicalName}) $fieldTerm.get($objectTerm)"
+ }
+
+ def reflectiveFieldWriteAccess(
+ fieldTerm: String,
+ field: Field,
+ objectTerm: String,
+ valueTerm: String)
+ : String =
+ field.getType match {
+ case java.lang.Integer.TYPE => s"$fieldTerm.setInt($objectTerm, $valueTerm)"
+ case java.lang.Long.TYPE => s"$fieldTerm.setLong($objectTerm, $valueTerm)"
+ case java.lang.Short.TYPE => s"$fieldTerm.setShort($objectTerm, $valueTerm)"
+ case java.lang.Byte.TYPE => s"$fieldTerm.setByte($objectTerm, $valueTerm)"
+ case java.lang.Float.TYPE => s"$fieldTerm.setFloat($objectTerm, $valueTerm)"
+ case java.lang.Double.TYPE => s"$fieldTerm.setDouble($objectTerm, $valueTerm)"
+ case java.lang.Boolean.TYPE => s"$fieldTerm.setBoolean($objectTerm, $valueTerm)"
+ case java.lang.Character.TYPE => s"$fieldTerm.setChar($objectTerm, $valueTerm)"
+ case _ => s"$fieldTerm.set($objectTerm, $valueTerm)"
+ }
+}