You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/23 15:01:42 UTC
flink git commit: [FLINK-4550] [table] Clearly define SQL operator table
Repository: flink
Updated Branches:
refs/heads/master 51a5048b2 -> ecbccd940
[FLINK-4550] [table] Clearly define SQL operator table
This closes #2502.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ecbccd94
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ecbccd94
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ecbccd94
Branch: refs/heads/master
Commit: ecbccd940d2df462215b7a79e895114b3d2df3cf
Parents: 51a5048
Author: twalthr <tw...@apache.org>
Authored: Thu Sep 15 16:56:16 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Fri Sep 23 16:47:59 2016 +0200
----------------------------------------------------------------------
.../flink/api/table/TableEnvironment.scala | 2 +-
.../api/table/validate/FunctionCatalog.scala | 123 +++++++++++++++++--
2 files changed, 116 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ecbccd94/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index d7e650c..b95198c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -63,7 +63,7 @@ abstract class TableEnvironment(val config: TableConfig) {
private val tables: SchemaPlus = Frameworks.createRootSchema(true)
// Table API/SQL function catalog
- private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuildIns
+ private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
// SQL operator and function catalog
private val sqlOperatorTable: SqlOperatorTable = functionCatalog.getSqlOperatorTable
http://git-wip-us.apache.org/repos/asf/flink/blob/ecbccd94/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
index 42d460e..9c66730 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -19,8 +19,8 @@
package org.apache.flink.api.table.validate
import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable}
-import org.apache.calcite.sql.{SqlFunction, SqlOperatorTable}
+import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable, ReflectiveSqlOperatorTable}
+import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
import org.apache.flink.api.table.ValidationException
import org.apache.flink.api.table.expressions._
import org.apache.flink.api.table.functions.ScalarFunction
@@ -31,7 +31,8 @@ import scala.collection.mutable
import scala.util.{Failure, Success, Try}
/**
- * A catalog for looking up user-defined functions, used during validation phase.
+ * A catalog for looking up (user-defined) functions, used during validation phases
+ * of both Table API and SQL API.
*/
class FunctionCatalog {
@@ -48,7 +49,7 @@ class FunctionCatalog {
def getSqlOperatorTable: SqlOperatorTable =
ChainedSqlOperatorTable.of(
- SqlStdOperatorTable.instance(),
+ new BasicOperatorTable(),
new ListSqlOperatorTable(sqlFunctions)
)
@@ -116,7 +117,7 @@ class FunctionCatalog {
object FunctionCatalog {
- val buildInFunctions: Map[String, Class[_]] = Map(
+ val builtInFunctions: Map[String, Class[_]] = Map(
// logic
"isNull" -> classOf[IsNull],
"isNotNull" -> classOf[IsNotNull],
@@ -169,11 +170,117 @@ object FunctionCatalog {
)
/**
- * Create a new function catalog with build-in functions.
+ * Create a new function catalog with built-in functions.
*/
- def withBuildIns: FunctionCatalog = {
+ def withBuiltIns: FunctionCatalog = {
val catalog = new FunctionCatalog()
- buildInFunctions.foreach { case (n, c) => catalog.registerFunction(n, c) }
+ builtInFunctions.foreach { case (n, c) => catalog.registerFunction(n, c) }
catalog
}
}
+
+class BasicOperatorTable extends ReflectiveSqlOperatorTable {
+
+ /**
+ * List of supported SQL operators / functions.
+ *
+ * This list should be kept in sync with [[SqlStdOperatorTable]].
+ */
+ private val builtInSqlOperators: Seq[SqlOperator] = Seq(
+ // SET OPERATORS
+ SqlStdOperatorTable.UNION,
+ SqlStdOperatorTable.UNION_ALL,
+ SqlStdOperatorTable.EXCEPT,
+ SqlStdOperatorTable.EXCEPT_ALL,
+ SqlStdOperatorTable.INTERSECT,
+ SqlStdOperatorTable.INTERSECT_ALL,
+ // BINARY OPERATORS
+ SqlStdOperatorTable.AND,
+ SqlStdOperatorTable.AS,
+ SqlStdOperatorTable.CONCAT,
+ SqlStdOperatorTable.DIVIDE,
+ SqlStdOperatorTable.DIVIDE_INTEGER,
+ SqlStdOperatorTable.DOT,
+ SqlStdOperatorTable.EQUALS,
+ SqlStdOperatorTable.GREATER_THAN,
+ SqlStdOperatorTable.IS_DISTINCT_FROM,
+ SqlStdOperatorTable.IS_NOT_DISTINCT_FROM,
+ SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
+ SqlStdOperatorTable.LESS_THAN,
+ SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+ SqlStdOperatorTable.MINUS,
+ SqlStdOperatorTable.MULTIPLY,
+ SqlStdOperatorTable.NOT_EQUALS,
+ SqlStdOperatorTable.OR,
+ SqlStdOperatorTable.PLUS,
+ SqlStdOperatorTable.DATETIME_PLUS,
+ // POSTFIX OPERATORS
+ SqlStdOperatorTable.DESC,
+ SqlStdOperatorTable.NULLS_FIRST,
+ SqlStdOperatorTable.IS_NOT_NULL,
+ SqlStdOperatorTable.IS_NULL,
+ SqlStdOperatorTable.IS_NOT_TRUE,
+ SqlStdOperatorTable.IS_TRUE,
+ SqlStdOperatorTable.IS_NOT_FALSE,
+ SqlStdOperatorTable.IS_FALSE,
+ SqlStdOperatorTable.IS_NOT_UNKNOWN,
+ SqlStdOperatorTable.IS_UNKNOWN,
+ // PREFIX OPERATORS
+ SqlStdOperatorTable.NOT,
+ SqlStdOperatorTable.UNARY_MINUS,
+ SqlStdOperatorTable.UNARY_PLUS,
+ // AGGREGATE OPERATORS
+ SqlStdOperatorTable.SUM,
+ SqlStdOperatorTable.COUNT,
+ SqlStdOperatorTable.MIN,
+ SqlStdOperatorTable.MAX,
+ SqlStdOperatorTable.AVG,
+ // SPECIAL OPERATORS
+ SqlStdOperatorTable.ROW,
+ SqlStdOperatorTable.OVERLAPS,
+ SqlStdOperatorTable.LITERAL_CHAIN,
+ SqlStdOperatorTable.BETWEEN,
+ SqlStdOperatorTable.SYMMETRIC_BETWEEN,
+ SqlStdOperatorTable.NOT_BETWEEN,
+ SqlStdOperatorTable.SYMMETRIC_NOT_BETWEEN,
+ SqlStdOperatorTable.NOT_LIKE,
+ SqlStdOperatorTable.LIKE,
+ SqlStdOperatorTable.NOT_SIMILAR_TO,
+ SqlStdOperatorTable.SIMILAR_TO,
+ SqlStdOperatorTable.CASE,
+ SqlStdOperatorTable.REINTERPRET,
+ SqlStdOperatorTable.EXTRACT_DATE,
+ // FUNCTIONS
+ SqlStdOperatorTable.SUBSTRING,
+ SqlStdOperatorTable.OVERLAY,
+ SqlStdOperatorTable.TRIM,
+ SqlStdOperatorTable.POSITION,
+ SqlStdOperatorTable.CHAR_LENGTH,
+ SqlStdOperatorTable.CHARACTER_LENGTH,
+ SqlStdOperatorTable.UPPER,
+ SqlStdOperatorTable.LOWER,
+ SqlStdOperatorTable.INITCAP,
+ SqlStdOperatorTable.POWER,
+ SqlStdOperatorTable.SQRT,
+ SqlStdOperatorTable.MOD,
+ SqlStdOperatorTable.LN,
+ SqlStdOperatorTable.LOG10,
+ SqlStdOperatorTable.ABS,
+ SqlStdOperatorTable.EXP,
+ SqlStdOperatorTable.NULLIF,
+ SqlStdOperatorTable.COALESCE,
+ SqlStdOperatorTable.FLOOR,
+ SqlStdOperatorTable.CEIL,
+ SqlStdOperatorTable.LOCALTIME,
+ SqlStdOperatorTable.LOCALTIMESTAMP,
+ SqlStdOperatorTable.CURRENT_TIME,
+ SqlStdOperatorTable.CURRENT_TIMESTAMP,
+ SqlStdOperatorTable.CURRENT_DATE,
+ SqlStdOperatorTable.CAST,
+ SqlStdOperatorTable.EXTRACT,
+ SqlStdOperatorTable.QUARTER,
+ SqlStdOperatorTable.SCALAR_QUERY
+ )
+
+ builtInSqlOperators.foreach(register)
+}