You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/06/17 08:36:52 UTC

[3/3] flink git commit: [FLINK-4077] [tableAPI] Register Pojo DataSet/DataStream as Table with field references.

[FLINK-4077] [tableAPI] Register Pojo DataSet/DataStream as Table with field references.

This closes #2107


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efc344a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efc344a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efc344a4

Branch: refs/heads/master
Commit: efc344a4e2ef8ea3e0b1e4da621196e9afeb75cc
Parents: 298c009
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Jun 15 17:12:10 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Jun 17 00:19:26 2016 +0200

----------------------------------------------------------------------
 .../flink/api/table/TableEnvironment.scala      |  19 +-
 .../flink/api/table/TableEnvironmentTest.scala  | 289 +++++++++++++++++++
 2 files changed, 303 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/efc344a4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index 7debb65..4d1bb1d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -124,8 +124,8 @@ abstract class TableEnvironment(val config: TableConfig) {
     * We use this method to replace a [[org.apache.flink.api.table.plan.schema.DataStreamTable]]
     * with a [[org.apache.calcite.schema.TranslatableTable]].
     *
-    * @param name
-    * @param table
+    * @param name Name of the table to replace.
+    * @param table The table that replaces the previous table.
     */
   protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = {
 
@@ -230,7 +230,9 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @tparam A The type of the TypeInformation.
     * @return A tuple of two arrays holding the field names and corresponding field positions.
     */
-  protected def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+  protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
+      (Array[String], Array[Int]) =
+  {
     val fieldNames: Array[String] = inputType match {
       case t: TupleTypeInfo[A] => t.getFieldNames
       case c: CaseClassTypeInfo[A] => c.getFieldNames
@@ -251,7 +253,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @tparam A The type of the TypeInformation.
     * @return A tuple of two arrays holding the field names and corresponding field positions.
     */
-  protected def getFieldInfo[A](
+  protected[flink] def getFieldInfo[A](
     inputType: TypeInformation[A],
     exprs: Array[Expression]): (Array[String], Array[Int]) = {
 
@@ -290,13 +292,20 @@ abstract class TableEnvironment(val config: TableConfig) {
         }
       case p: PojoTypeInfo[A] =>
         exprs.map {
+          case (UnresolvedFieldReference(name)) =>
+            val idx = p.getFieldIndex(name)
+            if (idx < 0) {
+              throw new TableException(s"$name is not a field of type $p")
+            }
+            (idx, name)
           case Alias(UnresolvedFieldReference(origName), name) =>
             val idx = p.getFieldIndex(origName)
             if (idx < 0) {
               throw new TableException(s"$origName is not a field of type $p")
             }
             (idx, name)
-          case _ => throw new TableException("Alias on field reference expression expected.")
+          case _ => throw new TableException(
+            "Field reference expression or alias on field expression expected.")
         }
       case tpe => throw new TableException(
         s"Source of type $tpe cannot be converted into Table.")

http://git-wip-us.apache.org/repos/asf/flink/blob/efc344a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala
new file mode 100644
index 0000000..263696b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{TypeExtractor, TupleTypeInfo}
+import org.apache.flink.api.table.expressions.{Alias, UnresolvedFieldReference}
+import org.apache.flink.api.table.sinks.TableSink
+import org.junit.Test
+import org.junit.Assert.assertEquals
+
+class TableEnvironmentTest {
+
+  val tEnv = new MockTableEnvironment
+
+  val tupleType = new TupleTypeInfo(
+    INT_TYPE_INFO,
+    STRING_TYPE_INFO,
+    DOUBLE_TYPE_INFO)
+
+  val caseClassType = implicitly[TypeInformation[CClass]]
+
+  val pojoType = TypeExtractor.createTypeInfo(classOf[PojoClass])
+
+  val atomicType = INT_TYPE_INFO
+
+  @Test
+  def testGetFieldInfoTuple(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(tupleType)
+
+    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
+  def testGetFieldInfoCClass(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(caseClassType)
+
+    fieldInfo._1.zip(Array("cf1", "cf2", "cf3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
+  def testGetFieldInfoPojo(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(pojoType)
+
+    fieldInfo._1.zip(Array("pf1", "pf2", "pf3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoAtomic(): Unit = {
+    tEnv.getFieldInfo(atomicType)
+  }
+
+  @Test
+  def testGetFieldInfoTupleNames(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      tupleType,
+      Array(
+        new UnresolvedFieldReference("name1"),
+        new UnresolvedFieldReference("name2"),
+        new UnresolvedFieldReference("name3")
+    ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
+  def testGetFieldInfoCClassNames(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      caseClassType,
+      Array(
+        new UnresolvedFieldReference("name1"),
+        new UnresolvedFieldReference("name2"),
+        new UnresolvedFieldReference("name3")
+    ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoPojoNames1(): Unit = {
+    tEnv.getFieldInfo(
+      pojoType,
+      Array(
+        new UnresolvedFieldReference("name1"),
+        new UnresolvedFieldReference("name2"),
+        new UnresolvedFieldReference("name3")
+      ))
+  }
+
+  @Test
+  def testGetFieldInfoPojoNames2(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      pojoType,
+      Array(
+        new UnresolvedFieldReference("pf3"),
+        new UnresolvedFieldReference("pf1"),
+        new UnresolvedFieldReference("pf2")
+      ))
+
+    fieldInfo._1.zip(Array("pf3", "pf1", "pf2")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
+  def testGetFieldInfoAtomicName1(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      atomicType,
+      Array(new UnresolvedFieldReference("name"))
+    )
+
+    fieldInfo._1.zip(Array("name")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoAtomicName2(): Unit = {
+    tEnv.getFieldInfo(
+      atomicType,
+      Array(
+        new UnresolvedFieldReference("name1"),
+        new UnresolvedFieldReference("name2")
+      ))
+  }
+
+  @Test
+  def testGetFieldInfoTupleAlias1(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      tupleType,
+      Array(
+        new Alias(UnresolvedFieldReference("f0"), "name1"),
+        new Alias(UnresolvedFieldReference("f1"), "name2"),
+        new Alias(UnresolvedFieldReference("f2"), "name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
+  def testGetFieldInfoTupleAlias2(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      tupleType,
+      Array(
+        new Alias(UnresolvedFieldReference("f2"), "name1"),
+        new Alias(UnresolvedFieldReference("f0"), "name2"),
+        new Alias(UnresolvedFieldReference("f1"), "name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoTupleAlias3(): Unit = {
+    tEnv.getFieldInfo(
+      tupleType,
+      Array(
+        new Alias(UnresolvedFieldReference("xxx"), "name1"),
+        new Alias(UnresolvedFieldReference("yyy"), "name2"),
+        new Alias(UnresolvedFieldReference("zzz"), "name3")
+      ))
+  }
+
+  @Test
+  def testGetFieldInfoCClassAlias1(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      caseClassType,
+      Array(
+        new Alias(new UnresolvedFieldReference("cf1"), "name1"),
+        new Alias(new UnresolvedFieldReference("cf2"), "name2"),
+        new Alias(new UnresolvedFieldReference("cf3"), "name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
+  def testGetFieldInfoCClassAlias2(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      caseClassType,
+      Array(
+        new Alias(new UnresolvedFieldReference("cf3"), "name1"),
+        new Alias(new UnresolvedFieldReference("cf1"), "name2"),
+        new Alias(new UnresolvedFieldReference("cf2"), "name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoCClassAlias3(): Unit = {
+    tEnv.getFieldInfo(
+      caseClassType,
+      Array(
+        new Alias(new UnresolvedFieldReference("xxx"), "name1"),
+        new Alias(new UnresolvedFieldReference("yyy"), "name2"),
+        new Alias(new UnresolvedFieldReference("zzz"), "name3")
+      ))
+  }
+
+  @Test
+  def testGetFieldInfoPojoAlias1(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      pojoType,
+      Array(
+        new Alias(new UnresolvedFieldReference("pf1"), "name1"),
+        new Alias(new UnresolvedFieldReference("pf2"), "name2"),
+        new Alias(new UnresolvedFieldReference("pf3"), "name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
+  def testGetFieldInfoPojoAlias2(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      pojoType,
+      Array(
+        new Alias(new UnresolvedFieldReference("pf3"), "name1"),
+        new Alias(new UnresolvedFieldReference("pf1"), "name2"),
+        new Alias(new UnresolvedFieldReference("pf2"), "name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoPojoAlias3(): Unit = {
+    tEnv.getFieldInfo(
+      pojoType,
+      Array(
+        new Alias(new UnresolvedFieldReference("xxx"), "name1"),
+        new Alias(new UnresolvedFieldReference("yyy"), "name2"),
+        new Alias(new UnresolvedFieldReference("zzz"), "name3")
+      ))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoAtomicAlias(): Unit = {
+    tEnv.getFieldInfo(
+      atomicType,
+      Array(
+        new Alias(new UnresolvedFieldReference("name1"), "name2")
+      ))
+  }
+
+}
+
+class MockTableEnvironment extends TableEnvironment(new TableConfig) {
+
+  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ???
+
+  override protected def checkValidTableName(name: String): Unit = ???
+
+  override def sql(query: String): Table = ???
+}
+
+case class CClass(cf1: Int, cf2: String, cf3: Double)
+
+class PojoClass(var pf1: Int, var pf2: String, var pf3: Double) {
+  def this() = this(0, "", 0.0)
+}