Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:55 UTC

[26/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
new file mode 100644
index 0000000..9c6eea8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import java.util
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.plan.RelOptTable.ViewExpander
+import org.apache.calcite.plan._
+import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rel.RelRoot
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.RexBuilder
+import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.sql.parser.{SqlParser, SqlParseException => CSqlParseException}
+import org.apache.calcite.sql.validate.SqlValidator
+import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter}
+import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
+import org.apache.flink.table.api.{SqlParserException, TableException, ValidationException}
+
+import scala.collection.JavaConversions._
+
+/**
+  * NOTE: this is heavily inspired by Calcite's PlannerImpl.
+  * We need it in order to share the planner between the Table API relational plans
+  * and the SQL relational plans that are created by the Calcite parser.
+  * The main difference is that we do not create a new RelOptPlanner in the ready() method.
+  */
+class FlinkPlannerImpl(
+    config: FrameworkConfig,
+    planner: RelOptPlanner,
+    typeFactory: FlinkTypeFactory) {
+
+  val operatorTable: SqlOperatorTable = config.getOperatorTable
+  /** Holds the trait definitions to be registered with the planner. May be null. */
+  val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs
+  val parserConfig: SqlParser.Config = config.getParserConfig
+  val convertletTable: SqlRexConvertletTable = config.getConvertletTable
+  val defaultSchema: SchemaPlus = config.getDefaultSchema
+
+  var validator: FlinkCalciteSqlValidator = _
+  var validatedSqlNode: SqlNode = _
+  var root: RelRoot = _
+
+  private def ready() {
+    if (this.traitDefs != null) {
+      planner.clearRelTraitDefs()
+      for (traitDef <- this.traitDefs) {
+        planner.addRelTraitDef(traitDef)
+      }
+    }
+  }
+
+  def parse(sql: String): SqlNode = {
+    try {
+      ready()
+      val parser: SqlParser = SqlParser.create(sql, parserConfig)
+      val sqlNode: SqlNode = parser.parseStmt
+      sqlNode
+    } catch {
+      case e: CSqlParseException =>
+        throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
+    }
+  }
+
+  def validate(sqlNode: SqlNode): SqlNode = {
+    validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory)
+    validator.setIdentifierExpansion(true)
+    try {
+      validatedSqlNode = validator.validate(sqlNode)
+    }
+    catch {
+      case e: RuntimeException =>
+        throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e)
+    }
+    validatedSqlNode
+  }
+
+  def rel(sql: SqlNode): RelRoot = {
+    try {
+      assert(validatedSqlNode != null)
+      val rexBuilder: RexBuilder = createRexBuilder
+      val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+      val config = SqlToRelConverter.configBuilder()
+        .withTrimUnusedFields(false).withConvertTableAccess(false).build()
+      val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
+        new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable, config)
+      root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
+      // we disable automatic flattening in order to let composite types pass without modification
+      // we might enable it again once Calcite has better support for structured types
+      // root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
+      root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
+      root
+    } catch {
+      case e: RelConversionException => throw TableException(e.getMessage)
+    }
+  }
+
+  /** Implements [[org.apache.calcite.plan.RelOptTable.ViewExpander]]
+    * interface for [[org.apache.calcite.tools.Planner]]. */
+  class ViewExpanderImpl extends ViewExpander {
+
+    override def expandView(
+        rowType: RelDataType,
+        queryString: String,
+        schemaPath: util.List[String],
+        viewPath: util.List[String]): RelRoot = {
+
+      val parser: SqlParser = SqlParser.create(queryString, parserConfig)
+      var sqlNode: SqlNode = null
+      try {
+        sqlNode = parser.parseQuery
+      }
+      catch {
+        case e: CSqlParseException =>
+          throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
+      }
+      val catalogReader: CalciteCatalogReader = createCatalogReader.withSchemaPath(schemaPath)
+      val validator: SqlValidator =
+        new FlinkCalciteSqlValidator(operatorTable, catalogReader, typeFactory)
+      validator.setIdentifierExpansion(true)
+      val validatedSqlNode: SqlNode = validator.validate(sqlNode)
+      val rexBuilder: RexBuilder = createRexBuilder
+      val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+      val config: SqlToRelConverter.Config = SqlToRelConverter.configBuilder
+        .withTrimUnusedFields(false).withConvertTableAccess(false).build
+      val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
+        new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable, config)
+      root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false)
+      root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
+      root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
+      FlinkPlannerImpl.this.root
+    }
+  }
+
+  private def createCatalogReader: CalciteCatalogReader = {
+    val rootSchema: SchemaPlus = FlinkPlannerImpl.rootSchema(defaultSchema)
+    new CalciteCatalogReader(
+      CalciteSchema.from(rootSchema),
+      parserConfig.caseSensitive,
+      CalciteSchema.from(defaultSchema).path(null),
+      typeFactory)
+  }
+
+  private def createRexBuilder: RexBuilder = {
+    new RexBuilder(typeFactory)
+  }
+
+}
+
+object FlinkPlannerImpl {
+  private def rootSchema(schema: SchemaPlus): SchemaPlus = {
+    if (schema.getParentSchema == null) {
+      schema
+    }
+    else {
+      rootSchema(schema.getParentSchema)
+    }
+  }
+}
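
For orientation, a minimal sketch (not part of this commit) of how the three-step
parse/validate/rel flow above might be driven; the config, planner and typeFactory
values are assumed to come from the surrounding TableEnvironment setup:

    // hypothetical wiring; config, planner and typeFactory are assumed
    val flinkPlanner = new FlinkPlannerImpl(config, planner, typeFactory)

    // parse: SQL text -> SqlNode, throws SqlParserException on bad syntax
    val parsed = flinkPlanner.parse("SELECT a, COUNT(b) FROM t GROUP BY a")

    // validate: resolves identifiers against the catalog,
    // throws ValidationException if e.g. table t is unknown
    val validated = flinkPlanner.validate(parsed)

    // rel: converts the validated AST into a Calcite RelRoot that shares
    // the given RelOptPlanner with plans built via the Table API
    val root = flinkPlanner.rel(validated)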

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
new file mode 100644
index 0000000..8465ec6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import java.util.Collections
+
+import org.apache.calcite.plan.volcano.VolcanoPlanner
+import java.lang.Iterable
+
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.plan._
+import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.calcite.rex.RexBuilder
+import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
+import org.apache.calcite.tools.{FrameworkConfig, RelBuilder}
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.expressions.WindowProperty
+import org.apache.flink.table.plan.logical.LogicalWindow
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+
+/**
+  * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]].
+  */
+class FlinkRelBuilder(
+    context: Context,
+    relOptCluster: RelOptCluster,
+    relOptSchema: RelOptSchema)
+  extends RelBuilder(
+    context,
+    relOptCluster,
+    relOptSchema) {
+
+  def getPlanner: RelOptPlanner = cluster.getPlanner
+
+  def getCluster: RelOptCluster = relOptCluster
+
+  override def getTypeFactory: FlinkTypeFactory =
+    super.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+  def aggregate(
+      window: LogicalWindow,
+      groupKey: GroupKey,
+      namedProperties: Seq[NamedWindowProperty],
+      aggCalls: Iterable[AggCall])
+    : RelBuilder = {
+    // build logical aggregate
+    val aggregate = super.aggregate(groupKey, aggCalls).build().asInstanceOf[LogicalAggregate]
+
+    // build logical window aggregate from it
+    push(LogicalWindowAggregate.create(window, namedProperties, aggregate))
+    this
+  }
+
+}
+
+object FlinkRelBuilder {
+
+  def create(config: FrameworkConfig): FlinkRelBuilder = {
+
+    // create Flink type factory
+    val typeSystem = config.getTypeSystem
+    val typeFactory = new FlinkTypeFactory(typeSystem)
+
+    // create context instances with Flink type factory
+    val planner = new VolcanoPlanner(config.getCostFactory, Contexts.empty())
+    planner.setExecutor(config.getExecutor)
+    planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
+    val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))
+    val calciteSchema = CalciteSchema.from(config.getDefaultSchema)
+    val relOptSchema = new CalciteCatalogReader(
+      calciteSchema,
+      config.getParserConfig.caseSensitive(),
+      Collections.emptyList(),
+      typeFactory)
+
+    new FlinkRelBuilder(config.getContext, cluster, relOptSchema)
+  }
+
+  /**
+    * Information necessary to create a window aggregate.
+    *
+    * Similar to [[RelBuilder.AggCall]] or [[RelBuilder.GroupKey]].
+    */
+  case class NamedWindowProperty(name: String, property: WindowProperty)
+
+}
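
A short usage sketch, assuming a pre-assembled FrameworkConfig (here called
frameworkConfig) and a hypothetical "orders" table registered in the default
schema; neither is part of this commit:

    val relBuilder = FlinkRelBuilder.create(frameworkConfig)

    // the standard Calcite RelBuilder API works unchanged
    val rel = relBuilder
      .scan("orders")
      .aggregate(
        relBuilder.groupKey("userId"),
        relBuilder.count(false, "cnt"))
      .build()

    // all row types above were produced by the Flink type factory
    val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory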

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
new file mode 100644
index 0000000..f3e2f91
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
+import org.apache.calcite.sql.SqlIntervalQualifier
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.ValueTypeInfo._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.schema.{CompositeRelDataType, GenericRelDataType}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple
+import org.apache.flink.table.plan.schema.ArrayRelDataType
+import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
+
+import scala.collection.mutable
+
+/**
+  * Flink specific type factory that represents the interface between Flink's [[TypeInformation]]
+  * and Calcite's [[RelDataType]].
+  */
+class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) {
+
+  // NOTE: for future data types it might be necessary to
+  // override more methods of RelDataTypeFactoryImpl
+
+  private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
+
+  def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
+    // simple types can be converted to SQL types and vice versa
+    if (isSimple(typeInfo)) {
+      val sqlType = typeInfoToSqlTypeName(typeInfo)
+      sqlType match {
+
+        case INTERVAL_YEAR_MONTH =>
+          createSqlIntervalType(
+            new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
+
+        case INTERVAL_DAY_SECOND =>
+          createSqlIntervalType(
+            new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
+
+        case _ =>
+          createSqlType(sqlType)
+      }
+    }
+    // advanced types require a specific RelDataType
+    // for storing the original TypeInformation
+    else {
+      seenTypes.getOrElseUpdate(typeInfo, canonize(createAdvancedType(typeInfo)))
+    }
+  }
+
+  /**
+    * Creates a struct type with the given fieldNames and fieldTypes.
+    *
+    * @param fieldNames field names
+    * @param fieldTypes field types, every element is Flink's [[TypeInformation]]
+    * @return a struct type with the input fieldNames and input fieldTypes
+    */
+  def buildRowDataType(
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]])
+    : RelDataType = {
+    val rowDataTypeBuilder = builder
+    fieldNames
+      .zip(fieldTypes)
+      .foreach { f =>
+        rowDataTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2)).nullable(true)
+      }
+    rowDataTypeBuilder.build
+  }
+
+  override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
+    // it might happen that inferred VARCHAR types overflow, as we set them to Int.MaxValue;
+    // always set those to the default precision
+    if (typeName == VARCHAR && precision < 0) {
+      createSqlType(typeName, getTypeSystem.getDefaultPrecision(typeName))
+    } else {
+      super.createSqlType(typeName, precision)
+    }
+  }
+
+  override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType =
+    new ArrayRelDataType(
+      ObjectArrayTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType)),
+      elementType,
+      true)
+
+  private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
+    case ct: CompositeType[_] =>
+      new CompositeRelDataType(ct, this)
+
+    case pa: PrimitiveArrayTypeInfo[_] =>
+      new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false)
+
+    case oa: ObjectArrayTypeInfo[_, _] =>
+      new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)
+
+    case ti: TypeInformation[_] =>
+      new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
+
+    case ti@_ =>
+      throw TableException(s"Unsupported type information: $ti")
+  }
+
+  override def createTypeWithNullability(
+    relDataType: RelDataType,
+    nullable: Boolean)
+  : RelDataType = relDataType match {
+    case composite: CompositeRelDataType =>
+      // at the moment we do not care about nullability
+      composite
+    case _ =>
+      super.createTypeWithNullability(relDataType, nullable)
+  }
+}
+
+object FlinkTypeFactory {
+
+  private def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match {
+      case BOOLEAN_TYPE_INFO => BOOLEAN
+      case BYTE_TYPE_INFO => TINYINT
+      case SHORT_TYPE_INFO => SMALLINT
+      case INT_TYPE_INFO => INTEGER
+      case LONG_TYPE_INFO => BIGINT
+      case FLOAT_TYPE_INFO => FLOAT
+      case DOUBLE_TYPE_INFO => DOUBLE
+      case STRING_TYPE_INFO => VARCHAR
+      case BIG_DEC_TYPE_INFO => DECIMAL
+
+      // temporal types
+      case SqlTimeTypeInfo.DATE => DATE
+      case SqlTimeTypeInfo.TIME => TIME
+      case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
+      case TimeIntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
+      case TimeIntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND
+
+      case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
+        throw TableException("Character type is not supported.")
+
+      case _@t =>
+        throw TableException(s"Type is not supported: $t")
+  }
+
+  def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match {
+    case BOOLEAN => BOOLEAN_TYPE_INFO
+    case TINYINT => BYTE_TYPE_INFO
+    case SMALLINT => SHORT_TYPE_INFO
+    case INTEGER => INT_TYPE_INFO
+    case BIGINT => LONG_TYPE_INFO
+    case FLOAT => FLOAT_TYPE_INFO
+    case DOUBLE => DOUBLE_TYPE_INFO
+    case VARCHAR | CHAR => STRING_TYPE_INFO
+    case DECIMAL => BIG_DEC_TYPE_INFO
+
+    // temporal types
+    case DATE => SqlTimeTypeInfo.DATE
+    case TIME => SqlTimeTypeInfo.TIME
+    case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
+    case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MONTHS
+    case typeName if DAY_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+    case NULL =>
+      throw TableException("Type NULL is not supported. Null values must have a supported type.")
+
+    // symbols for special flags, e.g. TRIM's BOTH, LEADING, TRAILING,
+    // are represented as integers
+    case SYMBOL => INT_TYPE_INFO
+
+    // extract encapsulated TypeInformation
+    case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
+      val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
+      genericRelDataType.typeInfo
+
+    case ROW if relDataType.isInstanceOf[CompositeRelDataType] =>
+      val compositeRelDataType = relDataType.asInstanceOf[CompositeRelDataType]
+      compositeRelDataType.compositeType
+
+    // ROW and CURSOR in the UDTF case; their type info is never used, they are just placeholders
+    case ROW | CURSOR => new NothingTypeInfo
+
+    case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
+      val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
+      arrayRelDataType.typeInfo
+
+    case _@t =>
+      throw TableException(s"Type is not supported: $t")
+  }
+}
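
A round-trip sketch of the two directions this factory covers (simple types map
onto SQL type names, composite types keep their original TypeInformation); the
commented results follow from the mappings above and are assumptions, not output
of this commit:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    import org.apache.flink.api.common.typeinfo.TypeInformation

    val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)

    // simple type: LONG_TYPE_INFO <-> BIGINT
    val bigint = typeFactory.createTypeFromTypeInfo(LONG_TYPE_INFO)
    FlinkTypeFactory.toTypeInfo(bigint)  // LONG_TYPE_INFO

    // struct type with nullable fields, as built by buildRowDataType
    val rowType = typeFactory.buildRowDataType(
      Array("name", "age"),
      Array[TypeInformation[_]](STRING_TYPE_INFO, INT_TYPE_INFO))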

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala
new file mode 100644
index 0000000..5935297
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.calcite.rel.`type`.RelDataTypeSystemImpl
+import org.apache.calcite.sql.`type`.SqlTypeName
+
+/**
+  * Custom type system for Flink.
+  */
+class FlinkTypeSystem extends RelDataTypeSystemImpl {
+
+  // we cannot use Int.MaxValue because of an overflow in Calcite's type inference logic
+  // half should be enough for all use cases
+  override def getMaxNumericScale: Int = Int.MaxValue / 2
+
+  // we cannot use Int.MaxValue because of an overflow in Calcite's type inference logic
+  // half should be enough for all use cases
+  override def getMaxNumericPrecision: Int = Int.MaxValue / 2
+
+  override def getDefaultPrecision(typeName: SqlTypeName): Int = typeName match {
+
+    // by default all VARCHARs can have the Java default length
+    case SqlTypeName.VARCHAR =>
+      Int.MaxValue
+
+    // we currently support only timestamps with millisecond precision
+    case SqlTypeName.TIMESTAMP =>
+      3
+
+    case _ =>
+      super.getDefaultPrecision(typeName)
+  }
+
+}
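
The effect of the overridden defaults, as a sketch (the commented values follow
directly from the code above):

    import org.apache.calcite.sql.`type`.SqlTypeName

    val typeSystem = new FlinkTypeSystem
    typeSystem.getDefaultPrecision(SqlTypeName.VARCHAR)    // Int.MaxValue
    typeSystem.getDefaultPrecision(SqlTypeName.TIMESTAMP)  // 3 (milliseconds)
    typeSystem.getMaxNumericPrecision                      // Int.MaxValue / 2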

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala
new file mode 100644
index 0000000..1f2e9a9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+/**
+  * Exception for all errors occurring during code generation.
+  */
+class CodeGenException(msg: String) extends RuntimeException(msg)

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
new file mode 100644
index 0000000..f8885a2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import java.lang.reflect.{Field, Method}
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.calcite.util.BuiltInMethod
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
+import org.apache.flink.api.common.typeinfo.{FractionalTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils}
+
+object CodeGenUtils {
+
+  private val nameCounter = new AtomicInteger
+
+  def newName(name: String): String = {
+    s"$name$$${nameCounter.getAndIncrement}"
+  }
+
+  // when casting we first need to unbox primitives, for example,
+  // float a = 1.0f;
+  // byte b = (byte) a;
+  // works, but for boxed types we need this:
+  // Float a = 1.0f;
+  // Byte b = (byte)(float) a;
+  def primitiveTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+    case INT_TYPE_INFO => "int"
+    case LONG_TYPE_INFO => "long"
+    case SHORT_TYPE_INFO => "short"
+    case BYTE_TYPE_INFO => "byte"
+    case FLOAT_TYPE_INFO => "float"
+    case DOUBLE_TYPE_INFO => "double"
+    case BOOLEAN_TYPE_INFO => "boolean"
+    case CHAR_TYPE_INFO => "char"
+
+    // From PrimitiveArrayTypeInfo we would get the class "int[]", which Scala
+    // reflection does not seem to like, so we manually give the correct type here.
+    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+    // internal primitive representation of time points
+    case SqlTimeTypeInfo.DATE => "int"
+    case SqlTimeTypeInfo.TIME => "int"
+    case SqlTimeTypeInfo.TIMESTAMP => "long"
+
+    // internal primitive representation of time intervals
+    case TimeIntervalTypeInfo.INTERVAL_MONTHS => "int"
+    case TimeIntervalTypeInfo.INTERVAL_MILLIS => "long"
+
+    case _ =>
+      tpe.getTypeClass.getCanonicalName
+  }
+
+  def boxedTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+    // From PrimitiveArrayTypeInfo we would get the class "int[]", which Scala
+    // reflection does not seem to like, so we manually give the correct type here.
+    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+    case _ =>
+      tpe.getTypeClass.getCanonicalName
+  }
+
+  def primitiveDefaultValue(tpe: TypeInformation[_]): String = tpe match {
+    case INT_TYPE_INFO => "-1"
+    case LONG_TYPE_INFO => "-1L"
+    case SHORT_TYPE_INFO => "-1"
+    case BYTE_TYPE_INFO => "-1"
+    case FLOAT_TYPE_INFO => "-1.0f"
+    case DOUBLE_TYPE_INFO => "-1.0d"
+    case BOOLEAN_TYPE_INFO => "false"
+    case STRING_TYPE_INFO => "\"\""
+    case CHAR_TYPE_INFO => "'\\0'"
+    case SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME => "-1"
+    case SqlTimeTypeInfo.TIMESTAMP => "-1L"
+    case TimeIntervalTypeInfo.INTERVAL_MONTHS => "-1"
+    case TimeIntervalTypeInfo.INTERVAL_MILLIS => "-1L"
+
+    case _ => "null"
+  }
+
+  def superPrimitive(typeInfo: TypeInformation[_]): String = typeInfo match {
+    case _: FractionalTypeInfo[_] => "double"
+    case _ => "long"
+  }
+
+  def qualifyMethod(method: Method): String =
+    method.getDeclaringClass.getCanonicalName + "." + method.getName
+
+  def qualifyEnum(enum: Enum[_]): String =
+    enum.getClass.getCanonicalName + "." + enum.name()
+
+  def internalToTimePointCode(resultType: TypeInformation[_], resultTerm: String) =
+    resultType match {
+      case SqlTimeTypeInfo.DATE =>
+        s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_DATE.method)}($resultTerm)"
+      case SqlTimeTypeInfo.TIME =>
+        s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_TIME.method)}($resultTerm)"
+      case SqlTimeTypeInfo.TIMESTAMP =>
+        s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_TIMESTAMP.method)}($resultTerm)"
+    }
+
+  def timePointToInternalCode(resultType: TypeInformation[_], resultTerm: String) =
+    resultType match {
+      case SqlTimeTypeInfo.DATE =>
+        s"${qualifyMethod(BuiltInMethod.DATE_TO_INT.method)}($resultTerm)"
+      case SqlTimeTypeInfo.TIME =>
+        s"${qualifyMethod(BuiltInMethod.TIME_TO_INT.method)}($resultTerm)"
+      case SqlTimeTypeInfo.TIMESTAMP =>
+        s"${qualifyMethod(BuiltInMethod.TIMESTAMP_TO_LONG.method)}($resultTerm)"
+    }
+
+  def compareEnum(term: String, enum: Enum[_]): Boolean = term == qualifyEnum(enum)
+
+  def getEnum(genExpr: GeneratedExpression): Enum[_] = {
+    val split = genExpr.resultTerm.split('.')
+    val value = split.last
+    val clazz = genExpr.resultType.getTypeClass
+    enumValueOf(clazz, value)
+  }
+
+  def enumValueOf[T <: Enum[T]](cls: Class[_], stringValue: String): Enum[_] =
+    Enum.valueOf(cls.asInstanceOf[Class[T]], stringValue).asInstanceOf[Enum[_]]
+
+  // ----------------------------------------------------------------------------------------------
+
+  def requireNumeric(genExpr: GeneratedExpression) =
+    if (!TypeCheckUtils.isNumeric(genExpr.resultType)) {
+      throw new CodeGenException("Numeric expression type expected, but was " +
+        s"'${genExpr.resultType}'.")
+    }
+
+  def requireComparable(genExpr: GeneratedExpression) =
+    if (!TypeCheckUtils.isComparable(genExpr.resultType)) {
+      throw new CodeGenException(s"Comparable type expected, but was '${genExpr.resultType}'.")
+    }
+
+  def requireString(genExpr: GeneratedExpression) =
+    if (!TypeCheckUtils.isString(genExpr.resultType)) {
+      throw new CodeGenException("String expression type expected.")
+    }
+
+  def requireBoolean(genExpr: GeneratedExpression) =
+    if (!TypeCheckUtils.isBoolean(genExpr.resultType)) {
+      throw new CodeGenException("Boolean expression type expected.")
+    }
+
+  def requireTemporal(genExpr: GeneratedExpression) =
+    if (!TypeCheckUtils.isTemporal(genExpr.resultType)) {
+      throw new CodeGenException("Temporal expression type expected.")
+    }
+
+  def requireTimeInterval(genExpr: GeneratedExpression) =
+    if (!TypeCheckUtils.isTimeInterval(genExpr.resultType)) {
+      throw new CodeGenException("Interval expression type expected.")
+    }
+
+  def requireArray(genExpr: GeneratedExpression) =
+    if (!TypeCheckUtils.isArray(genExpr.resultType)) {
+      throw new CodeGenException("Array expression type expected.")
+    }
+
+  def requireInteger(genExpr: GeneratedExpression) =
+    if (!TypeCheckUtils.isInteger(genExpr.resultType)) {
+      throw new CodeGenException("Integer expression type expected.")
+    }
+
+  // ----------------------------------------------------------------------------------------------
+
+  def isReference(genExpr: GeneratedExpression): Boolean = isReference(genExpr.resultType)
+
+  def isReference(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+    case INT_TYPE_INFO
+         | LONG_TYPE_INFO
+         | SHORT_TYPE_INFO
+         | BYTE_TYPE_INFO
+         | FLOAT_TYPE_INFO
+         | DOUBLE_TYPE_INFO
+         | BOOLEAN_TYPE_INFO
+         | CHAR_TYPE_INFO => false
+    case _ => true
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  sealed abstract class FieldAccessor
+
+  case class ObjectFieldAccessor(field: Field) extends FieldAccessor
+
+  case class ObjectGenericFieldAccessor(name: String) extends FieldAccessor
+
+  case class ObjectPrivateFieldAccessor(field: Field) extends FieldAccessor
+
+  case class ObjectMethodAccessor(methodName: String) extends FieldAccessor
+
+  case class ProductAccessor(i: Int) extends FieldAccessor
+
+  def fieldAccessorFor(compType: CompositeType[_], index: Int): FieldAccessor = {
+    compType match {
+      case ri: RowTypeInfo =>
+        ProductAccessor(index)
+
+      case cc: CaseClassTypeInfo[_] =>
+        ObjectMethodAccessor(cc.getFieldNames()(index))
+
+      case javaTup: TupleTypeInfo[_] =>
+        ObjectGenericFieldAccessor("f" + index)
+
+      case pt: PojoTypeInfo[_] =>
+        val fieldName = pt.getFieldNames()(index)
+        getFieldAccessor(pt.getTypeClass, fieldName)
+
+      case _ => throw new CodeGenException("Unsupported composite type.")
+    }
+  }
+
+  def getFieldAccessor(clazz: Class[_], fieldName: String): FieldAccessor = {
+    val field = TypeExtractor.getDeclaredField(clazz, fieldName)
+    if (field.isAccessible) {
+      ObjectFieldAccessor(field)
+    }
+    else {
+      ObjectPrivateFieldAccessor(field)
+    }
+  }
+
+  def isFieldPrimitive(field: Field): Boolean = field.getType.isPrimitive
+
+  def reflectiveFieldReadAccess(fieldTerm: String, field: Field, objectTerm: String): String =
+    field.getType match {
+      case java.lang.Integer.TYPE => s"$fieldTerm.getInt($objectTerm)"
+      case java.lang.Long.TYPE => s"$fieldTerm.getLong($objectTerm)"
+      case java.lang.Short.TYPE => s"$fieldTerm.getShort($objectTerm)"
+      case java.lang.Byte.TYPE => s"$fieldTerm.getByte($objectTerm)"
+      case java.lang.Float.TYPE => s"$fieldTerm.getFloat($objectTerm)"
+      case java.lang.Double.TYPE => s"$fieldTerm.getDouble($objectTerm)"
+      case java.lang.Boolean.TYPE => s"$fieldTerm.getBoolean($objectTerm)"
+      case java.lang.Character.TYPE => s"$fieldTerm.getChar($objectTerm)"
+      case _ => s"(${field.getType.getCanonicalName}) $fieldTerm.get($objectTerm)"
+    }
+
+  def reflectiveFieldWriteAccess(
+      fieldTerm: String,
+      field: Field,
+      objectTerm: String,
+      valueTerm: String)
+    : String =
+    field.getType match {
+      case java.lang.Integer.TYPE => s"$fieldTerm.setInt($objectTerm, $valueTerm)"
+      case java.lang.Long.TYPE => s"$fieldTerm.setLong($objectTerm, $valueTerm)"
+      case java.lang.Short.TYPE => s"$fieldTerm.setShort($objectTerm, $valueTerm)"
+      case java.lang.Byte.TYPE => s"$fieldTerm.setByte($objectTerm, $valueTerm)"
+      case java.lang.Float.TYPE => s"$fieldTerm.setFloat($objectTerm, $valueTerm)"
+      case java.lang.Double.TYPE => s"$fieldTerm.setDouble($objectTerm, $valueTerm)"
+      case java.lang.Boolean.TYPE => s"$fieldTerm.setBoolean($objectTerm, $valueTerm)"
+      case java.lang.Character.TYPE => s"$fieldTerm.setChar($objectTerm, $valueTerm)"
+      case _ => s"$fieldTerm.set($objectTerm, $valueTerm)"
+    }
+}
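
A few of the term helpers in action, as a sketch; the commented results follow
from the mappings above (the counter behind newName is global, so the exact
suffix depends on prior calls):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    import org.apache.flink.table.codegen.CodeGenUtils

    CodeGenUtils.newName("result")                            // e.g. "result$0"
    CodeGenUtils.primitiveTypeTermForTypeInfo(INT_TYPE_INFO)  // "int"
    CodeGenUtils.boxedTypeTermForTypeInfo(INT_TYPE_INFO)      // "java.lang.Integer"
    CodeGenUtils.primitiveDefaultValue(BOOLEAN_TYPE_INFO)     // "false"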