You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/26 13:59:09 UTC
flink git commit: [FLINK-4662] Bump Calcite version up to 1.9
Repository: flink
Updated Branches:
refs/heads/master 95e9004e3 -> 8fa313c39
[FLINK-4662] Bump Calcite version up to 1.9
This closes #2535.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8fa313c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8fa313c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8fa313c3
Branch: refs/heads/master
Commit: 8fa313c39fbad7bb96327477544d1ec15e8dc0f6
Parents: 95e9004
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Thu Sep 22 21:47:37 2016 +0800
Committer: twalthr <tw...@apache.org>
Committed: Mon Sep 26 15:58:39 2016 +0200
----------------------------------------------------------------------
flink-libraries/flink-table/pom.xml | 2 +-
.../apache/flink/api/table/FlinkPlannerImpl.scala | 15 ++++++++-------
.../apache/flink/api/table/FlinkTypeFactory.scala | 8 ++++----
.../flink/api/table/codegen/CodeGenerator.scala | 4 ++--
.../flink/api/table/expressions/arithmetic.scala | 5 +++++
.../table/plan/nodes/dataset/DataSetConvention.scala | 7 ++++++-
.../api/table/plan/nodes/dataset/DataSetRel.scala | 4 ++--
.../plan/nodes/datastream/DataStreamConvention.scala | 7 ++++++-
8 files changed, 34 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index e83a778..4c91f1c 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -51,7 +51,7 @@ under the License.
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
- <version>1.7.0</version>
+ <version>1.9.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
index bb08654..97e5cf2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
@@ -98,10 +98,10 @@ class FlinkPlannerImpl(
assert(validatedSqlNode != null)
val rexBuilder: RexBuilder = createRexBuilder
val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+ val config = SqlToRelConverter.configBuilder()
+ .withTrimUnusedFields(false).withConvertTableAccess(false).build()
val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
- new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable)
- sqlToRelConverter.setTrimUnusedFields(false)
- sqlToRelConverter.enableTableAccessConversion(false)
+ new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable, config)
root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
@@ -118,7 +118,8 @@ class FlinkPlannerImpl(
override def expandView(
rowType: RelDataType,
queryString: String,
- schemaPath: util.List[String]): RelRoot = {
+ schemaPath: util.List[String],
+ viewPath: util.List[String]): RelRoot = {
val parser: SqlParser = SqlParser.create(queryString, parserConfig)
var sqlNode: SqlNode = null
@@ -136,10 +137,10 @@ class FlinkPlannerImpl(
val validatedSqlNode: SqlNode = validator.validate(sqlNode)
val rexBuilder: RexBuilder = createRexBuilder
val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+ val config: SqlToRelConverter.Config = SqlToRelConverter.configBuilder
+ .withTrimUnusedFields(false).withConvertTableAccess(false).build
val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
- new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable)
- sqlToRelConverter.setTrimUnusedFields(false)
- sqlToRelConverter.enableTableAccessConversion(false)
+ new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable, config)
root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false)
root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
index 5a116db..581ecde 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
@@ -53,7 +53,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
createSqlIntervalType(
new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
- case INTERVAL_DAY_TIME =>
+ case INTERVAL_DAY_SECOND =>
createSqlIntervalType(
new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
@@ -97,7 +97,7 @@ object FlinkTypeFactory {
case SqlTimeTypeInfo.TIME => TIME
case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
case IntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
- case IntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_TIME
+ case IntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND
case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
throw TableException("Character type is not supported.")
@@ -121,8 +121,8 @@ object FlinkTypeFactory {
case DATE => SqlTimeTypeInfo.DATE
case TIME => SqlTimeTypeInfo.TIME
case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
- case INTERVAL_YEAR_MONTH => IntervalTypeInfo.INTERVAL_MONTHS
- case INTERVAL_DAY_TIME => IntervalTypeInfo.INTERVAL_MILLIS
+ case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => IntervalTypeInfo.INTERVAL_MONTHS
+ case typeName if DAY_INTERVAL_TYPES.contains(typeName) => IntervalTypeInfo.INTERVAL_MILLIS
case NULL =>
throw TableException("Type NULL is not supported. " +
http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index e5a07b1..b54c498 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -638,7 +638,7 @@ class CodeGenerator(
case TIMESTAMP =>
generateNonNullLiteral(resultType, value.toString + "L")
- case INTERVAL_YEAR_MONTH =>
+ case typeName if YEAR_INTERVAL_TYPES.contains(typeName) =>
val decimal = BigDecimal(value.asInstanceOf[JBigDecimal])
if (decimal.isValidInt) {
generateNonNullLiteral(resultType, decimal.intValue().toString)
@@ -646,7 +646,7 @@ class CodeGenerator(
throw new CodeGenException("Decimal can not be converted to interval of months.")
}
- case INTERVAL_DAY_TIME =>
+ case typeName if DAY_INTERVAL_TYPES.contains(typeName) =>
val decimal = BigDecimal(value.asInstanceOf[JBigDecimal])
if (decimal.isValidLong) {
generateNonNullLiteral(resultType, decimal.longValue().toString + "L")
http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
index b301f22..4a7978a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
@@ -18,6 +18,7 @@
package org.apache.flink.api.table.expressions
import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.`type`.IntervalSqlType
import org.apache.calcite.sql.SqlOperator
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.tools.RelBuilder
@@ -67,6 +68,10 @@ case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
relBuilder.call(SqlStdOperatorTable.CONCAT, castedLeft.toRexNode, right.toRexNode)
} else if (isTimeInterval(left.resultType) && left.resultType == right.resultType) {
relBuilder.call(SqlStdOperatorTable.PLUS, left.toRexNode, right.toRexNode)
+ } else if (isTimeInterval(left.resultType) && isTemporal(right.resultType)) {
+ // Calcite has a bug that can't apply INTERVAL + DATETIME (INTERVAL at left)
+ // we manually switch them here
+ relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, right.toRexNode, left.toRexNode)
} else if (isTemporal(left.resultType) && isTemporal(right.resultType)) {
relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, left.toRexNode, right.toRexNode)
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
index cbacd16..03d9a51 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
@@ -24,6 +24,12 @@ class DataSetConvention extends Convention {
override def toString: String = getName
+ override def useAbstractConvertersForConversion(
+ fromTraits: RelTraitSet,
+ toTraits: RelTraitSet): Boolean = false
+
+ override def canConvertConvention(toConvention: Convention): Boolean = false
+
def getInterface: Class[_] = classOf[DataSetRel]
def getName: String = "DATASET"
@@ -33,7 +39,6 @@ class DataSetConvention extends Convention {
def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
def register(planner: RelOptPlanner): Unit = { }
-
}
object DataSetConvention {
http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
index 9ce1580..39532f0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
@@ -62,8 +62,8 @@ trait DataSetRel extends RelNode with FlinkRel {
case SqlTypeName.VARCHAR => s + 12
case SqlTypeName.CHAR => s + 1
case SqlTypeName.DECIMAL => s + 12
- case SqlTypeName.INTERVAL_DAY_TIME => s + 8
- case SqlTypeName.INTERVAL_YEAR_MONTH => s + 4
+ case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8
+ case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4
case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
case _ => throw TableException(s"Unsupported data type encountered: $t")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
index bcce4c4..3b6a653 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala
@@ -24,6 +24,12 @@ class DataStreamConvention extends Convention {
override def toString: String = getName
+ override def useAbstractConvertersForConversion(
+ fromTraits: RelTraitSet,
+ toTraits: RelTraitSet): Boolean = false
+
+ override def canConvertConvention(toConvention: Convention): Boolean = false
+
def getInterface: Class[_] = classOf[DataStreamRel]
def getName: String = "DATASTREAM"
@@ -33,7 +39,6 @@ class DataStreamConvention extends Convention {
def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
def register(planner: RelOptPlanner): Unit = { }
-
}
object DataStreamConvention {