You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2018/05/17 12:12:01 UTC
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5619#discussion_r188906074
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala ---
@@ -18,74 +18,129 @@
package org.apache.flink.table.plan.util
+import java.util
+
import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MultisetTypeInfo, ObjectArrayTypeInfo}
import org.apache.flink.table.functions.TableFunction
+import scala.collection.JavaConverters._
+
class ObjectExplodeTableFunc extends TableFunction[Object] {
def eval(arr: Array[Object]): Unit = {
arr.foreach(collect)
}
+
+ def eval(map: util.Map[Object, Integer]): Unit = {
+ CommonCollect.collect(map, collect)
+ }
}
class FloatExplodeTableFunc extends TableFunction[Float] {
def eval(arr: Array[Float]): Unit = {
arr.foreach(collect)
}
+
+ def eval(map: util.Map[Float, Integer]): Unit = {
+ CommonCollect.collect(map, collect)
+ }
}
class ShortExplodeTableFunc extends TableFunction[Short] {
def eval(arr: Array[Short]): Unit = {
arr.foreach(collect)
}
+
+ def eval(map: util.Map[Short, Integer]): Unit = {
+ CommonCollect.collect(map, collect)
+ }
}
class IntExplodeTableFunc extends TableFunction[Int] {
def eval(arr: Array[Int]): Unit = {
arr.foreach(collect)
}
+
+ def eval(map: util.Map[Int, Integer]): Unit = {
+ CommonCollect.collect(map, collect)
+ }
}
class LongExplodeTableFunc extends TableFunction[Long] {
def eval(arr: Array[Long]): Unit = {
arr.foreach(collect)
}
+
+ def eval(map: util.Map[Long, Integer]): Unit = {
+ CommonCollect.collect(map, collect)
+ }
}
class DoubleExplodeTableFunc extends TableFunction[Double] {
def eval(arr: Array[Double]): Unit = {
arr.foreach(collect)
}
+
+ def eval(map: util.Map[Double, Integer]): Unit = {
+ CommonCollect.collect(map, collect)
+ }
}
class ByteExplodeTableFunc extends TableFunction[Byte] {
def eval(arr: Array[Byte]): Unit = {
arr.foreach(collect)
}
+
+ def eval(map: util.Map[Byte, Integer]): Unit = {
+ CommonCollect.collect(map, collect)
+ }
}
class BooleanExplodeTableFunc extends TableFunction[Boolean] {
def eval(arr: Array[Boolean]): Unit = {
arr.foreach(collect)
}
+
+ def eval(map: util.Map[Boolean, Integer]): Unit = {
+ CommonCollect.collect(map, collect)
+ }
+}
+
+object CommonCollect {
+ def collect[T](map: util.Map[T, Integer], collectFunc: (T) => Unit): Unit = {
+ map.asScala.foreach{ e =>
--- End diff --
We should not use any Scala magic (such as the `asScala` conversion and the closure-based `foreach` used here) in runtime code. Can you convert it to a plain `while` loop? It would be great if you could also convert the `foreach` loops in the other `eval` methods in this file.
---