Posted to commits@flink.apache.org by tw...@apache.org on 2018/05/22 15:06:43 UTC

flink git commit: [FLINK-8838] [table] Add support for UNNEST on MultiSet fields

Repository: flink
Updated Branches:
  refs/heads/master c80e76bd7 -> a273f645b


[FLINK-8838] [table] Add support for UNNEST on MultiSet fields

This closes #5619.
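
For context, a minimal sketch of what this change enables (mirroring the new
ITCases below): COLLECT aggregates a group's values into a MULTISET, and such a
multiset column can now be unnested like an array, emitting each element once
per occurrence. Table and column names here are illustrative only.

    // assumes a registered table t1 with columns a, b, c
    val t2 = tEnv.sqlQuery("SELECT b, COLLECT(c) AS `set` FROM t1 GROUP BY b")
    tEnv.registerTable("t2", t2)
    // duplicates in the multiset are preserved in the output
    val flat = tEnv.sqlQuery("SELECT b, v FROM t2, UNNEST(t2.`set`) AS A(v)")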


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a273f645
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a273f645
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a273f645

Branch: refs/heads/master
Commit: a273f645b4120b143ac122e4db91616cd32bec5b
Parents: c80e76b
Author: lincoln-lil <li...@gmail.com>
Authored: Fri Mar 2 20:05:44 2018 +0800
Committer: Timo Walther <tw...@apache.org>
Committed: Tue May 22 16:59:09 2018 +0200

----------------------------------------------------------------------
 .../plan/rules/logical/LogicalUnnestRule.scala  |  27 ++--
 .../table/plan/util/ExplodeFunctionUtil.scala   | 132 ++++++++++++++-----
 .../runtime/batch/sql/AggregateITCase.scala     |  31 ++++-
 .../table/runtime/stream/sql/SqlITCase.scala    |  74 ++++++++++-
 4 files changed, 218 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a273f645/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
index 23dfc03..8ef9fd3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
@@ -32,7 +32,7 @@ import org.apache.calcite.sql.`type`.AbstractSqlType
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-import org.apache.flink.table.plan.schema.ArrayRelDataType
+import org.apache.flink.table.plan.schema.{ArrayRelDataType, MultisetRelDataType}
 import org.apache.flink.table.plan.util.ExplodeFunctionUtil
 
 class LogicalUnnestRule(
@@ -76,22 +76,27 @@ class LogicalUnnestRule(
         case uc: Uncollect =>
           // convert Uncollect into TableFunctionScan
           val cluster = correlate.getCluster
+          val dataType = uc.getInput.getRowType.getFieldList.get(0).getValue
+          val (componentType, explodeTableFunc) = dataType match {
+            case arrayType: ArrayRelDataType =>
+              (arrayType.getComponentType,
+                ExplodeFunctionUtil.explodeTableFuncFromType(arrayType.typeInfo))
+            case mt: MultisetRelDataType =>
+              (mt.getComponentType, ExplodeFunctionUtil.explodeTableFuncFromType(mt.typeInfo))
+            case _ => throw TableException(s"Unsupported UNNEST on type: ${dataType.toString}")
+          }
 
-          val arrayType =
-            uc.getInput.getRowType.getFieldList.get(0).getValue.asInstanceOf[ArrayRelDataType]
-          val componentType = arrayType.getComponentType
-
-          // create table function
-          val explodeTableFunc = UserDefinedFunctionUtils.createTableSqlFunction(
+          // create sql function
+          val explodeSqlFunc = UserDefinedFunctionUtils.createTableSqlFunction(
             "explode",
             "explode",
-            ExplodeFunctionUtil.explodeTableFuncFromType(arrayType.typeInfo),
-            FlinkTypeFactory.toTypeInfo(arrayType.getComponentType),
+            explodeTableFunc,
+            FlinkTypeFactory.toTypeInfo(componentType),
             cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory])
 
           // create table function call
           val rexCall = cluster.getRexBuilder.makeCall(
-            explodeTableFunc,
+            explodeSqlFunc,
             uc.getInput.asInstanceOf[RelSubset]
               .getOriginal.asInstanceOf[LogicalProject].getChildExps
           )
@@ -104,7 +109,7 @@ class LogicalUnnestRule(
                 ImmutableList.of(new RelDataTypeFieldImpl("f0", 0, componentType)))
             case _: RelRecordType => componentType
             case _ => throw TableException(
-              s"Unsupported array component type in UNNEST: ${componentType.toString}")
+              s"Unsupported component type in UNNEST: ${componentType.toString}")
           }
 
           // create table function scan

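Conceptually, the rule now accepts both array and multiset operands: it derives
the component type and an explode table function from either, then rewrites the
Uncollect into a scan of that function. A hedged sketch of the resulting shape
(the "explode" function is generated internally and is not user-visible):

    // UNNEST over an array or multiset column ...
    val unnested = tEnv.sqlQuery("SELECT b, v FROM t2, UNNEST(t2.`set`) AS A(v)")
    // ... is planned roughly like a lateral table function join:
    //   SELECT b, v FROM t2, LATERAL TABLE(explode(t2.`set`)) AS A(v)
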
http://git-wip-us.apache.org/repos/asf/flink/blob/a273f645/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
index 1bcc6d9..cfcaa84 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
@@ -18,74 +18,146 @@
 
 package org.apache.flink.table.plan.util
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, ObjectArrayTypeInfo}
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.functions.TableFunction
 
-class ObjectExplodeTableFunc extends TableFunction[Object] {
+abstract class ExplodeTableFunction[T] extends TableFunction[T] {
+
+  def collectArray(array: Array[T]): Unit = {
+    if (null != array) {
+      var i = 0
+      while (i < array.length) {
+        collect(array(i))
+        i += 1
+      }
+    }
+  }
+
+  def collect(map: util.Map[T, Integer]): Unit = {
+    if (null != map) {
+      val it = map.entrySet().iterator()
+      while (it.hasNext) {
+        val item = it.next()
+        val key: T = item.getKey
+        val cnt: Int = item.getValue
+        var i = 0
+        while (i < cnt) {
+          collect(key)
+          i += 1
+        }
+      }
+    }
+  }
+}
+
+class ObjectExplodeTableFunc extends ExplodeTableFunction[Object] {
   def eval(arr: Array[Object]): Unit = {
-    arr.foreach(collect)
+    collectArray(arr)
+  }
+
+  def eval(map: util.Map[Object, Integer]): Unit = {
+    collect(map)
   }
 }
 
-class FloatExplodeTableFunc extends TableFunction[Float] {
+class FloatExplodeTableFunc extends ExplodeTableFunction[Float] {
   def eval(arr: Array[Float]): Unit = {
-    arr.foreach(collect)
+    collectArray(arr)
+  }
+
+  def eval(map: util.Map[Float, Integer]): Unit = {
+    collect(map)
   }
 }
 
-class ShortExplodeTableFunc extends TableFunction[Short] {
+class ShortExplodeTableFunc extends ExplodeTableFunction[Short] {
   def eval(arr: Array[Short]): Unit = {
-    arr.foreach(collect)
+    collectArray(arr)
+  }
+
+  def eval(map: util.Map[Short, Integer]): Unit = {
+    collect(map)
   }
 }
-class IntExplodeTableFunc extends TableFunction[Int] {
+class IntExplodeTableFunc extends ExplodeTableFunction[Int] {
   def eval(arr: Array[Int]): Unit = {
-    arr.foreach(collect)
+    collectArray(arr)
+  }
+
+  def eval(map: util.Map[Int, Integer]): Unit = {
+    collect(map)
   }
 }
 
-class LongExplodeTableFunc extends TableFunction[Long] {
+class LongExplodeTableFunc extends ExplodeTableFunction[Long] {
   def eval(arr: Array[Long]): Unit = {
-    arr.foreach(collect)
+    collectArray(arr)
+  }
+
+  def eval(map: util.Map[Long, Integer]): Unit = {
+    collect(map)
   }
 }
 
-class DoubleExplodeTableFunc extends TableFunction[Double] {
+class DoubleExplodeTableFunc extends ExplodeTableFunction[Double] {
   def eval(arr: Array[Double]): Unit = {
-    arr.foreach(collect)
+    collectArray(arr)
+  }
+
+  def eval(map: util.Map[Double, Integer]): Unit = {
+    collect(map)
   }
 }
 
-class ByteExplodeTableFunc extends TableFunction[Byte] {
+class ByteExplodeTableFunc extends ExplodeTableFunction[Byte] {
   def eval(arr: Array[Byte]): Unit = {
-    arr.foreach(collect)
+    collectArray(arr)
+  }
+
+  def eval(map: util.Map[Byte, Integer]): Unit = {
+    collect(map)
   }
 }
 
-class BooleanExplodeTableFunc extends TableFunction[Boolean] {
+class BooleanExplodeTableFunc extends ExplodeTableFunction[Boolean] {
   def eval(arr: Array[Boolean]): Unit = {
-    arr.foreach(collect)
+    collectArray(arr)
+  }
+
+  def eval(map: util.Map[Boolean, Integer]): Unit = {
+    collect(map)
   }
 }
 
 object ExplodeFunctionUtil {
-  def explodeTableFuncFromType(ti: TypeInformation[_]):TableFunction[_] = {
+  def explodeTableFuncFromType(ti: TypeInformation[_]): TableFunction[_] = {
     ti match {
-      case pat: PrimitiveArrayTypeInfo[_] => {
-        pat.getComponentType match {
-          case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc
-          case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc
-          case BasicTypeInfo.SHORT_TYPE_INFO => new ShortExplodeTableFunc
-          case BasicTypeInfo.FLOAT_TYPE_INFO => new FloatExplodeTableFunc
-          case BasicTypeInfo.DOUBLE_TYPE_INFO => new DoubleExplodeTableFunc
-          case BasicTypeInfo.BYTE_TYPE_INFO => new ByteExplodeTableFunc
-          case BasicTypeInfo.BOOLEAN_TYPE_INFO => new BooleanExplodeTableFunc
-        }
-      }
+      case pat: PrimitiveArrayTypeInfo[_] => createTableFuncByType(pat.getComponentType)
+
       case _: ObjectArrayTypeInfo[_, _] => new ObjectExplodeTableFunc
+
       case _: BasicArrayTypeInfo[_, _] => new ObjectExplodeTableFunc
-      case _ => throw new UnsupportedOperationException(ti.toString + "IS NOT supported")
+
+      case mt: MultisetTypeInfo[_] => createTableFuncByType(mt.getElementTypeInfo)
+
+      case _ => throw new TableException("Unnesting of '" + ti.toString + "' is not supported.")
+    }
+  }
+
+  def createTableFuncByType(typeInfo: TypeInformation[_]): TableFunction[_] = {
+    typeInfo match {
+      case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc
+      case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc
+      case BasicTypeInfo.SHORT_TYPE_INFO => new ShortExplodeTableFunc
+      case BasicTypeInfo.FLOAT_TYPE_INFO => new FloatExplodeTableFunc
+      case BasicTypeInfo.DOUBLE_TYPE_INFO => new DoubleExplodeTableFunc
+      case BasicTypeInfo.BYTE_TYPE_INFO => new ByteExplodeTableFunc
+      case BasicTypeInfo.BOOLEAN_TYPE_INFO => new BooleanExplodeTableFunc
+      case _ => new ObjectExplodeTableFunc
     }
   }
 }

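The multiset case reuses one expansion routine: each map entry's key is emitted
once per its recorded count. A minimal, self-contained sketch of that semantics
(plain Scala, independent of the TableFunction machinery above):

    import java.util

    def explodeMultiset[T](map: util.Map[T, Integer]): Seq[T] = {
      val out = Seq.newBuilder[T]
      if (map != null) {
        val it = map.entrySet().iterator()
        while (it.hasNext) {
          val e = it.next()
          var i = 0
          while (i < e.getValue) { // emit the key `count` times
            out += e.getKey
            i += 1
          }
        }
      }
      out.result()
    }

    // e.g. a multiset {a -> 2, b -> 1} explodes to a, a, b
    // (iteration order depends on the map implementation)
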
http://git-wip-us.apache.org/repos/asf/flink/blob/a273f645/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
index ac0b705..09ccfc4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
@@ -22,9 +22,9 @@ import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTimesta
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMergeAndReset
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMergeAndReset
 import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.utils.NonMergableCount
@@ -35,7 +35,6 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 @RunWith(classOf[Parameterized])
 class AggregateITCase(
@@ -368,6 +367,34 @@ class AggregateITCase(
   }
 
   @Test
+  def testTumbleWindowAggregateWithCollectUnnest(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+      // create timestamps
+      .map(x => (x._1, x._2, x._3, toTimestamp(x._1 * 1000)))
+    tEnv.registerDataSet("t1", ds, 'a, 'b, 'c, 'ts)
+
+    val t2 = tEnv.sqlQuery("SELECT b, COLLECT(b) as `set` " +
+        "FROM t1 " +
+        "GROUP BY b, TUMBLE(ts, INTERVAL '3' SECOND)")
+    tEnv.registerTable("t2", t2)
+
+    val result = tEnv.sqlQuery("SELECT b, s FROM t2, UNNEST(t2.`set`) AS A(s) where b < 3")
+      .toDataSet[Row]
+      .collect()
+
+    val expected = Seq(
+      "1,1",
+      "2,2",
+      "2,2"
+    ).mkString("\n")
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+
+  @Test
   def testTumbleWindowWithProperties(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)

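To see why the batch test above expects "1,1", "2,2", "2,2": with ts = a
seconds, rows a=1 and a=2 fall into the tumbling window [0s, 3s) while a=3
falls into [3s, 6s), so grouping by (b, window) yields three groups with b < 3,
each collecting b once. A plain-Scala sketch of that grouping (illustrative
only, restricted to the rows with b < 3):

    val rows = Seq((1, 1), (2, 2), (3, 2)) // (a, b), with ts = a * 1000
    val grouped = rows.groupBy { case (a, b) => (b, (a * 1000) / 3000) }
    val exploded = grouped.toSeq.flatMap { case ((b, _), rs) => rs.map(_ => (b, b)) }
    // -> (1,1), (2,2), (2,2): b=2 appears in two different windows
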
http://git-wip-us.apache.org/repos/asf/flink/blob/a273f645/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index 9155ff9..e132349 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -480,7 +480,6 @@ class SqlITCase extends StreamingWithStateTestBase {
 
   @Test
   def testUnnestPrimitiveArrayFromTable(): Unit = {
-
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.clear
@@ -512,7 +511,6 @@ class SqlITCase extends StreamingWithStateTestBase {
 
   @Test
   def testUnnestArrayOfArrayFromTable(): Unit = {
-
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.clear
@@ -542,7 +540,6 @@ class SqlITCase extends StreamingWithStateTestBase {
 
   @Test
   def testUnnestObjectArrayFromTableWithFilter(): Unit = {
-
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.clear
@@ -568,6 +565,77 @@ class SqlITCase extends StreamingWithStateTestBase {
   }
 
   @Test
+  def testUnnestMultiSetFromCollectResult(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+
+    val data = List(
+      (1, 1, (12, "45.6")),
+      (2, 2, (12, "45.612")),
+      (3, 2, (13, "41.6")),
+      (4, 3, (14, "45.2136")),
+      (5, 3, (18, "42.6")))
+    tEnv.registerTable("t1", env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c))
+
+    val t2 = tEnv.sqlQuery("SELECT b, COLLECT(c) as `set` FROM t1 GROUP BY b")
+    tEnv.registerTable("t2", t2)
+
+    val result = tEnv
+      .sqlQuery("SELECT b, id, point FROM t2, UNNEST(t2.`set`) AS A(id, point) WHERE b < 3")
+      .toRetractStream[Row]
+    result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
+    env.execute()
+
+    val expected = List(
+      "1,12,45.6",
+      "2,12,45.612",
+      "2,13,41.6")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testLeftUnnestMultiSetFromCollectResult(): Unit = {
+    val data = List(
+      (1, "1", "Hello"),
+      (1, "2", "Hello2"),
+      (2, "2", "Hello"),
+      (3, null.asInstanceOf[String], "Hello"),
+      (4, "4", "Hello"),
+      (5, "5", "Hello"),
+      (5, null.asInstanceOf[String], "Hello"),
+      (6, "6", "Hello"),
+      (7, "7", "Hello World"),
+      (7, "8", "Hello World"))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("t1", t1)
+
+    val t2 = tEnv.sqlQuery("SELECT a, COLLECT(b) as `set` FROM t1 GROUP BY a")
+    tEnv.registerTable("t2", t2)
+
+    val result = tEnv
+      .sqlQuery("SELECT a, s FROM t2 LEFT JOIN UNNEST(t2.`set`) AS A(s) ON TRUE WHERE a < 5")
+      .toRetractStream[Row]
+    result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
+    env.execute()
+
+    val expected = List(
+      "1,1",
+      "1,2",
+      "2,2",
+      "3,null",
+      "4,4"
+    )
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
   def testHopStartEndWithHaving(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)