Posted to commits@flink.apache.org by sh...@apache.org on 2022/07/14 01:45:05 UTC

[flink] branch master updated: [FLINK-27376][table] Support current_database built-in function (#19218)

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 641fb893a3e [FLINK-27376][table] Support current_database built-in function (#19218)
641fb893a3e is described below

commit 641fb893a3e3bc7478413984c3287fcdb56ad845
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Thu Jul 14 09:44:54 2022 +0800

    [FLINK-27376][table] Support current_database built-in function (#19218)
---
 docs/data/sql_functions.yml                        |  5 ++++
 docs/data/sql_functions_zh.yml                     |  5 ++++
 .../apache/flink/table/module/hive/HiveModule.java |  1 +
 .../connectors/hive/HiveDialectQueryITCase.java    | 17 +++++++++++
 .../flink/table/module/hive/HiveModuleTest.java    |  4 +--
 flink-python/pyflink/table/expressions.py          | 12 ++++++--
 .../pyflink/table/tests/test_expression.py         |  5 ++--
 .../org/apache/flink/table/api/Expressions.java    |  8 +++++
 .../table/api/ImplicitExpressionConversions.scala  |  3 ++
 .../functions/BuiltInFunctionDefinitions.java      | 11 +++++++
 .../expressions/converter/DirectConvertRule.java   |  5 ++++
 .../functions/sql/FlinkSqlOperatorTable.java       | 10 +++++++
 .../table/planner/utils/InternalConfigOptions.java |  8 +++++
 .../planner/codegen/CodeGeneratorContext.scala     | 25 ++++++++++++++++
 .../planner/codegen/calls/StringCallGen.scala      |  6 +++-
 .../table/planner/delegation/PlannerBase.scala     |  6 +++-
 .../planner/runtime/batch/sql/CalcITCase.scala     | 16 ++++++++++
 .../planner/runtime/batch/table/CalcITCase.scala   | 26 +++++++++++++++++
 .../planner/runtime/stream/sql/CalcITCase.scala    | 18 ++++++++++++
 .../planner/runtime/stream/table/CalcITCase.scala  | 34 ++++++++++++++++++++++
 20 files changed, 217 insertions(+), 8 deletions(-)
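
For readers who want to try the new function: it is exposed in SQL as CURRENT_DATABASE() and in the Table API as currentDatabase(). The following is a minimal, self-contained Java sketch, not part of this commit; the inStreamingMode settings and the fromValues input are illustrative assumptions:

    import static org.apache.flink.table.api.Expressions.currentDatabase;

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CurrentDatabaseExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // SQL form: evaluated once at query start, so every row of a query
            // sees the same database name (e.g. "default_database").
            tEnv.executeSql("SELECT CURRENT_DATABASE()").print();

            // Table API form, equivalent to the SQL call above.
            tEnv.fromValues(1).select(currentDatabase()).execute().print();
        }
    }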

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 24562c01fe3..c3d0f08a9ed 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -1002,3 +1002,8 @@ aggregate:
     description: Returns the last value in an ordered set of values.
   - sql: LISTAGG(expression [, separator])
     description: Concatenates the values of string expressions and places separator values between them. The separator is not added at the end of string. The default value of separator is ','.
+
+catalog:
+  - sql: CURRENT_DATABASE()
+    table: currentDatabase()
+    description: Returns the current database.
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index d7450ae3e82..2f689977248 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -1093,3 +1093,8 @@ aggregate:
     description: 返回一组有序值中的最后一个值。
   - sql: LISTAGG(expression [, separator])
     description: 连接字符串表达式的值并在它们之间放置分隔符值。字符串末尾不添加分隔符时则分隔符的默认值为“,”。
+
+catalog:
+  - sql: CURRENT_DATABASE()
+    table: currentDatabase()
+    description: 返回当前数据库。
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
index f83649efd29..17285c0876d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
@@ -51,6 +51,7 @@ public class HiveModule implements Module {
                                     "cume_dist",
                                     "current_date",
                                     "current_timestamp",
+                                    "current_database",
                                     "dense_rank",
                                     "first_value",
                                     "lag",
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index 1f1e846568e..af710f24730 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -361,6 +361,23 @@ public class HiveDialectQueryITCase {
         }
     }
 
+    @Test
+    public void testCurrentDatabase() {
+        List<Row> result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select current_database()").collect());
+        assertThat(result.toString()).isEqualTo("[+I[default]]");
+        tableEnv.executeSql("create database db1");
+        tableEnv.executeSql("use db1");
+        result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select current_database()").collect());
+        assertThat(result.toString()).isEqualTo("[+I[db1]]");
+        // switch back to the default database since the following tests expect it
+        tableEnv.executeSql("use default");
+        tableEnv.executeSql("drop database db1");
+    }
+
     private void runQFile(File qfile) throws Exception {
         QTest qTest = extractQTest(qfile);
         for (int i = 0; i < qTest.statements.size(); i++) {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
index f7933d9dc5d..c8a9b6a4429 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
@@ -74,10 +74,10 @@ public class HiveModuleTest {
     private void verifyNumBuiltInFunctions(String hiveVersion, HiveModule hiveModule) {
         switch (hiveVersion) {
             case HIVE_VERSION_V2_3_9:
-                assertThat(hiveModule.listFunctions()).hasSize(275);
+                assertThat(hiveModule.listFunctions()).hasSize(274);
                 break;
             case HIVE_VERSION_V3_1_1:
-                assertThat(hiveModule.listFunctions()).hasSize(294);
+                assertThat(hiveModule.listFunctions()).hasSize(293);
                 break;
             default:
                 fail("Unknown test version " + hiveVersion);
diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py
index d062aeecd0f..33114183746 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -25,8 +25,9 @@ from pyflink.table.udf import UserDefinedFunctionWrapper
 from pyflink.util.java_utils import to_jarray, load_java_class
 
 __all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 'or_', 'not_', 'UNBOUNDED_ROW',
-           'UNBOUNDED_RANGE', 'CURRENT_ROW', 'CURRENT_RANGE', 'current_date', 'current_time',
-           'current_timestamp', 'current_watermark', 'local_time', 'local_timestamp',
+           'UNBOUNDED_RANGE', 'CURRENT_ROW', 'CURRENT_RANGE', 'current_database',
+           'current_date', 'current_time', 'current_timestamp',
+           'current_watermark', 'local_time', 'local_timestamp',
            'temporal_overlaps', 'date_format', 'timestamp_diff', 'array', 'row', 'map_',
            'row_interval', 'pi', 'e', 'rand', 'rand_integer', 'atan2', 'negative', 'concat',
            'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 'without_columns', 'json_string',
@@ -208,6 +209,13 @@ all rows with the same sort key as the current row are included in the window.
 CURRENT_RANGE = Expression("CURRENT_RANGE")  # type: Expression
 
 
+def current_database() -> Expression:
+    """
+    Returns the current database.
+    """
+    return _leaf_op("currentDatabase")
+
+
 def current_date() -> Expression:
     """
     Returns the current SQL date in local time zone.
diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py
index c8e1bb2cdd4..4e2ab748365 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -21,8 +21,8 @@ from pyflink.table import DataTypes
 from pyflink.table.expression import TimeIntervalUnit, TimePointUnit, JsonExistsOnError, \
     JsonValueOnEmptyOrError, JsonType, JsonQueryWrapper, JsonQueryOnEmptyOrError
 from pyflink.table.expressions import (col, lit, range_, and_, or_, current_date,
-                                       current_time, current_timestamp, local_time,
-                                       local_timestamp, temporal_overlaps, date_format,
+                                       current_time, current_timestamp, current_database,
+                                       local_timestamp, local_time, temporal_overlaps, date_format,
                                        timestamp_diff, array, row, map_, row_interval, pi, e,
                                        rand, rand_integer, atan2, negative, concat, concat_ws, uuid,
                                        null_of, log, if_then_else, with_columns, call,
@@ -245,6 +245,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         self.assertEqual('unboundedRange()', str(UNBOUNDED_RANGE))
         self.assertEqual('currentRow()', str(CURRENT_ROW))
         self.assertEqual('currentRange()', str(CURRENT_RANGE))
+        self.assertEqual('currentDatabase()', str(current_database()))
 
         self.assertEqual('currentDate()', str(current_date()))
         self.assertEqual('currentTime()', str(current_time()))
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 8880a1693a4..29d30858fda 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -279,6 +279,14 @@ public final class Expressions {
         return apiCall(BuiltInFunctionDefinitions.CURRENT_WATERMARK, rowtimeAttribute);
     }
 
+    /**
+     * Returns the current database, the return type of this expression is {@link
+     * DataTypes#STRING()}.
+     */
+    public static ApiExpression currentDatabase() {
+        return apiCall(BuiltInFunctionDefinitions.CURRENT_DATABASE);
+    }
+
     /**
      * Returns the current SQL time in local time zone, the return type of this expression is {@link
      * DataTypes#TIME()}, this is a synonym for {@link Expressions#currentTime()}.
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
index a1686d4b26e..90261590143 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
@@ -462,6 +462,9 @@ trait ImplicitExpressionConversions {
     Expressions.currentWatermark(rowtimeAttribute)
   }
 
+  /** Returns the current database, the return type of this expression is [[DataTypes.STRING()]]. */
+  def currentDatabase(): Expression = Expressions.currentDatabase()
+
   /**
    * Returns the current SQL time in local time zone, the return type of this expression is
    * [[DataTypes.TIME]], this is a synonym for [[ImplicitExpressionConversions.currentTime()]].
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 16acd6b8260..7f5810dc2f3 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1382,6 +1382,17 @@ public final class BuiltInFunctionDefinitions {
                     .outputTypeStrategy(nullableIfArgs(SpecificTypeStrategies.ROUND))
                     .build();
 
+    // --------------------------------------------------------------------------------------------
+    // Catalog functions
+    // --------------------------------------------------------------------------------------------
+
+    public static final BuiltInFunctionDefinition CURRENT_DATABASE =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("currentDatabase")
+                    .kind(SCALAR)
+                    .outputTypeStrategy(explicit(STRING().notNull()))
+                    .build();
+
     // --------------------------------------------------------------------------------------------
     // Time functions
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
index 770a70227ca..85449ea2485 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
@@ -205,6 +205,11 @@ public class DirectConvertRule implements CallExpressionConvertRule {
                 BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ,
                 FlinkSqlOperatorTable.TO_TIMESTAMP_LTZ);
 
+        // catalog functions
+        DEFINITION_OPERATOR_MAP.put(
+                BuiltInFunctionDefinitions.CURRENT_DATABASE,
+                FlinkSqlOperatorTable.CURRENT_DATABASE);
+
         // collection
         DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.AT, FlinkSqlOperatorTable.ITEM);
         DEFINITION_OPERATOR_MAP.put(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 2b18ffee5e3..75001f4960f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -53,6 +53,7 @@ import java.util.List;
 import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE;
 import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.STR_MAP_NULLABLE;
 import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.VARCHAR_FORCE_NULLABLE;
+import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.VARCHAR_NOT_NULL;
 
 /** Operator table that contains only Flink-specific functions and operators. */
 public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
@@ -1173,4 +1174,13 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
     public static final SqlFunction TUMBLE = new SqlTumbleTableFunction();
     public static final SqlFunction HOP = new SqlHopTableFunction();
     public static final SqlFunction CUMULATE = new SqlCumulateTableFunction();
+
+    // Catalog Functions
+    public static final SqlFunction CURRENT_DATABASE =
+            BuiltInSqlFunction.newBuilder()
+                    .name("CURRENT_DATABASE")
+                    .returnType(VARCHAR_NOT_NULL)
+                    .operandTypeChecker(OperandTypes.NILADIC)
+                    .notDeterministic()
+                    .build();
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/InternalConfigOptions.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/InternalConfigOptions.java
index d1a60ff444e..ab3a61a1da3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/InternalConfigOptions.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/InternalConfigOptions.java
@@ -52,6 +52,14 @@ public final class InternalConfigOptions {
                                     + " some temporal functions like LOCAL_TIMESTAMP in batch job to make sure these"
                                     + " temporal functions has query-start semantics.");
 
+    public static final ConfigOption<String> TABLE_QUERY_CURRENT_DATABASE =
+            key("__table.query-start.current-database__")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The config used to save the current database at query start."
+                                    + " Currently, it's only used for the function CURRENT_DATABASE.");
+
     @Experimental
     public static final ConfigOption<Boolean> TABLE_EXEC_NON_TEMPORAL_SORT_ENABLED =
             key("__table.exec.sort.non-temporal.enabled__")
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index f6f54ac5254..390419c6227 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -547,6 +547,31 @@ class CodeGeneratorContext(val tableConfig: ReadableConfig, val classLoader: Cla
     fieldTerm
   }
 
+  /**
+   * Adds a reusable query-level current database to the beginning of the SAM of the generated
+   * class.
+   *
+   * <p>The current database is evaluated once at query start.
+   */
+  def addReusableQueryLevelCurrentDatabase(): String = {
+    val fieldTerm = s"queryCurrentDatabase"
+
+    val queryStartCurrentDatabase = tableConfig
+      .getOptional(InternalConfigOptions.TABLE_QUERY_CURRENT_DATABASE)
+      .orElseThrow(new JSupplier[Throwable] {
+        override def get() = new CodeGenException(
+          "Try to obtain current database of query-start fail." +
+            " This is a bug, please file an issue.")
+      })
+
+    reusableMemberStatements.add(s"""
+                                    |private static final $BINARY_STRING $fieldTerm =
+                                    |$BINARY_STRING.fromString("$queryStartCurrentDatabase");
+                                    |""".stripMargin)
+
+    fieldTerm
+  }
+
   /** Adds a reusable record-level local time to the beginning of the SAM of the generated class. */
   def addReusableRecordLevelLocalTime(): String = {
     val fieldTerm = s"localTime"
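
As a concrete illustration of what the member-statement template in addReusableQueryLevelCurrentDatabase() emits: the generated class gains a static BinaryStringData constant holding the query-start database. A hypothetical rendering is shown below, where "default_database" stands in for whatever TABLE_QUERY_CURRENT_DATABASE contained at translation time:

    // Hypothetical rendering of the reusable member statement; the string
    // literal is baked into the generated code once, at codegen time.
    private static final org.apache.flink.table.data.binary.BinaryStringData queryCurrentDatabase =
            org.apache.flink.table.data.binary.BinaryStringData.fromString("default_database");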
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index 8a052c0ac53..a589a495dee 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -21,7 +21,7 @@ import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.data.util.DataFormatConverters
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression}
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
-import org.apache.flink.table.planner.codegen.GenerateUtils.{generateCallIfArgsNotNull, generateCallIfArgsNullable, generateStringResultCallIfArgsNotNull}
+import org.apache.flink.table.planner.codegen.GenerateUtils.{generateCallIfArgsNotNull, generateCallIfArgsNullable, generateNonNullField, generateStringResultCallIfArgsNotNull}
 import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens._
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable._
 import org.apache.flink.table.runtime.functions.SqlFunctionUtils
@@ -233,6 +233,10 @@ object StringCallGen {
             isCharacterString(operands(2).resultType) =>
         methodGen(BuiltInMethods.CONVERT_TZ)
 
+      case CURRENT_DATABASE =>
+        val currentDatabase = ctx.addReusableQueryLevelCurrentDatabase()
+        generateNonNullField(returnType, currentDatabase)
+
       case _ => null
     }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 401e6bf32da..c5cc1525a4d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -50,7 +50,7 @@ import org.apache.flink.table.planner.plan.reuse.SubplanReuser
 import org.apache.flink.table.planner.plan.utils.SameRelObjectShuttle
 import org.apache.flink.table.planner.sinks.DataStreamTableSink
 import org.apache.flink.table.planner.sinks.TableSinkUtils.{inferSinkPhysicalSchema, validateLogicalPhysicalTypesCompatible, validateTableSink}
-import org.apache.flink.table.planner.utils.InternalConfigOptions.{TABLE_QUERY_START_EPOCH_TIME, TABLE_QUERY_START_LOCAL_TIME}
+import org.apache.flink.table.planner.utils.InternalConfigOptions.{TABLE_QUERY_CURRENT_DATABASE, TABLE_QUERY_START_EPOCH_TIME, TABLE_QUERY_START_LOCAL_TIME}
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.{toJava, toScala}
 import org.apache.flink.table.planner.utils.TableConfigUtils
 import org.apache.flink.table.runtime.generated.CompileUtils
@@ -450,6 +450,9 @@ abstract class PlannerBase(
       TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(tableConfig)).getOffset(epochTime)
     tableConfig.set(TABLE_QUERY_START_LOCAL_TIME, localTime)
 
+    val currentDatabase = catalogManager.getCurrentDatabase
+    tableConfig.set(TABLE_QUERY_CURRENT_DATABASE, currentDatabase)
+
     // We pass only the configuration to avoid reconfiguration with the rootConfiguration
     getExecEnv.configure(tableConfig.getConfiguration, Thread.currentThread().getContextClassLoader)
 
@@ -466,6 +469,7 @@ abstract class PlannerBase(
     val configuration = tableConfig.getConfiguration
     configuration.removeConfig(TABLE_QUERY_START_EPOCH_TIME)
     configuration.removeConfig(TABLE_QUERY_START_LOCAL_TIME)
+    configuration.removeConfig(TABLE_QUERY_CURRENT_DATABASE)
 
     // Clean caches that might have filled up during optimization
     CompileUtils.cleanUp()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index dd5a3ca4dbe..d895c028f58 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -28,6 +28,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api.{DataTypes, TableSchema, ValidationException}
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.config.ExecutionConfigOptions.LegacyCastBehaviour
+import org.apache.flink.table.catalog.CatalogDatabaseImpl
 import org.apache.flink.table.data.{DecimalDataUtils, TimestampData}
 import org.apache.flink.table.data.util.DataFormatConverters.LocalDateConverter
 import org.apache.flink.table.planner.expressions.utils.{RichFunc1, RichFunc2, RichFunc3, SplitUDF}
@@ -2098,4 +2099,19 @@ class CalcITCase extends BatchTestBase {
       Seq(row(1, 1, 2, 1, 3, 4, 1, 1, 2, 1, 3, 4, 1.0, 1.0, 2.0, 2.0, 2.0, null))
     )
   }
+
+  @Test
+  def testCurrentDatabase(): Unit = {
+    checkResult("SELECT CURRENT_DATABASE()", Seq(row(tEnv.getCurrentDatabase)))
+    // switch to another database
+    tEnv
+      .getCatalog(tEnv.getCurrentCatalog)
+      .get()
+      .createDatabase(
+        "db1",
+        new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1"),
+        false)
+    tEnv.useDatabase("db1")
+    checkResult("SELECT CURRENT_DATABASE()", Seq(row(tEnv.getCurrentDatabase)))
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala
index d0aa026c41c..83b741867b0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime.batch.table
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.DataTypes._
+import org.apache.flink.table.catalog.CatalogDatabaseImpl
 import org.apache.flink.table.data.DecimalDataUtils
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.planner.expressions.utils._
@@ -640,6 +641,31 @@ class CalcITCase extends BatchTestBase {
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testCurrentDatabase(): Unit = {
+    val result1 = executeQuery(
+      tEnv
+        .from("Table3")
+        .limit(1)
+        .select(currentDatabase()))
+    TestBaseUtils.compareResultAsText(result1.asJava, "default_database")
+
+    // switch to another database
+    tEnv
+      .getCatalog(tEnv.getCurrentCatalog)
+      .get()
+      .createDatabase(
+        "db1",
+        new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1"),
+        false)
+    tEnv.useDatabase("db1")
+    val result2 = executeQuery(
+      tEnv
+        .from("default_database.Table3")
+        .limit(1)
+        .select(currentDatabase()))
+    TestBaseUtils.compareResultAsText(result2.asJava, "db1")
+  }
 }
 
 @SerialVersionUID(1L)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 3126c4520de..1381851eb40 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.config.ExecutionConfigOptions.LegacyCastBehaviour
 import org.apache.flink.table.api.internal.TableEnvironmentInternal
+import org.apache.flink.table.catalog.CatalogDatabaseImpl
 import org.apache.flink.table.data.{GenericRowData, MapData, RowData}
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils._
@@ -696,4 +697,21 @@ class CalcITCase extends StreamingTestBase {
     TestBaseUtils.compareResultAsText(result, "1,1,2,1,3,4,1,1,2,1,3,4,1.0,1.0,2.0,2.0,2.0,null")
   }
 
+  @Test
+  def testCurrentDatabase(): Unit = {
+    val result1 = tEnv.sqlQuery("SELECT CURRENT_DATABASE()").execute().collect().toList
+    assertEquals(Seq(row(tEnv.getCurrentDatabase)), result1)
+
+    // switch to another database
+    tEnv
+      .getCatalog(tEnv.getCurrentCatalog)
+      .get()
+      .createDatabase(
+        "db1",
+        new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1"),
+        false)
+    tEnv.useDatabase("db1")
+    val result2 = tEnv.sqlQuery("SELECT CURRENT_DATABASE()").execute().collect().toList
+    assertEquals(Seq(row(tEnv.getCurrentDatabase)), result2)
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
index 0ddc5b39372..4d1141e36f2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
@@ -21,6 +21,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.annotation.{DataTypeHint, InputGroup}
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.catalog.CatalogDatabaseImpl
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.planner.expressions.utils._
 import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink, TestingRetractSink, UserDefinedFunctionTestUtils}
@@ -657,6 +658,39 @@ class CalcITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     val expected = List("0,0,0", "1,1,1", "2,2,2")
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
+
+  @Test
+  def testCurrentDatabase(): Unit = {
+    val result1 =
+      env
+        .fromCollection(tupleData3)
+        .toTable(tEnv)
+        .limit(1)
+        .select(currentDatabase())
+    val sink1 = new TestingAppendSink
+    result1.toAppendStream[Row].addSink(sink1)
+    env.execute()
+    assertEquals(List(tEnv.getCurrentDatabase), sink1.getAppendResults.sorted)
+
+    // switch to another database
+    tEnv
+      .getCatalog(tEnv.getCurrentCatalog)
+      .get()
+      .createDatabase(
+        "db1",
+        new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1"),
+        false)
+    tEnv.useDatabase("db1")
+    val result2 = env
+      .fromCollection(tupleData3)
+      .toTable(tEnv)
+      .limit(1)
+      .select(currentDatabase())
+    val sink2 = new TestingAppendSink
+    result2.toAppendStream[Row].addSink(sink2)
+    env.execute()
+    assertEquals(List(tEnv.getCurrentDatabase), sink2.getAppendResults.sorted)
+  }
 }
 
 @SerialVersionUID(1L)