You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/26 13:59:09 UTC

flink git commit: [FLINK-4662] Bump Calcite version up to 1.9

Repository: flink
Updated Branches:
  refs/heads/master 95e9004e3 -> 8fa313c39


[FLINK-4662] Bump Calcite version up to 1.9

This closes #2535.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8fa313c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8fa313c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8fa313c3

Branch: refs/heads/master
Commit: 8fa313c39fbad7bb96327477544d1ec15e8dc0f6
Parents: 95e9004
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Thu Sep 22 21:47:37 2016 +0800
Committer: twalthr <tw...@apache.org>
Committed: Mon Sep 26 15:58:39 2016 +0200

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml                  |  2 +-
 .../apache/flink/api/table/FlinkPlannerImpl.scala    | 15 ++++++++-------
 .../apache/flink/api/table/FlinkTypeFactory.scala    |  8 ++++----
 .../flink/api/table/codegen/CodeGenerator.scala      |  4 ++--
 .../flink/api/table/expressions/arithmetic.scala     |  5 +++++
 .../table/plan/nodes/dataset/DataSetConvention.scala |  7 ++++++-
 .../api/table/plan/nodes/dataset/DataSetRel.scala    |  4 ++--
 .../plan/nodes/datastream/DataStreamConvention.scala |  7 ++++++-
 8 files changed, 34 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index e83a778..4c91f1c 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -51,7 +51,7 @@ under the License.
 		<dependency>
 			<groupId>org.apache.calcite</groupId>
 			<artifactId>calcite-core</artifactId>
-			<version>1.7.0</version>
+			<version>1.9.0</version>
 			<exclusions>
 				<exclusion>
 					<groupId>org.apache.calcite.avatica</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
index bb08654..97e5cf2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
@@ -98,10 +98,10 @@ class FlinkPlannerImpl(
       assert(validatedSqlNode != null)
       val rexBuilder: RexBuilder = createRexBuilder
       val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+      val config = SqlToRelConverter.configBuilder()
+        .withTrimUnusedFields(false).withConvertTableAccess(false).build()
       val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
-        new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable)
-      sqlToRelConverter.setTrimUnusedFields(false)
-      sqlToRelConverter.enableTableAccessConversion(false)
+        new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable, config)
       root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
       root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
       root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
@@ -118,7 +118,8 @@ class FlinkPlannerImpl(
     override def expandView(
         rowType: RelDataType,
         queryString: String,
-        schemaPath: util.List[String]): RelRoot = {
+        schemaPath: util.List[String],
+        viewPath: util.List[String]): RelRoot = {
 
       val parser: SqlParser = SqlParser.create(queryString, parserConfig)
       var sqlNode: SqlNode = null
@@ -136,10 +137,10 @@ class FlinkPlannerImpl(
       val validatedSqlNode: SqlNode = validator.validate(sqlNode)
       val rexBuilder: RexBuilder = createRexBuilder
       val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+      val config: SqlToRelConverter.Config = SqlToRelConverter.configBuilder
+        .withTrimUnusedFields(false).withConvertTableAccess(false).build
       val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
-        new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable)
-      sqlToRelConverter.setTrimUnusedFields(false)
-      sqlToRelConverter.enableTableAccessConversion(false)
+        new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable, config)
       root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false)
       root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
       root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))

http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
index 5a116db..581ecde 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
@@ -53,7 +53,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
           createSqlIntervalType(
             new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
 
-        case INTERVAL_DAY_TIME =>
+        case INTERVAL_DAY_SECOND =>
           createSqlIntervalType(
             new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
 
@@ -97,7 +97,7 @@ object FlinkTypeFactory {
       case SqlTimeTypeInfo.TIME => TIME
       case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
       case IntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
-      case IntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_TIME
+      case IntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND
 
       case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
         throw TableException("Character type is not supported.")
@@ -121,8 +121,8 @@ object FlinkTypeFactory {
     case DATE => SqlTimeTypeInfo.DATE
     case TIME => SqlTimeTypeInfo.TIME
     case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
-    case INTERVAL_YEAR_MONTH => IntervalTypeInfo.INTERVAL_MONTHS
-    case INTERVAL_DAY_TIME => IntervalTypeInfo.INTERVAL_MILLIS
+    case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => IntervalTypeInfo.INTERVAL_MONTHS
+    case typeName if DAY_INTERVAL_TYPES.contains(typeName) => IntervalTypeInfo.INTERVAL_MILLIS
 
     case NULL =>
       throw TableException("Type NULL is not supported. " +

http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index e5a07b1..b54c498 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -638,7 +638,7 @@ class CodeGenerator(
       case TIMESTAMP =>
         generateNonNullLiteral(resultType, value.toString + "L")
 
-      case INTERVAL_YEAR_MONTH =>
+      case typeName if YEAR_INTERVAL_TYPES.contains(typeName) =>
         val decimal = BigDecimal(value.asInstanceOf[JBigDecimal])
         if (decimal.isValidInt) {
           generateNonNullLiteral(resultType, decimal.intValue().toString)
@@ -646,7 +646,7 @@ class CodeGenerator(
           throw new CodeGenException("Decimal can not be converted to interval of months.")
         }
 
-      case INTERVAL_DAY_TIME =>
+      case typeName if DAY_INTERVAL_TYPES.contains(typeName) =>
         val decimal = BigDecimal(value.asInstanceOf[JBigDecimal])
         if (decimal.isValidLong) {
           generateNonNullLiteral(resultType, decimal.longValue().toString + "L")

http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
index b301f22..4a7978a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.api.table.expressions
 
 import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.`type`.IntervalSqlType
 import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
@@ -67,6 +68,10 @@ case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
       relBuilder.call(SqlStdOperatorTable.CONCAT, castedLeft.toRexNode, right.toRexNode)
     } else if (isTimeInterval(left.resultType) && left.resultType == right.resultType) {
       relBuilder.call(SqlStdOperatorTable.PLUS, left.toRexNode, right.toRexNode)
+    } else if (isTimeInterval(left.resultType) && isTemporal(right.resultType)) {
+      // Calcite has a bug where INTERVAL + DATETIME (with the INTERVAL on the left)
+      // cannot be applied, so we manually swap the operands here
+      relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, right.toRexNode, left.toRexNode)
     } else if (isTemporal(left.resultType) && isTemporal(right.resultType)) {
       relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, left.toRexNode, right.toRexNode)
     } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
index cbacd16..03d9a51 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
@@ -24,6 +24,12 @@ class DataSetConvention extends Convention {
 
   override def toString: String = getName
 
+  override def useAbstractConvertersForConversion(
+    fromTraits: RelTraitSet,
+    toTraits: RelTraitSet): Boolean = false
+
+  override def canConvertConvention(toConvention: Convention): Boolean = false
+
   def getInterface: Class[_] = classOf[DataSetRel]
 
   def getName: String = "DATASET"
@@ -33,7 +39,6 @@ class DataSetConvention extends Convention {
   def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
 
   def register(planner: RelOptPlanner): Unit = { }
-
 }
 
 object DataSetConvention {

http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
index 9ce1580..39532f0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
@@ -62,8 +62,8 @@ trait DataSetRel extends RelNode with FlinkRel {
         case SqlTypeName.VARCHAR => s + 12
         case SqlTypeName.CHAR => s + 1
         case SqlTypeName.DECIMAL => s + 12
-        case SqlTypeName.INTERVAL_DAY_TIME => s + 8
-        case SqlTypeName.INTERVAL_YEAR_MONTH => s + 4
+        case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 4
+        case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 8
         case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
         case _ => throw TableException(s"Unsupported data type encountered: $t")
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
index bcce4c4..3b6a653 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
@@ -24,6 +24,12 @@ class DataStreamConvention extends Convention {
 
   override def toString: String = getName
 
+  override def useAbstractConvertersForConversion(
+    fromTraits: RelTraitSet,
+    toTraits: RelTraitSet): Boolean = false
+
+  override def canConvertConvention(toConvention: Convention): Boolean = false
+
   def getInterface: Class[_] = classOf[DataStreamRel]
 
   def getName: String = "DATASTREAM"
@@ -33,7 +39,6 @@ class DataStreamConvention extends Convention {
   def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
 
   def register(planner: RelOptPlanner): Unit = { }
-
 }
 
 object DataStreamConvention {