You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/06/17 08:36:52 UTC
[3/3] flink git commit: [FLINK-4077] [tableAPI] Register Pojo DataSet/DataStream as Table with field references.
[FLINK-4077] [tableAPI] Register Pojo DataSet/DataStream as Table with field references.
This closes #2107
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efc344a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efc344a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efc344a4
Branch: refs/heads/master
Commit: efc344a4e2ef8ea3e0b1e4da621196e9afeb75cc
Parents: 298c009
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Jun 15 17:12:10 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Jun 17 00:19:26 2016 +0200
----------------------------------------------------------------------
.../flink/api/table/TableEnvironment.scala | 19 +-
.../flink/api/table/TableEnvironmentTest.scala | 289 +++++++++++++++++++
2 files changed, 303 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/efc344a4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index 7debb65..4d1bb1d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -124,8 +124,8 @@ abstract class TableEnvironment(val config: TableConfig) {
* We use this method to replace a [[org.apache.flink.api.table.plan.schema.DataStreamTable]]
* with a [[org.apache.calcite.schema.TranslatableTable]].
*
- * @param name
- * @param table
+ * @param name Name of the table to replace.
+ * @param table The table that replaces the previous table.
*/
protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = {
@@ -230,7 +230,9 @@ abstract class TableEnvironment(val config: TableConfig) {
* @tparam A The type of the TypeInformation.
* @return A tuple of two arrays holding the field names and corresponding field positions.
*/
- protected def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+ protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
+ (Array[String], Array[Int]) =
+ {
val fieldNames: Array[String] = inputType match {
case t: TupleTypeInfo[A] => t.getFieldNames
case c: CaseClassTypeInfo[A] => c.getFieldNames
@@ -251,7 +253,7 @@ abstract class TableEnvironment(val config: TableConfig) {
* @tparam A The type of the TypeInformation.
* @return A tuple of two arrays holding the field names and corresponding field positions.
*/
- protected def getFieldInfo[A](
+ protected[flink] def getFieldInfo[A](
inputType: TypeInformation[A],
exprs: Array[Expression]): (Array[String], Array[Int]) = {
@@ -290,13 +292,20 @@ abstract class TableEnvironment(val config: TableConfig) {
}
case p: PojoTypeInfo[A] =>
exprs.map {
+ case (UnresolvedFieldReference(name)) =>
+ val idx = p.getFieldIndex(name)
+ if (idx < 0) {
+ throw new TableException(s"$name is not a field of type $p")
+ }
+ (idx, name)
case Alias(UnresolvedFieldReference(origName), name) =>
val idx = p.getFieldIndex(origName)
if (idx < 0) {
throw new TableException(s"$origName is not a field of type $p")
}
(idx, name)
- case _ => throw new TableException("Alias on field reference expression expected.")
+ case _ => throw new TableException(
+ "Field reference expression or alias on field expression expected.")
}
case tpe => throw new TableException(
s"Source of type $tpe cannot be converted into Table.")
http://git-wip-us.apache.org/repos/asf/flink/blob/efc344a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala
new file mode 100644
index 0000000..263696b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/TableEnvironmentTest.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{TypeExtractor, TupleTypeInfo}
+import org.apache.flink.api.table.expressions.{Alias, UnresolvedFieldReference}
+import org.apache.flink.api.table.sinks.TableSink
+import org.junit.Test
+import org.junit.Assert.assertEquals
+
+class TableEnvironmentTest {
+
+ val tEnv = new MockTableEnvironment
+
+ val tupleType = new TupleTypeInfo(
+ INT_TYPE_INFO,
+ STRING_TYPE_INFO,
+ DOUBLE_TYPE_INFO)
+
+ val caseClassType = implicitly[TypeInformation[CClass]]
+
+ val pojoType = TypeExtractor.createTypeInfo(classOf[PojoClass])
+
+ val atomicType = INT_TYPE_INFO
+
+ @Test
+ def testGetFieldInfoTuple(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(tupleType)
+
+ fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
+ def testGetFieldInfoCClass(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(caseClassType)
+
+ fieldInfo._1.zip(Array("cf1", "cf2", "cf3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
+ def testGetFieldInfoPojo(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(pojoType)
+
+ fieldInfo._1.zip(Array("pf1", "pf2", "pf3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoAtomic(): Unit = {
+ tEnv.getFieldInfo(atomicType)
+ }
+
+ @Test
+ def testGetFieldInfoTupleNames(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ tupleType,
+ Array(
+ new UnresolvedFieldReference("name1"),
+ new UnresolvedFieldReference("name2"),
+ new UnresolvedFieldReference("name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
+ def testGetFieldInfoCClassNames(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ caseClassType,
+ Array(
+ new UnresolvedFieldReference("name1"),
+ new UnresolvedFieldReference("name2"),
+ new UnresolvedFieldReference("name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoPojoNames1(): Unit = {
+ tEnv.getFieldInfo(
+ pojoType,
+ Array(
+ new UnresolvedFieldReference("name1"),
+ new UnresolvedFieldReference("name2"),
+ new UnresolvedFieldReference("name3")
+ ))
+ }
+
+ @Test
+ def testGetFieldInfoPojoNames2(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ pojoType,
+ Array(
+ new UnresolvedFieldReference("pf3"),
+ new UnresolvedFieldReference("pf1"),
+ new UnresolvedFieldReference("pf2")
+ ))
+
+ fieldInfo._1.zip(Array("pf3", "pf1", "pf2")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
+ def testGetFieldInfoAtomicName1(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ atomicType,
+ Array(new UnresolvedFieldReference("name"))
+ )
+
+ fieldInfo._1.zip(Array("name")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoAtomicName2(): Unit = {
+ tEnv.getFieldInfo(
+ atomicType,
+ Array(
+ new UnresolvedFieldReference("name1"),
+ new UnresolvedFieldReference("name2")
+ ))
+ }
+
+ @Test
+ def testGetFieldInfoTupleAlias1(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ tupleType,
+ Array(
+ new Alias(UnresolvedFieldReference("f0"), "name1"),
+ new Alias(UnresolvedFieldReference("f1"), "name2"),
+ new Alias(UnresolvedFieldReference("f2"), "name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
+ def testGetFieldInfoTupleAlias2(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ tupleType,
+ Array(
+ new Alias(UnresolvedFieldReference("f2"), "name1"),
+ new Alias(UnresolvedFieldReference("f0"), "name2"),
+ new Alias(UnresolvedFieldReference("f1"), "name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoTupleAlias3(): Unit = {
+ tEnv.getFieldInfo(
+ tupleType,
+ Array(
+ new Alias(UnresolvedFieldReference("xxx"), "name1"),
+ new Alias(UnresolvedFieldReference("yyy"), "name2"),
+ new Alias(UnresolvedFieldReference("zzz"), "name3")
+ ))
+ }
+
+ @Test
+ def testGetFieldInfoCClassAlias1(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ caseClassType,
+ Array(
+ new Alias(new UnresolvedFieldReference("cf1"), "name1"),
+ new Alias(new UnresolvedFieldReference("cf2"), "name2"),
+ new Alias(new UnresolvedFieldReference("cf3"), "name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
+ def testGetFieldInfoCClassAlias2(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ caseClassType,
+ Array(
+ new Alias(new UnresolvedFieldReference("cf3"), "name1"),
+ new Alias(new UnresolvedFieldReference("cf1"), "name2"),
+ new Alias(new UnresolvedFieldReference("cf2"), "name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoCClassAlias3(): Unit = {
+ tEnv.getFieldInfo(
+ caseClassType,
+ Array(
+ new Alias(new UnresolvedFieldReference("xxx"), "name1"),
+ new Alias(new UnresolvedFieldReference("yyy"), "name2"),
+ new Alias(new UnresolvedFieldReference("zzz"), "name3")
+ ))
+ }
+
+ @Test
+ def testGetFieldInfoPojoAlias1(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ pojoType,
+ Array(
+ new Alias(new UnresolvedFieldReference("pf1"), "name1"),
+ new Alias(new UnresolvedFieldReference("pf2"), "name2"),
+ new Alias(new UnresolvedFieldReference("pf3"), "name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test
+ def testGetFieldInfoPojoAlias2(): Unit = {
+ val fieldInfo = tEnv.getFieldInfo(
+ pojoType,
+ Array(
+ new Alias(new UnresolvedFieldReference("pf3"), "name1"),
+ new Alias(new UnresolvedFieldReference("pf1"), "name2"),
+ new Alias(new UnresolvedFieldReference("pf2"), "name3")
+ ))
+
+ fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+ fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoPojoAlias3(): Unit = {
+ tEnv.getFieldInfo(
+ pojoType,
+ Array(
+ new Alias(new UnresolvedFieldReference("xxx"), "name1"),
+ new Alias(new UnresolvedFieldReference("yyy"), "name2"),
+ new Alias(new UnresolvedFieldReference("zzz"), "name3")
+ ))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testGetFieldInfoAtomicAlias(): Unit = {
+ tEnv.getFieldInfo(
+ atomicType,
+ Array(
+ new Alias(new UnresolvedFieldReference("name1"), "name2")
+ ))
+ }
+
+}
+
+class MockTableEnvironment extends TableEnvironment(new TableConfig) {
+
+ override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ???
+
+ override protected def checkValidTableName(name: String): Unit = ???
+
+ override def sql(query: String): Table = ???
+}
+
+case class CClass(cf1: Int, cf2: String, cf3: Double)
+
+class PojoClass(var pf1: Int, var pf2: String, var pf3: Double) {
+ def this() = this(0, "", 0.0)
+}