You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2022/07/14 01:45:05 UTC
[flink] branch master updated: [FLINK-27376][table] Support current_database built-in function (#19218)
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 641fb893a3e [FLINK-27376][table] Support current_database built-in function (#19218)
641fb893a3e is described below
commit 641fb893a3e3bc7478413984c3287fcdb56ad845
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Thu Jul 14 09:44:54 2022 +0800
[FLINK-27376][table] Support current_database built-in function (#19218)
---
docs/data/sql_functions.yml | 5 ++++
docs/data/sql_functions_zh.yml | 5 ++++
.../apache/flink/table/module/hive/HiveModule.java | 1 +
.../connectors/hive/HiveDialectQueryITCase.java | 17 +++++++++++
.../flink/table/module/hive/HiveModuleTest.java | 4 +--
flink-python/pyflink/table/expressions.py | 12 ++++++--
.../pyflink/table/tests/test_expression.py | 5 ++--
.../org/apache/flink/table/api/Expressions.java | 8 +++++
.../table/api/ImplicitExpressionConversions.scala | 3 ++
.../functions/BuiltInFunctionDefinitions.java | 11 +++++++
.../expressions/converter/DirectConvertRule.java | 5 ++++
.../functions/sql/FlinkSqlOperatorTable.java | 10 +++++++
.../table/planner/utils/InternalConfigOptions.java | 8 +++++
.../planner/codegen/CodeGeneratorContext.scala | 25 ++++++++++++++++
.../planner/codegen/calls/StringCallGen.scala | 6 +++-
.../table/planner/delegation/PlannerBase.scala | 6 +++-
.../planner/runtime/batch/sql/CalcITCase.scala | 16 ++++++++++
.../planner/runtime/batch/table/CalcITCase.scala | 26 +++++++++++++++++
.../planner/runtime/stream/sql/CalcITCase.scala | 18 ++++++++++++
.../planner/runtime/stream/table/CalcITCase.scala | 34 ++++++++++++++++++++++
20 files changed, 217 insertions(+), 8 deletions(-)
diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 24562c01fe3..c3d0f08a9ed 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -1002,3 +1002,8 @@ aggregate:
description: Returns the last value in an ordered set of values.
- sql: LISTAGG(expression [, separator])
description: Concatenates the values of string expressions and places separator values between them. The separator is not added at the end of string. The default value of separator is ','.
+
+catalog:
+ - sql: CURRENT_DATABASE()
+ table: currentDatabase()
+ description: Returns the current database.
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index d7450ae3e82..2f689977248 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -1093,3 +1093,8 @@ aggregate:
description: 返回一组有序值中的最后一个值。
- sql: LISTAGG(expression [, separator])
description: 连接字符串表达式的值并在它们之间放置分隔符值。字符串末尾不添加分隔符时则分隔符的默认值为“,”。
+
+catalog:
+ - sql: CURRENT_DATABASE()
+ table: currentDatabase()
+ description: 返回当前数据库。
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
index f83649efd29..17285c0876d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
@@ -51,6 +51,7 @@ public class HiveModule implements Module {
"cume_dist",
"current_date",
"current_timestamp",
+ "current_database",
"dense_rank",
"first_value",
"lag",
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index 1f1e846568e..af710f24730 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -361,6 +361,23 @@ public class HiveDialectQueryITCase {
}
}
+ @Test
+ public void testCurrentDatabase() {
+ List<Row> result =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select current_database()").collect());
+ assertThat(result.toString()).isEqualTo("[+I[default]]");
+ tableEnv.executeSql("create database db1");
+ tableEnv.executeSql("use db1");
+ result =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select current_database()").collect());
+ assertThat(result.toString()).isEqualTo("[+I[db1]]");
+ // switch back to the default database because the following tests rely on it
+ tableEnv.executeSql("use default");
+ tableEnv.executeSql("drop database db1");
+ }
+
private void runQFile(File qfile) throws Exception {
QTest qTest = extractQTest(qfile);
for (int i = 0; i < qTest.statements.size(); i++) {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
index f7933d9dc5d..c8a9b6a4429 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
@@ -74,10 +74,10 @@ public class HiveModuleTest {
private void verifyNumBuiltInFunctions(String hiveVersion, HiveModule hiveModule) {
switch (hiveVersion) {
case HIVE_VERSION_V2_3_9:
- assertThat(hiveModule.listFunctions()).hasSize(275);
+ assertThat(hiveModule.listFunctions()).hasSize(274);
break;
case HIVE_VERSION_V3_1_1:
- assertThat(hiveModule.listFunctions()).hasSize(294);
+ assertThat(hiveModule.listFunctions()).hasSize(293);
break;
default:
fail("Unknown test version " + hiveVersion);
diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py
index d062aeecd0f..33114183746 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -25,8 +25,9 @@ from pyflink.table.udf import UserDefinedFunctionWrapper
from pyflink.util.java_utils import to_jarray, load_java_class
__all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 'or_', 'not_', 'UNBOUNDED_ROW',
- 'UNBOUNDED_RANGE', 'CURRENT_ROW', 'CURRENT_RANGE', 'current_date', 'current_time',
- 'current_timestamp', 'current_watermark', 'local_time', 'local_timestamp',
+ 'UNBOUNDED_RANGE', 'CURRENT_ROW', 'CURRENT_RANGE', 'current_database',
+ 'current_date', 'current_time', 'current_timestamp',
+ 'current_watermark', 'local_time', 'local_timestamp',
'temporal_overlaps', 'date_format', 'timestamp_diff', 'array', 'row', 'map_',
'row_interval', 'pi', 'e', 'rand', 'rand_integer', 'atan2', 'negative', 'concat',
'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 'without_columns', 'json_string',
@@ -208,6 +209,13 @@ all rows with the same sort key as the current row are included in the window.
CURRENT_RANGE = Expression("CURRENT_RANGE") # type: Expression
+def current_database() -> Expression:
+ """
+ Returns the current database.
+ """
+ return _leaf_op("currentDatabase")
+
+
def current_date() -> Expression:
"""
Returns the current SQL date in local time zone.
diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py
index c8e1bb2cdd4..4e2ab748365 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -21,8 +21,8 @@ from pyflink.table import DataTypes
from pyflink.table.expression import TimeIntervalUnit, TimePointUnit, JsonExistsOnError, \
JsonValueOnEmptyOrError, JsonType, JsonQueryWrapper, JsonQueryOnEmptyOrError
from pyflink.table.expressions import (col, lit, range_, and_, or_, current_date,
- current_time, current_timestamp, local_time,
- local_timestamp, temporal_overlaps, date_format,
+ current_time, current_timestamp, current_database,
+ local_timestamp, local_time, temporal_overlaps, date_format,
timestamp_diff, array, row, map_, row_interval, pi, e,
rand, rand_integer, atan2, negative, concat, concat_ws, uuid,
null_of, log, if_then_else, with_columns, call,
@@ -245,6 +245,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
self.assertEqual('unboundedRange()', str(UNBOUNDED_RANGE))
self.assertEqual('currentRow()', str(CURRENT_ROW))
self.assertEqual('currentRange()', str(CURRENT_RANGE))
+ self.assertEqual('currentDatabase()', str(current_database()))
self.assertEqual('currentDate()', str(current_date()))
self.assertEqual('currentTime()', str(current_time()))
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 8880a1693a4..29d30858fda 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -279,6 +279,14 @@ public final class Expressions {
return apiCall(BuiltInFunctionDefinitions.CURRENT_WATERMARK, rowtimeAttribute);
}
+ /**
+ * Returns the current database, the return type of this expression is {@link
+ * DataTypes#STRING()}.
+ */
+ public static ApiExpression currentDatabase() {
+ return apiCall(BuiltInFunctionDefinitions.CURRENT_DATABASE);
+ }
+
/**
* Returns the current SQL time in local time zone, the return type of this expression is {@link
* DataTypes#TIME()}, this is a synonym for {@link Expressions#currentTime()}.
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
index a1686d4b26e..90261590143 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
@@ -462,6 +462,9 @@ trait ImplicitExpressionConversions {
Expressions.currentWatermark(rowtimeAttribute)
}
+ /** Returns the current database, the return type of this expression is [[DataTypes.STRING()]]. */
+ def currentDatabase(): Expression = Expressions.currentDatabase()
+
/**
* Returns the current SQL time in local time zone, the return type of this expression is
* [[DataTypes.TIME]], this is a synonym for [[ImplicitExpressionConversions.currentTime()]].
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 16acd6b8260..7f5810dc2f3 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1382,6 +1382,17 @@ public final class BuiltInFunctionDefinitions {
.outputTypeStrategy(nullableIfArgs(SpecificTypeStrategies.ROUND))
.build();
+ // --------------------------------------------------------------------------------------------
+ // Catalog functions
+ // --------------------------------------------------------------------------------------------
+
+ public static final BuiltInFunctionDefinition CURRENT_DATABASE =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("currentDatabase")
+ .kind(SCALAR)
+ .outputTypeStrategy(explicit(STRING().notNull()))
+ .build();
+
// --------------------------------------------------------------------------------------------
// Time functions
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
index 770a70227ca..85449ea2485 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
@@ -205,6 +205,11 @@ public class DirectConvertRule implements CallExpressionConvertRule {
BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ,
FlinkSqlOperatorTable.TO_TIMESTAMP_LTZ);
+ // catalog functions
+ DEFINITION_OPERATOR_MAP.put(
+ BuiltInFunctionDefinitions.CURRENT_DATABASE,
+ FlinkSqlOperatorTable.CURRENT_DATABASE);
+
// collection
DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.AT, FlinkSqlOperatorTable.ITEM);
DEFINITION_OPERATOR_MAP.put(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 2b18ffee5e3..75001f4960f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -53,6 +53,7 @@ import java.util.List;
import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE;
import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.STR_MAP_NULLABLE;
import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.VARCHAR_FORCE_NULLABLE;
+import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.VARCHAR_NOT_NULL;
/** Operator table that contains only Flink-specific functions and operators. */
public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
@@ -1173,4 +1174,13 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
public static final SqlFunction TUMBLE = new SqlTumbleTableFunction();
public static final SqlFunction HOP = new SqlHopTableFunction();
public static final SqlFunction CUMULATE = new SqlCumulateTableFunction();
+
+ // Catalog Functions
+ public static final SqlFunction CURRENT_DATABASE =
+ BuiltInSqlFunction.newBuilder()
+ .name("CURRENT_DATABASE")
+ .returnType(VARCHAR_NOT_NULL)
+ .operandTypeChecker(OperandTypes.NILADIC)
+ .notDeterministic()
+ .build();
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/InternalConfigOptions.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/InternalConfigOptions.java
index d1a60ff444e..ab3a61a1da3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/InternalConfigOptions.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/InternalConfigOptions.java
@@ -52,6 +52,14 @@ public final class InternalConfigOptions {
+ " some temporal functions like LOCAL_TIMESTAMP in batch job to make sure these"
+ " temporal functions has query-start semantics.");
+ public static final ConfigOption<String> TABLE_QUERY_CURRENT_DATABASE =
+ key("__table.query-start.current-database__")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The config used to save the current database at query start."
+ + " Currently, it's only used for the function CURRENT_DATABASE.");
+
@Experimental
public static final ConfigOption<Boolean> TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED =
key("__table.exec.sort.non-temporal.enabled__")
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index f6f54ac5254..390419c6227 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -547,6 +547,31 @@ class CodeGeneratorContext(val tableConfig: ReadableConfig, val classLoader: Cla
fieldTerm
}
+ /**
+ * Adds a reusable query-level current database as a static member field of the generated
+ * class and returns the name of that field.
+ *
+ * <p> The current database value is evaluated once at query-start.
+ */
+ def addReusableQueryLevelCurrentDatabase(): String = {
+ val fieldTerm = s"queryCurrentDatabase"
+
+ val queryStartCurrentDatabase = tableConfig
+ .getOptional(InternalConfigOptions.TABLE_QUERY_CURRENT_DATABASE)
+ .orElseThrow(new JSupplier[Throwable] {
+ override def get() = new CodeGenException(
+ "Try to obtain current database of query-start fail." +
+ " This is a bug, please file an issue.")
+ })
+
+ reusableMemberStatements.add(s"""
+ |private static final $BINARY_STRING $fieldTerm =
+ |$BINARY_STRING.fromString("$queryStartCurrentDatabase");
+ |""".stripMargin)
+
+ fieldTerm
+ }
+
/** Adds a reusable record-level local time to the beginning of the SAM of the generated class. */
def addReusableRecordLevelLocalTime(): String = {
val fieldTerm = s"localTime"
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index 8a052c0ac53..a589a495dee 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -21,7 +21,7 @@ import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.data.util.DataFormatConverters
import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression}
import org.apache.flink.table.planner.codegen.CodeGenUtils._
-import org.apache.flink.table.planner.codegen.GenerateUtils.{generateCallIfArgsNotNull, generateCallIfArgsNullable, generateStringResultCallIfArgsNotNull}
+import org.apache.flink.table.planner.codegen.GenerateUtils.{generateCallIfArgsNotNull, generateCallIfArgsNullable, generateNonNullField, generateStringResultCallIfArgsNotNull}
import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens._
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable._
import org.apache.flink.table.runtime.functions.SqlFunctionUtils
@@ -233,6 +233,10 @@ object StringCallGen {
isCharacterString(operands(2).resultType) =>
methodGen(BuiltInMethods.CONVERT_TZ)
+ case CURRENT_DATABASE =>
+ val currentDatabase = ctx.addReusableQueryLevelCurrentDatabase()
+ generateNonNullField(returnType, currentDatabase)
+
case _ => null
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 401e6bf32da..c5cc1525a4d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -50,7 +50,7 @@ import org.apache.flink.table.planner.plan.reuse.SubplanReuser
import org.apache.flink.table.planner.plan.utils.SameRelObjectShuttle
import org.apache.flink.table.planner.sinks.DataStreamTableSink
import org.apache.flink.table.planner.sinks.TableSinkUtils.{inferSinkPhysicalSchema, validateLogicalPhysicalTypesCompatible, validateTableSink}
-import org.apache.flink.table.planner.utils.InternalConfigOptions.{TABLE_QUERY_START_EPOCH_TIME, TABLE_QUERY_START_LOCAL_TIME}
+import org.apache.flink.table.planner.utils.InternalConfigOptions.{TABLE_QUERY_CURRENT_DATABASE, TABLE_QUERY_START_EPOCH_TIME, TABLE_QUERY_START_LOCAL_TIME}
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.{toJava, toScala}
import org.apache.flink.table.planner.utils.TableConfigUtils
import org.apache.flink.table.runtime.generated.CompileUtils
@@ -450,6 +450,9 @@ abstract class PlannerBase(
TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(tableConfig)).getOffset(epochTime)
tableConfig.set(TABLE_QUERY_START_LOCAL_TIME, localTime)
+ val currentDatabase = catalogManager.getCurrentDatabase
+ tableConfig.set(TABLE_QUERY_CURRENT_DATABASE, currentDatabase)
+
// We pass only the configuration to avoid reconfiguration with the rootConfiguration
getExecEnv.configure(tableConfig.getConfiguration, Thread.currentThread().getContextClassLoader)
@@ -466,6 +469,7 @@ abstract class PlannerBase(
val configuration = tableConfig.getConfiguration
configuration.removeConfig(TABLE_QUERY_START_EPOCH_TIME)
configuration.removeConfig(TABLE_QUERY_START_LOCAL_TIME)
+ configuration.removeConfig(TABLE_QUERY_CURRENT_DATABASE)
// Clean caches that might have filled up during optimization
CompileUtils.cleanUp()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index dd5a3ca4dbe..d895c028f58 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -28,6 +28,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.table.api.{DataTypes, TableSchema, ValidationException}
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.api.config.ExecutionConfigOptions.LegacyCastBehaviour
+import org.apache.flink.table.catalog.CatalogDatabaseImpl
import org.apache.flink.table.data.{DecimalDataUtils, TimestampData}
import org.apache.flink.table.data.util.DataFormatConverters.LocalDateConverter
import org.apache.flink.table.planner.expressions.utils.{RichFunc1, RichFunc2, RichFunc3, SplitUDF}
@@ -2098,4 +2099,19 @@ class CalcITCase extends BatchTestBase {
Seq(row(1, 1, 2, 1, 3, 4, 1, 1, 2, 1, 3, 4, 1.0, 1.0, 2.0, 2.0, 2.0, null))
)
}
+
+ @Test
+ def testCurrentDatabase(): Unit = {
+ checkResult("SELECT CURRENT_DATABASE()", Seq(row(tEnv.getCurrentDatabase)))
+ // switch to another database
+ tEnv
+ .getCatalog(tEnv.getCurrentCatalog)
+ .get()
+ .createDatabase(
+ "db1",
+ new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1"),
+ false)
+ tEnv.useDatabase("db1")
+ checkResult("SELECT CURRENT_DATABASE()", Seq(row(tEnv.getCurrentDatabase)))
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala
index d0aa026c41c..83b741867b0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime.batch.table
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.DataTypes._
+import org.apache.flink.table.catalog.CatalogDatabaseImpl
import org.apache.flink.table.data.DecimalDataUtils
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.expressions.utils._
@@ -640,6 +641,31 @@ class CalcITCase extends BatchTestBase {
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+ @Test
+ def testCurrentDatabase(): Unit = {
+ val result1 = executeQuery(
+ tEnv
+ .from("Table3")
+ .limit(1)
+ .select(currentDatabase()))
+ TestBaseUtils.compareResultAsText(result1.asJava, "default_database")
+
+ // switch to another database; currentDatabase() must now report the new one
+ tEnv
+ .getCatalog(tEnv.getCurrentCatalog)
+ .get()
+ .createDatabase(
+ "db1",
+ new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1"),
+ false)
+ tEnv.useDatabase("db1")
+ val result2 = executeQuery(
+ tEnv
+ .from("default_database.Table3")
+ .limit(1)
+ .select(currentDatabase()))
+ TestBaseUtils.compareResultAsText(result2.asJava, "db1")
+ }
}
@SerialVersionUID(1L)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 3126c4520de..1381851eb40 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.api.config.ExecutionConfigOptions.LegacyCastBehaviour
import org.apache.flink.table.api.internal.TableEnvironmentInternal
+import org.apache.flink.table.catalog.CatalogDatabaseImpl
import org.apache.flink.table.data.{GenericRowData, MapData, RowData}
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils._
@@ -696,4 +697,21 @@ class CalcITCase extends StreamingTestBase {
TestBaseUtils.compareResultAsText(result, "1,1,2,1,3,4,1,1,2,1,3,4,1.0,1.0,2.0,2.0,2.0,null")
}
+ @Test
+ def testCurrentDatabase(): Unit = {
+ val result1 = tEnv.sqlQuery("SELECT CURRENT_DATABASE()").execute().collect().toList
+ assertEquals(Seq(row(tEnv.getCurrentDatabase)), result1)
+
+ // switch to another database
+ tEnv
+ .getCatalog(tEnv.getCurrentCatalog)
+ .get()
+ .createDatabase(
+ "db1",
+ new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1"),
+ false)
+ tEnv.useDatabase("db1")
+ val result2 = tEnv.sqlQuery("SELECT CURRENT_DATABASE()").execute().collect().toList
+ assertEquals(Seq(row(tEnv.getCurrentDatabase)), result2)
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
index 0ddc5b39372..4d1141e36f2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
@@ -21,6 +21,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.table.annotation.{DataTypeHint, InputGroup}
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.catalog.CatalogDatabaseImpl
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.expressions.utils._
import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink, TestingRetractSink, UserDefinedFunctionTestUtils}
@@ -657,6 +658,39 @@ class CalcITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
val expected = List("0,0,0", "1,1,1", "2,2,2")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
+
+ @Test
+ def testCurrentDatabase(): Unit = {
+ val result1 =
+ env
+ .fromCollection(tupleData3)
+ .toTable(tEnv)
+ .limit(1)
+ .select(currentDatabase())
+ val sink1 = new TestingAppendSink
+ result1.toAppendStream[Row].addSink(sink1)
+ env.execute()
+ assertEquals(List(tEnv.getCurrentDatabase), sink1.getAppendResults.sorted)
+
+ // switch to another database
+ tEnv
+ .getCatalog(tEnv.getCurrentCatalog)
+ .get()
+ .createDatabase(
+ "db1",
+ new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1"),
+ false)
+ tEnv.useDatabase("db1")
+ val result2 = env
+ .fromCollection(tupleData3)
+ .toTable(tEnv)
+ .limit(1)
+ .select(currentDatabase())
+ val sink2 = new TestingAppendSink
+ result2.toAppendStream[Row].addSink(sink2)
+ env.execute()
+ assertEquals(List(tEnv.getCurrentDatabase), sink2.getAppendResults.sorted)
+ }
}
@SerialVersionUID(1L)