You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/01/11 13:52:44 UTC

flink git commit: [FLINK-5418] [table] Estimated row size does not support nested types

Repository: flink
Updated Branches:
  refs/heads/master aa220e487 -> c4536e8a2


[FLINK-5418] [table] Estimated row size does not support nested types

This closes #3073.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4536e8a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4536e8a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4536e8a

Branch: refs/heads/master
Commit: c4536e8a235f3b75b04422c4352930940089bd8a
Parents: aa220e4
Author: twalthr <tw...@apache.org>
Authored: Fri Jan 6 13:59:38 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Jan 11 14:51:54 2017 +0100

----------------------------------------------------------------------
 .../flink/table/plan/nodes/FlinkRel.scala       | 47 +++++++++++---------
 .../api/scala/batch/sql/SetOperatorsTest.scala  | 17 +++++++
 2 files changed, 42 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4536e8a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
index 9b844be..0681401 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -18,9 +18,7 @@
 
 package org.apache.flink.table.plan.nodes
 
-import java.util
-
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.functions.MapFunction
@@ -105,28 +103,33 @@ trait FlinkRel {
 
   }
 
-
   private[flink] def estimateRowSize(rowType: RelDataType): Double = {
     val fieldList = rowType.getFieldList
 
-    fieldList.map(_.getType.getSqlTypeName).zipWithIndex.foldLeft(0) { (s, t) =>
-      t._1 match {
-        case SqlTypeName.TINYINT => s + 1
-        case SqlTypeName.SMALLINT => s + 2
-        case SqlTypeName.INTEGER => s + 4
-        case SqlTypeName.BIGINT => s + 8
-        case SqlTypeName.BOOLEAN => s + 1
-        case SqlTypeName.FLOAT => s + 4
-        case SqlTypeName.DOUBLE => s + 8
-        case SqlTypeName.VARCHAR => s + 12
-        case SqlTypeName.CHAR => s + 1
-        case SqlTypeName.DECIMAL => s + 12
-        case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8
-        case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4
-        case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
-        case SqlTypeName.ROW => s + estimateRowSize(fieldList.get(t._2).getType()).asInstanceOf[Int]
-        case _ => throw TableException(s"Unsupported data type encountered: $t")
-      }
+    fieldList.map(_.getType).foldLeft(0.0) { (s, t) =>
+      git s + estimateDataTypeSize(t)
     }
   }
+
+  private[flink] def estimateDataTypeSize(t: RelDataType): Double = t.getSqlTypeName match {
+    case SqlTypeName.TINYINT => 1
+    case SqlTypeName.SMALLINT => 2
+    case SqlTypeName.INTEGER => 4
+    case SqlTypeName.BIGINT => 8
+    case SqlTypeName.BOOLEAN => 1
+    case SqlTypeName.FLOAT => 4
+    case SqlTypeName.DOUBLE => 8
+    case SqlTypeName.VARCHAR => 12
+    case SqlTypeName.CHAR => 1
+    case SqlTypeName.DECIMAL => 12
+    case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => 8
+    case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => 4
+    case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => 12
+    case SqlTypeName.ROW => estimateRowSize(t)
+    case SqlTypeName.ARRAY =>
+      // 16 is an arbitrary estimate
+      estimateDataTypeSize(t.getComponentType) * 16
+    case SqlTypeName.ANY => 128 // 128 is an arbitrary estimate
+    case _ => throw TableException(s"Unsupported data type encountered: $t")
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c4536e8a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
index 6c07c6e..be98a89 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
@@ -27,6 +27,23 @@ import org.junit.Test
 class SetOperatorsTest extends TableTestBase {
 
   @Test
+  def testMinusWithNestedTypes(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Long, (Int, String), Array[Boolean])]("MyTable", 'a, 'b, 'c)
+
+    val expected = binaryNode(
+      "DataSetMinus",
+      batchTableNode(0),
+      batchTableNode(0),
+      term("minus", "a", "b", "c")
+    )
+
+    val result = t.minus(t)
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
   def testExists(): Unit = {
     val util = batchTestUtil()
     util.addTable[(Long, Int, String)]("A", 'a_long, 'a_int, 'a_string)