You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/01/05 15:57:50 UTC

flink git commit: [FLINK-4686] [table] Add possibility to get column names

Repository: flink
Updated Branches:
  refs/heads/master 33b8570e9 -> 6ac579455


[FLINK-4686] [table] Add possibility to get column names

This closes #2553.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ac57945
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ac57945
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ac57945

Branch: refs/heads/master
Commit: 6ac579455115d1062cade8a0453417c9a6ae6a83
Parents: 33b8570
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Tue Sep 27 17:05:27 2016 +0800
Committer: twalthr <tw...@apache.org>
Committed: Thu Jan 5 16:53:27 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/table/api/TableSchema.scala    | 100 +++++++++++++++++++
 .../org/apache/flink/table/api/table.scala      |  14 +++
 .../apache/flink/table/TableSchemaTest.scala    |  82 +++++++++++++++
 3 files changed, 196 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6ac57945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
new file mode 100644
index 0000000..a67a07a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+  * A TableSchema represents a Table's structure: an ordered list of column names and the
+  * type information of each column.
+  *
+  * Construction fails with a [[TableException]] if the number of names differs from the
+  * number of types or if a column name occurs more than once.
+  *
+  * @param columnNames the column names, which must be unique
+  * @param columnTypes the column types; columnTypes(i) is the type of columnNames(i)
+  */
+class TableSchema(
+  private val columnNames: Array[String],
+  private val columnTypes: Array[TypeInformation[_]]) {
+
+  if (columnNames.length != columnTypes.length) {
+    throw new TableException(
+      "Number of column names and column types must be equal.")
+  }
+
+  // check uniqueness of column names: duplicates collapse when converted to a set
+  if (columnNames.toSet.size != columnNames.length) {
+    throw new TableException(
+      "Table column names must be unique.")
+  }
+
+  /** Maps each column name to its position in the schema. */
+  val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap
+
+  /**
+    * Returns all type information as an array.
+    *
+    * The array is the one backing this schema, not a copy.
+    */
+  def getTypes: Array[TypeInformation[_]] = columnTypes
+
+  /**
+    * Returns the specified type information for the given column index.
+    *
+    * @param columnIndex the index of the field
+    * @return the type information, or None if the index is out of range
+    */
+  def getType(columnIndex: Int): Option[TypeInformation[_]] =
+    columnTypes.lift(columnIndex)
+
+  /**
+    * Returns the specified type information for the given column name.
+    *
+    * @param columnName the name of the field
+    * @return the type information, or None if no column has that name
+    */
+  def getType(columnName: String): Option[TypeInformation[_]] =
+    columnNameToIndex.get(columnName).map(index => columnTypes(index))
+
+  /**
+    * Returns all column names as an array.
+    *
+    * The array is the one backing this schema, not a copy.
+    */
+  def getColumnNames: Array[String] = columnNames
+
+  /**
+    * Returns the specified column name for the given column index.
+    *
+    * @param columnIndex the index of the field
+    * @return the column name, or None if the index is out of range
+    */
+  def getColumnName(columnIndex: Int): Option[String] =
+    columnNames.lift(columnIndex)
+
+  /**
+    * Returns a tree-style rendering: a "root" line followed by one " |-- name: type" line
+    * per column, each terminated by a newline.
+    */
+  override def toString: String =
+    columnNames.zip(columnTypes)
+      .map { case (name, typeInfo) => s" |-- $name: $typeInfo\n" }
+      .mkString("root\n", "", "")
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ac57945/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 6322026..8717431 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -63,11 +63,25 @@ class Table(
     private[flink] val tableEnv: TableEnvironment,
     private[flink] val logicalPlan: LogicalNode) {
 
+  private val tableSchema: TableSchema = new TableSchema( // built eagerly from the plan output
+    logicalPlan.output.map(_.name).toArray, // column names
+    logicalPlan.output.map(_.resultType).toArray) // column types, in the same order
+
   def relBuilder = tableEnv.getRelBuilder
 
   def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder)
 
   /**
+    * Returns the schema of this table.
+    */
+  def getSchema: TableSchema = tableSchema
+
+  /**
+    * Prints the schema of this table to the console as a tree, with one line per column.
+    */
+  def printSchema(): Unit = print(tableSchema.toString)
+
+  /**
     * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
     * can contain complex expressions and aggregations.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/6ac57945/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSchemaTest.scala
new file mode 100644
index 0000000..39c4c16
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSchemaTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableException, TableSchema, Types}
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.Test
+
+/** Tests the [[TableSchema]] of batch and stream tables and its construction checks. */
+class TableSchemaTest extends TableTestBase {
+
+  @Test
+  def testBatchTableSchema(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, String)]("MyTable", 'a, 'b)
+    verifyTwoColumnSchema(table.getSchema)
+  }
+
+  @Test
+  def testStreamTableSchema(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, String)]("MyTable", 'a, 'b)
+    verifyTwoColumnSchema(table.getSchema)
+  }
+
+  /** A mismatch between the number of names and the number of types must be rejected. */
+  @Test(expected = classOf[TableException])
+  def testInvalidSchema(): Unit = {
+    val fieldNames = Array("a", "b", "c")
+    val typeInfos: Array[TypeInformation[_]] = Array(
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO)
+    new TableSchema(fieldNames, typeInfos)
+  }
+
+  /** Duplicate column names must be rejected. */
+  @Test(expected = classOf[TableException])
+  def testDuplicateColumnNames(): Unit = {
+    val fieldNames = Array("a", "a")
+    val typeInfos: Array[TypeInformation[_]] = Array(
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO)
+    new TableSchema(fieldNames, typeInfos)
+  }
+
+  /** Asserts names, types, string form and failed lookups for the schema (a: Int, b: String). */
+  private def verifyTwoColumnSchema(schema: TableSchema): Unit = {
+    assertEquals("a", schema.getColumnNames.apply(0))
+    assertEquals("b", schema.getColumnNames.apply(1))
+
+    assertEquals(Types.INT, schema.getTypes.apply(0))
+    assertEquals(Types.STRING, schema.getTypes.apply(1))
+
+    val expectedString = "root\n" +
+      " |-- a: Integer\n" +
+      " |-- b: String\n"
+    assertEquals(expectedString, schema.toString)
+
+    assertTrue(schema.getColumnName(3).isEmpty)
+    assertTrue(schema.getType(-1).isEmpty)
+    assertTrue(schema.getType("c").isEmpty)
+  }
+}