You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/23 15:01:42 UTC

flink git commit: [FLINK-4550] [table] Clearly define SQL operator table

Repository: flink
Updated Branches:
  refs/heads/master 51a5048b2 -> ecbccd940


[FLINK-4550] [table] Clearly define SQL operator table

This closes #2502.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ecbccd94
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ecbccd94
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ecbccd94

Branch: refs/heads/master
Commit: ecbccd940d2df462215b7a79e895114b3d2df3cf
Parents: 51a5048
Author: twalthr <tw...@apache.org>
Authored: Thu Sep 15 16:56:16 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Fri Sep 23 16:47:59 2016 +0200

----------------------------------------------------------------------
 .../flink/api/table/TableEnvironment.scala      |   2 +-
 .../api/table/validate/FunctionCatalog.scala    | 123 +++++++++++++++++--
 2 files changed, 116 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ecbccd94/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index d7e650c..b95198c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -63,7 +63,7 @@ abstract class TableEnvironment(val config: TableConfig) {
   private val tables: SchemaPlus = Frameworks.createRootSchema(true)
 
   // Table API/SQL function catalog
-  private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuildIns
+  private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
 
   // SQL operator and function catalog
   private val sqlOperatorTable: SqlOperatorTable = functionCatalog.getSqlOperatorTable

http://git-wip-us.apache.org/repos/asf/flink/blob/ecbccd94/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
index 42d460e..9c66730 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.api.table.validate
 
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable}
-import org.apache.calcite.sql.{SqlFunction, SqlOperatorTable}
+import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable, ReflectiveSqlOperatorTable}
+import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
 import org.apache.flink.api.table.ValidationException
 import org.apache.flink.api.table.expressions._
 import org.apache.flink.api.table.functions.ScalarFunction
@@ -31,7 +31,8 @@ import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
 
 /**
-  * A catalog for looking up user-defined functions, used during validation phase.
+  * A catalog for looking up (user-defined) functions, used during validation phases
+  * of both Table API and SQL API.
   */
 class FunctionCatalog {
 
@@ -48,7 +49,7 @@ class FunctionCatalog {
 
   def getSqlOperatorTable: SqlOperatorTable =
     ChainedSqlOperatorTable.of(
-      SqlStdOperatorTable.instance(),
+      new BasicOperatorTable(),
       new ListSqlOperatorTable(sqlFunctions)
     )
 
@@ -116,7 +117,7 @@ class FunctionCatalog {
 
 object FunctionCatalog {
 
-  val buildInFunctions: Map[String, Class[_]] = Map(
+  val builtInFunctions: Map[String, Class[_]] = Map(
     // logic
     "isNull" -> classOf[IsNull],
     "isNotNull" -> classOf[IsNotNull],
@@ -169,11 +170,117 @@ object FunctionCatalog {
   )
 
   /**
-    * Create a new function catalog with build-in functions.
+    * Create a new function catalog with built-in functions.
     */
-  def withBuildIns: FunctionCatalog = {
+  def withBuiltIns: FunctionCatalog = {
     val catalog = new FunctionCatalog()
-    buildInFunctions.foreach { case (n, c) => catalog.registerFunction(n, c) }
+    builtInFunctions.foreach { case (n, c) => catalog.registerFunction(n, c) }
     catalog
   }
 }
+
+class BasicOperatorTable extends ReflectiveSqlOperatorTable {
+
+  /**
+    * List of supported SQL operators / functions.
+    *
+    * This list should be kept in sync with [[SqlStdOperatorTable]].
+    */
+  private val builtInSqlOperators: Seq[SqlOperator] = Seq(
+    // SET OPERATORS
+    SqlStdOperatorTable.UNION,
+    SqlStdOperatorTable.UNION_ALL,
+    SqlStdOperatorTable.EXCEPT,
+    SqlStdOperatorTable.EXCEPT_ALL,
+    SqlStdOperatorTable.INTERSECT,
+    SqlStdOperatorTable.INTERSECT_ALL,
+    // BINARY OPERATORS
+    SqlStdOperatorTable.AND,
+    SqlStdOperatorTable.AS,
+    SqlStdOperatorTable.CONCAT,
+    SqlStdOperatorTable.DIVIDE,
+    SqlStdOperatorTable.DIVIDE_INTEGER,
+    SqlStdOperatorTable.DOT,
+    SqlStdOperatorTable.EQUALS,
+    SqlStdOperatorTable.GREATER_THAN,
+    SqlStdOperatorTable.IS_DISTINCT_FROM,
+    SqlStdOperatorTable.IS_NOT_DISTINCT_FROM,
+    SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
+    SqlStdOperatorTable.LESS_THAN,
+    SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+    SqlStdOperatorTable.MINUS,
+    SqlStdOperatorTable.MULTIPLY,
+    SqlStdOperatorTable.NOT_EQUALS,
+    SqlStdOperatorTable.OR,
+    SqlStdOperatorTable.PLUS,
+    SqlStdOperatorTable.DATETIME_PLUS,
+    // POSTFIX OPERATORS
+    SqlStdOperatorTable.DESC,
+    SqlStdOperatorTable.NULLS_FIRST,
+    SqlStdOperatorTable.IS_NOT_NULL,
+    SqlStdOperatorTable.IS_NULL,
+    SqlStdOperatorTable.IS_NOT_TRUE,
+    SqlStdOperatorTable.IS_TRUE,
+    SqlStdOperatorTable.IS_NOT_FALSE,
+    SqlStdOperatorTable.IS_FALSE,
+    SqlStdOperatorTable.IS_NOT_UNKNOWN,
+    SqlStdOperatorTable.IS_UNKNOWN,
+    // PREFIX OPERATORS
+    SqlStdOperatorTable.NOT,
+    SqlStdOperatorTable.UNARY_MINUS,
+    SqlStdOperatorTable.UNARY_PLUS,
+    // AGGREGATE OPERATORS
+    SqlStdOperatorTable.SUM,
+    SqlStdOperatorTable.COUNT,
+    SqlStdOperatorTable.MIN,
+    SqlStdOperatorTable.MAX,
+    SqlStdOperatorTable.AVG,
+    // SPECIAL OPERATORS
+    SqlStdOperatorTable.ROW,
+    SqlStdOperatorTable.OVERLAPS,
+    SqlStdOperatorTable.LITERAL_CHAIN,
+    SqlStdOperatorTable.BETWEEN,
+    SqlStdOperatorTable.SYMMETRIC_BETWEEN,
+    SqlStdOperatorTable.NOT_BETWEEN,
+    SqlStdOperatorTable.SYMMETRIC_NOT_BETWEEN,
+    SqlStdOperatorTable.NOT_LIKE,
+    SqlStdOperatorTable.LIKE,
+    SqlStdOperatorTable.NOT_SIMILAR_TO,
+    SqlStdOperatorTable.SIMILAR_TO,
+    SqlStdOperatorTable.CASE,
+    SqlStdOperatorTable.REINTERPRET,
+    SqlStdOperatorTable.EXTRACT_DATE,
+    // FUNCTIONS
+    SqlStdOperatorTable.SUBSTRING,
+    SqlStdOperatorTable.OVERLAY,
+    SqlStdOperatorTable.TRIM,
+    SqlStdOperatorTable.POSITION,
+    SqlStdOperatorTable.CHAR_LENGTH,
+    SqlStdOperatorTable.CHARACTER_LENGTH,
+    SqlStdOperatorTable.UPPER,
+    SqlStdOperatorTable.LOWER,
+    SqlStdOperatorTable.INITCAP,
+    SqlStdOperatorTable.POWER,
+    SqlStdOperatorTable.SQRT,
+    SqlStdOperatorTable.MOD,
+    SqlStdOperatorTable.LN,
+    SqlStdOperatorTable.LOG10,
+    SqlStdOperatorTable.ABS,
+    SqlStdOperatorTable.EXP,
+    SqlStdOperatorTable.NULLIF,
+    SqlStdOperatorTable.COALESCE,
+    SqlStdOperatorTable.FLOOR,
+    SqlStdOperatorTable.CEIL,
+    SqlStdOperatorTable.LOCALTIME,
+    SqlStdOperatorTable.LOCALTIMESTAMP,
+    SqlStdOperatorTable.CURRENT_TIME,
+    SqlStdOperatorTable.CURRENT_TIMESTAMP,
+    SqlStdOperatorTable.CURRENT_DATE,
+    SqlStdOperatorTable.CAST,
+    SqlStdOperatorTable.EXTRACT,
+    SqlStdOperatorTable.QUARTER,
+    SqlStdOperatorTable.SCALAR_QUERY
+  )
+
+  builtInSqlOperators.foreach(register)
+}