You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/01/05 15:57:50 UTC
flink git commit: [FLINK-4686] [table] Add possibility to get column
names
Repository: flink
Updated Branches:
refs/heads/master 33b8570e9 -> 6ac579455
[FLINK-4686] [table] Add possibility to get column names
This closes #2553.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ac57945
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ac57945
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ac57945
Branch: refs/heads/master
Commit: 6ac579455115d1062cade8a0453417c9a6ae6a83
Parents: 33b8570
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Tue Sep 27 17:05:27 2016 +0800
Committer: twalthr <tw...@apache.org>
Committed: Thu Jan 5 16:53:27 2017 +0100
----------------------------------------------------------------------
.../apache/flink/table/api/TableSchema.scala | 100 +++++++++++++++++++
.../org/apache/flink/table/api/table.scala | 14 +++
.../apache/flink/table/TableSchemaTest.scala | 82 +++++++++++++++
3 files changed, 196 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6ac57945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
new file mode 100644
index 0000000..a67a07a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+  * A TableSchema represents a Table's structure: an ordered list of uniquely named columns
+  * together with the type information of each column.
+  *
+  * The constructor validates its arguments and throws a [[TableException]] if the number of
+  * column names differs from the number of column types, or if a column name occurs more
+  * than once.
+  *
+  * @param columnNames the names of the columns, in column order
+  * @param columnTypes the type information of the columns, in the same order as the names
+  */
+class TableSchema(
+  private val columnNames: Array[String],
+  private val columnTypes: Array[TypeInformation[_]]) {
+
+  if (columnNames.length != columnTypes.length) {
+    throw new TableException(
+      "Number of column names and column types must be equal.")
+  }
+
+  // check uniqueness of field names: duplicates collapse in the set and shrink its size
+  if (columnNames.toSet.size != columnNames.length) {
+    throw new TableException(
+      "Table column names must be unique.")
+  }
+
+  /** Maps every column name to its position in the schema. */
+  val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap
+
+  /**
+    * Returns all type information as an array.
+    *
+    * The array handed to the constructor is returned as is (no copy is made).
+    */
+  def getTypes: Array[TypeInformation[_]] = columnTypes
+
+  /**
+    * Returns the specified type information for the given column index.
+    *
+    * @param columnIndex the index of the field
+    * @return the type information of the column, or None if the index is out of range
+    */
+  def getType(columnIndex: Int): Option[TypeInformation[_]] = {
+    if (columnIndex >= 0 && columnIndex < columnTypes.length) {
+      Some(columnTypes(columnIndex))
+    } else {
+      None
+    }
+  }
+
+  /**
+    * Returns the specified type information for the given column name.
+    *
+    * @param columnName the name of the field
+    * @return the type information of the column, or None if no column has that name
+    */
+  def getType(columnName: String): Option[TypeInformation[_]] = {
+    // a single map lookup instead of contains() followed by apply()
+    columnNameToIndex.get(columnName).map(index => columnTypes(index))
+  }
+
+  /**
+    * Returns all column names as an array.
+    *
+    * The array handed to the constructor is returned as is (no copy is made).
+    */
+  def getColumnNames: Array[String] = columnNames
+
+  /**
+    * Returns the specified column name for the given column index.
+    *
+    * @param columnIndex the index of the field
+    * @return the name of the column, or None if the index is out of range
+    */
+  def getColumnName(columnIndex: Int): Option[String] = {
+    if (columnIndex >= 0 && columnIndex < columnNames.length) {
+      Some(columnNames(columnIndex))
+    } else {
+      None
+    }
+  }
+
+  /**
+    * Renders the schema as a tree: a "root" line followed by one line per column
+    * showing the column name and its type information.
+    */
+  override def toString: String = {
+    val builder = new StringBuilder
+    builder.append("root\n")
+    columnNames.zip(columnTypes).foreach { case (name, typeInfo) =>
+      builder.append(s" |-- $name: $typeInfo\n")
+    }
+    builder.toString()
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6ac57945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 6322026..8717431 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -63,11 +63,25 @@ class Table(
private[flink] val tableEnv: TableEnvironment,
private[flink] val logicalPlan: LogicalNode) {
+ private val tableSchema: TableSchema = new TableSchema(
+ logicalPlan.output.map(_.name).toArray,
+ logicalPlan.output.map(_.resultType).toArray)
+
def relBuilder = tableEnv.getRelBuilder
def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder)
/**
+ * Returns the schema of this table.
+ */
+ def getSchema: TableSchema = tableSchema
+
+ /**
+ * Prints the schema of this table to the console in a tree format.
+ */
+ def printSchema(): Unit = print(tableSchema.toString)
+
+ /**
* Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
* can contain complex expressions and aggregations.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/6ac57945/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSchemaTest.scala
new file mode 100644
index 0000000..39c4c16
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSchemaTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableException, TableSchema, Types}
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.Test
+
+/**
+  * Tests for [[TableSchema]]: the schema exposed by batch and stream tables and the
+  * argument validation performed by the TableSchema constructor.
+  */
+class TableSchemaTest extends TableTestBase {
+
+  @Test
+  def testBatchTableSchema(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, String)]("MyTable", 'a, 'b)
+    verifyIntStringSchema(table.getSchema)
+  }
+
+  @Test
+  def testStreamTableSchema(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, String)]("MyTable", 'a, 'b)
+    verifyIntStringSchema(table.getSchema)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidSchema(): Unit = {
+    // three names but only two types
+    val fieldNames = Array("a", "b", "c")
+    val typeInfos: Array[TypeInformation[_]] = Array(
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO)
+    new TableSchema(fieldNames, typeInfos)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testDuplicateColumnNames(): Unit = {
+    // same number of names and types, but the name "a" occurs twice
+    val fieldNames = Array("a", "a")
+    val typeInfos: Array[TypeInformation[_]] = Array(
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO)
+    new TableSchema(fieldNames, typeInfos)
+  }
+
+  /**
+    * Asserts that the given schema describes exactly the columns (a: Int, b: String),
+    * and that lookups by an out-of-range index or an unknown name yield None.
+    */
+  private def verifyIntStringSchema(schema: TableSchema): Unit = {
+    assertEquals("a", schema.getColumnNames.apply(0))
+    assertEquals("b", schema.getColumnNames.apply(1))
+
+    assertEquals(Types.INT, schema.getTypes.apply(0))
+    assertEquals(Types.STRING, schema.getTypes.apply(1))
+
+    val expectedString = "root\n" +
+      " |-- a: Integer\n" +
+      " |-- b: String\n"
+    assertEquals(expectedString, schema.toString)
+
+    assertTrue(schema.getColumnName(3).isEmpty)
+    assertTrue(schema.getType(-1).isEmpty)
+    assertTrue(schema.getType("c").isEmpty)
+  }
+}