You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/01/11 13:52:44 UTC
flink git commit: [FLINK-5418] [table] Estimated row size does not
support nested types
Repository: flink
Updated Branches:
refs/heads/master aa220e487 -> c4536e8a2
[FLINK-5418] [table] Estimated row size does not support nested types
This closes #3073.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4536e8a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4536e8a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4536e8a
Branch: refs/heads/master
Commit: c4536e8a235f3b75b04422c4352930940089bd8a
Parents: aa220e4
Author: twalthr <tw...@apache.org>
Authored: Fri Jan 6 13:59:38 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Jan 11 14:51:54 2017 +0100
----------------------------------------------------------------------
.../flink/table/plan/nodes/FlinkRel.scala | 47 +++++++++++---------
.../api/scala/batch/sql/SetOperatorsTest.scala | 17 +++++++
2 files changed, 42 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c4536e8a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
index 9b844be..0681401 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -18,9 +18,7 @@
package org.apache.flink.table.plan.nodes
-import java.util
-
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.flink.api.common.functions.MapFunction
@@ -105,28 +103,33 @@ trait FlinkRel {
}
-
private[flink] def estimateRowSize(rowType: RelDataType): Double = {
val fieldList = rowType.getFieldList
- fieldList.map(_.getType.getSqlTypeName).zipWithIndex.foldLeft(0) { (s, t) =>
- t._1 match {
- case SqlTypeName.TINYINT => s + 1
- case SqlTypeName.SMALLINT => s + 2
- case SqlTypeName.INTEGER => s + 4
- case SqlTypeName.BIGINT => s + 8
- case SqlTypeName.BOOLEAN => s + 1
- case SqlTypeName.FLOAT => s + 4
- case SqlTypeName.DOUBLE => s + 8
- case SqlTypeName.VARCHAR => s + 12
- case SqlTypeName.CHAR => s + 1
- case SqlTypeName.DECIMAL => s + 12
- case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8
- case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4
- case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
- case SqlTypeName.ROW => s + estimateRowSize(fieldList.get(t._2).getType()).asInstanceOf[Int]
- case _ => throw TableException(s"Unsupported data type encountered: $t")
- }
+ fieldList.map(_.getType).foldLeft(0.0) { (s, t) =>
+ s + estimateDataTypeSize(t)
}
}
+
+ private[flink] def estimateDataTypeSize(t: RelDataType): Double = t.getSqlTypeName match {
+ case SqlTypeName.TINYINT => 1
+ case SqlTypeName.SMALLINT => 2
+ case SqlTypeName.INTEGER => 4
+ case SqlTypeName.BIGINT => 8
+ case SqlTypeName.BOOLEAN => 1
+ case SqlTypeName.FLOAT => 4
+ case SqlTypeName.DOUBLE => 8
+ case SqlTypeName.VARCHAR => 12
+ case SqlTypeName.CHAR => 1
+ case SqlTypeName.DECIMAL => 12
+ case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => 8
+ case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => 4
+ case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => 12
+ case SqlTypeName.ROW => estimateRowSize(t)
+ case SqlTypeName.ARRAY =>
+ // 16 is an arbitrary estimate
+ estimateDataTypeSize(t.getComponentType) * 16
+ case SqlTypeName.ANY => 128 // 128 is an arbitrary estimate
+ case _ => throw TableException(s"Unsupported data type encountered: $t")
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c4536e8a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
index 6c07c6e..be98a89 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
@@ -27,6 +27,23 @@ import org.junit.Test
class SetOperatorsTest extends TableTestBase {
@Test
+ def testMinusWithNestedTypes(): Unit = {
+ val util = batchTestUtil()
+ val t = util.addTable[(Long, (Int, String), Array[Boolean])]("MyTable", 'a, 'b, 'c)
+
+ val expected = binaryNode(
+ "DataSetMinus",
+ batchTableNode(0),
+ batchTableNode(0),
+ term("minus", "a", "b", "c")
+ )
+
+ val result = t.minus(t)
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
def testExists(): Unit = {
val util = batchTestUtil()
util.addTable[(Long, Int, String)]("A", 'a_long, 'a_int, 'a_string)