You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/09/29 05:13:33 UTC

[flink] branch master updated: [FLINK-29348][table] DynamicFilteringEvent can be stored in the CoordinatorStore if it's received before the listener source coordinator is started.

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bae0ebb637 [FLINK-29348][table] DynamicFilteringEvent can be stored in the CoordinatorStore if it's received before the listener source coordinator is started.
7bae0ebb637 is described below

commit 7bae0ebb6379c175522bd903838bb3737fc6c65d
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Sep 27 15:49:29 2022 +0800

    [FLINK-29348][table] DynamicFilteringEvent can be stored in the CoordinatorStore if it's received before the listener source coordinator is started.
    
    This closes #20906.
---
 .../flink/connectors/hive/HiveDialectITCase.java   | 135 ------------
 .../hive/HiveDynamicPartitionPruningITCase.java    | 234 +++++++++++++++++++++
 .../flink/table/catalog/hive/HiveTestUtils.java    |  19 ++
 .../operators/coordination/CoordinatorStore.java   |   4 +
 .../coordination/CoordinatorStoreImpl.java         |   5 +
 .../source/coordinator/SourceCoordinator.java      |  37 +++-
 ...cFilteringDataCollectorOperatorCoordinator.java |  58 +++--
 7 files changed, 334 insertions(+), 158 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 81487871c78..32548dd731a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.connectors.hive;
 
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable;
 import org.apache.flink.table.HiveVersionTestUtil;
 import org.apache.flink.table.api.SqlDialect;
@@ -26,7 +25,6 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -37,7 +35,6 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.delegation.ExtendedOperationExecutor;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.functions.hive.HiveGenericUDTFTest;
@@ -1287,138 +1284,6 @@ public class HiveDialectITCase {
         }
     }
 
-    @Test
-    public void testDynamicPartitionPruning() throws Exception {
-        // src table
-        tableEnv.executeSql("create table dim (x int,y string,z int)");
-        tableEnv.executeSql("insert into dim values (1,'a',1),(2,'b',1),(3,'c',2)").await();
-
-        // partitioned dest table
-        tableEnv.executeSql("create table fact (a int, b bigint, c string) partitioned by (p int)");
-        tableEnv.executeSql(
-                        "insert into fact partition (p=1) values (10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ")
-                .await();
-        tableEnv.executeSql(
-                        "insert into fact partition (p=2) values (20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ")
-                .await();
-        tableEnv.executeSql(
-                        "insert into fact partition (p=3) values (30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ")
-                .await();
-
-        System.out.println(
-                tableEnv.explainSql(
-                        "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a"));
-
-        tableEnv.getConfig().set(TaskManagerOptions.NUM_TASK_SLOTS, 4);
-        tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
-        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
-
-        String sql = "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a";
-        String sqlSwapFactDim =
-                "select a, b, c, p, x, y from dim, fact where x = p and z = 1 order by a";
-
-        String expected =
-                "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
-                        + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b]]";
-
-        // Check dynamic partition pruning is working
-        String plan = tableEnv.explainSql(sql);
-        assertThat(plan).contains("DynamicFilteringDataCollector");
-
-        plan = tableEnv.explainSql(sqlSwapFactDim);
-        assertThat(plan).contains("DynamicFilteringDataCollector");
-
-        // Validate results
-        List<Row> results = queryResult(tableEnv.sqlQuery(sql));
-        assertThat(results.toString()).isEqualTo(expected);
-
-        results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim));
-        assertThat(results.toString()).isEqualTo(expected);
-
-        // Validate results with table statistics
-        tableEnv.getCatalog(tableEnv.getCurrentCatalog())
-                .get()
-                .alterTableStatistics(
-                        new ObjectPath(tableEnv.getCurrentDatabase(), "dim"),
-                        new CatalogTableStatistics(3, -1, -1, -1),
-                        false);
-
-        results = queryResult(tableEnv.sqlQuery(sql));
-        assertThat(results.toString()).isEqualTo(expected);
-
-        results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim));
-        assertThat(results.toString()).isEqualTo(expected);
-    }
-
-    @Test
-    public void testDynamicPartitionPruningOnTwoFactTables() throws Exception {
-        tableEnv.executeSql("create table dim (x int,y string,z int)");
-        tableEnv.executeSql("insert into dim values (1,'a',1),(2,'b',1),(3,'c',2)").await();
-
-        // partitioned dest table
-        tableEnv.executeSql("create table fact (a int, b bigint, c string) partitioned by (p int)");
-        tableEnv.executeSql(
-                        "insert into fact partition (p=1) values (10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ")
-                .await();
-        tableEnv.executeSql(
-                        "insert into fact partition (p=2) values (20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ")
-                .await();
-        tableEnv.executeSql(
-                        "insert into fact partition (p=3) values (30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ")
-                .await();
-
-        // partitioned dest table
-        tableEnv.executeSql(
-                "create table fact2 (a int, b bigint, c string) partitioned by (p int)");
-        tableEnv.executeSql(
-                        "insert into fact2 partition (p=1) values (40,100,'aaa'),(41,101,'bbb'),(42,102,'ccc') ")
-                .await();
-        tableEnv.executeSql(
-                        "insert into fact2 partition (p=2) values (50,200,'aaa'),(51,201,'bbb'),(52,202,'ccc') ")
-                .await();
-        tableEnv.executeSql(
-                        "insert into fact2 partition (p=3) values (60,300,'aaa'),(61,301,'bbb'),(62,302,'ccc') ")
-                .await();
-
-        tableEnv.getConfig().set(TaskManagerOptions.NUM_TASK_SLOTS, 4);
-
-        tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
-        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
-
-        // two fact sources share the same dynamic filter
-        String sql =
-                "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) "
-                        + "union all "
-                        + "(select a, b, c, p, x, y from fact2, dim where x = p and z = 1)) t order by a";
-        String expected =
-                "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
-                        + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], "
-                        + "+I[40, 100, aaa, 1, 1, a], +I[41, 101, bbb, 1, 1, a], +I[42, 102, ccc, 1, 1, a], "
-                        + "+I[50, 200, aaa, 2, 2, b], +I[51, 201, bbb, 2, 2, b], +I[52, 202, ccc, 2, 2, b]]";
-
-        String plan = tableEnv.explainSql(sql);
-        assertThat(plan).containsOnlyOnce("DynamicFilteringDataCollector(fields=[x])(reuse_id=");
-
-        List<Row> results = queryResult(tableEnv.sqlQuery(sql));
-        assertThat(results.toString()).isEqualTo(expected);
-
-        // two fact sources use different dynamic filters
-        String sql2 =
-                "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) "
-                        + "union all "
-                        + "(select a, b, c, p, x, y from fact2, dim where x = p and z = 2)) t order by a";
-        String expected2 =
-                "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
-                        + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], "
-                        + "+I[60, 300, aaa, 3, 3, c], +I[61, 301, bbb, 3, 3, c], +I[62, 302, ccc, 3, 3, c]]";
-
-        plan = tableEnv.explainSql(sql2);
-        assertThat(plan).contains("DynamicFilteringDataCollector");
-
-        results = queryResult(tableEnv.sqlQuery(sql2));
-        assertThat(results.toString()).isEqualTo(expected2);
-    }
-
     private void verifyUnsupportedOperation(String ddl) {
         assertThatThrownBy(() -> tableEnv.executeSql(ddl))
                 .isInstanceOf(ValidationException.class)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java
new file mode 100644
index 00000000000..dce66345c24
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests the dynamic partition pruning optimization on Hive sources. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class HiveDynamicPartitionPruningITCase {
+
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(4)
+                            .build());
+
+    @Parameter public boolean enableAdaptiveBatchScheduler;
+
+    @Parameters(name = "enableAdaptiveBatchScheduler={0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(false, true);
+    }
+
+    private TableEnvironment tableEnv;
+    private HiveCatalog hiveCatalog;
+    private String warehouse;
+
+    @BeforeEach
+    public void setup() {
+        hiveCatalog = HiveTestUtils.createHiveCatalog();
+        hiveCatalog
+                .getHiveConf()
+                .setBoolVar(
+                        HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES, false);
+        hiveCatalog.open();
+        warehouse = hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
+
+        if (enableAdaptiveBatchScheduler) {
+            tableEnv = HiveTestUtils.createTableEnvInBatchModeWithAdaptiveScheduler();
+        } else {
+            tableEnv = HiveTestUtils.createTableEnvInBatchMode();
+        }
+
+        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tableEnv.useCatalog(hiveCatalog.getName());
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (hiveCatalog != null) {
+            hiveCatalog.close();
+        }
+        if (warehouse != null) {
+            FileUtils.deleteDirectoryQuietly(new File(warehouse));
+        }
+    }
+
+    @TestTemplate
+    public void testDynamicPartitionPruning() throws Exception {
+        // src table
+        tableEnv.executeSql("create table dim (x int,y string,z int)");
+        tableEnv.executeSql("insert into dim values (1,'a',1),(2,'b',1),(3,'c',2)").await();
+
+        // partitioned dest table
+        tableEnv.executeSql("create table fact (a int, b bigint, c string) partitioned by (p int)");
+        tableEnv.executeSql(
+                        "insert into fact partition (p=1) values (10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ")
+                .await();
+        tableEnv.executeSql(
+                        "insert into fact partition (p=2) values (20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ")
+                .await();
+        tableEnv.executeSql(
+                        "insert into fact partition (p=3) values (30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ")
+                .await();
+
+        System.out.println(
+                tableEnv.explainSql(
+                        "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a"));
+
+        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+
+        String sql = "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a";
+        String sqlSwapFactDim =
+                "select a, b, c, p, x, y from dim, fact where x = p and z = 1 order by a";
+
+        String expected =
+                "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
+                        + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b]]";
+
+        // Check dynamic partition pruning is working
+        String plan = tableEnv.explainSql(sql);
+        assertThat(plan).contains("DynamicFilteringDataCollector");
+
+        plan = tableEnv.explainSql(sqlSwapFactDim);
+        assertThat(plan).contains("DynamicFilteringDataCollector");
+
+        // Validate results
+        List<Row> results = queryResult(tableEnv.sqlQuery(sql));
+        assertThat(results.toString()).isEqualTo(expected);
+
+        results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim));
+        assertThat(results.toString()).isEqualTo(expected);
+
+        // Validate results with table statistics
+        tableEnv.getCatalog(tableEnv.getCurrentCatalog())
+                .get()
+                .alterTableStatistics(
+                        new ObjectPath(tableEnv.getCurrentDatabase(), "dim"),
+                        new CatalogTableStatistics(3, -1, -1, -1),
+                        false);
+
+        results = queryResult(tableEnv.sqlQuery(sql));
+        assertThat(results.toString()).isEqualTo(expected);
+
+        results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim));
+        assertThat(results.toString()).isEqualTo(expected);
+    }
+
+    @TestTemplate
+    public void testDynamicPartitionPruningOnTwoFactTables() throws Exception {
+        tableEnv.executeSql("create table dim (x int,y string,z int)");
+        tableEnv.executeSql("insert into dim values (1,'a',1),(2,'b',1),(3,'c',2)").await();
+
+        // partitioned dest table
+        tableEnv.executeSql("create table fact (a int, b bigint, c string) partitioned by (p int)");
+        tableEnv.executeSql(
+                        "insert into fact partition (p=1) values (10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ")
+                .await();
+        tableEnv.executeSql(
+                        "insert into fact partition (p=2) values (20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ")
+                .await();
+        tableEnv.executeSql(
+                        "insert into fact partition (p=3) values (30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ")
+                .await();
+
+        // partitioned dest table
+        tableEnv.executeSql(
+                "create table fact2 (a int, b bigint, c string) partitioned by (p int)");
+        tableEnv.executeSql(
+                        "insert into fact2 partition (p=1) values (40,100,'aaa'),(41,101,'bbb'),(42,102,'ccc') ")
+                .await();
+        tableEnv.executeSql(
+                        "insert into fact2 partition (p=2) values (50,200,'aaa'),(51,201,'bbb'),(52,202,'ccc') ")
+                .await();
+        tableEnv.executeSql(
+                        "insert into fact2 partition (p=3) values (60,300,'aaa'),(61,301,'bbb'),(62,302,'ccc') ")
+                .await();
+
+        tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+
+        // two fact sources share the same dynamic filter
+        String sql =
+                "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) "
+                        + "union all "
+                        + "(select a, b, c, p, x, y from fact2, dim where x = p and z = 1)) t order by a";
+        String expected =
+                "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
+                        + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], "
+                        + "+I[40, 100, aaa, 1, 1, a], +I[41, 101, bbb, 1, 1, a], +I[42, 102, ccc, 1, 1, a], "
+                        + "+I[50, 200, aaa, 2, 2, b], +I[51, 201, bbb, 2, 2, b], +I[52, 202, ccc, 2, 2, b]]";
+
+        String plan = tableEnv.explainSql(sql);
+        assertThat(plan).containsOnlyOnce("DynamicFilteringDataCollector(fields=[x])(reuse_id=");
+
+        List<Row> results = queryResult(tableEnv.sqlQuery(sql));
+        assertThat(results.toString()).isEqualTo(expected);
+
+        // two fact sources use different dynamic filters
+        String sql2 =
+                "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) "
+                        + "union all "
+                        + "(select a, b, c, p, x, y from fact2, dim where x = p and z = 2)) t order by a";
+        String expected2 =
+                "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
+                        + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], "
+                        + "+I[60, 300, aaa, 3, 3, c], +I[61, 301, bbb, 3, 3, c], +I[62, 302, ccc, 3, 3, c]]";
+
+        plan = tableEnv.explainSql(sql2);
+        assertThat(plan).contains("DynamicFilteringDataCollector");
+
+        results = queryResult(tableEnv.sqlQuery(sql2));
+        assertThat(results.toString()).isEqualTo(expected2);
+    }
+
+    private static List<Row> queryResult(org.apache.flink.table.api.Table table) {
+        return CollectionUtil.iteratorToList(table.execute().collect());
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 65bd028c349..266b4097799 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.sql.parser.SqlPartitionUtils;
 import org.apache.flink.sql.parser.hive.ddl.SqlAddHivePartitions;
 import org.apache.flink.sql.parser.hive.impl.FlinkHiveSqlParserImpl;
@@ -158,6 +161,22 @@ public class HiveTestUtils {
         return tableEnv;
     }
 
+    public static TableEnvironment createTableEnvInBatchModeWithAdaptiveScheduler() {
+        EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
+        settings.getConfiguration()
+                .set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch);
+        settings.getConfiguration()
+                .set(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM, 4);
+        settings.getConfiguration()
+                .set(
+                        JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK,
+                        MemorySize.parse("150kb"));
+        settings.getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, -1);
+        TableEnvironment tableEnv = TableEnvironment.create(settings);
+        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        return tableEnv;
+    }
+
     public static StreamTableEnvironment createTableEnvInStreamingMode(
             StreamExecutionEnvironment env) {
         return createTableEnvInStreamingMode(env, SqlDialect.DEFAULT);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStore.java
index ce0a5192602..90552e0adc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStore.java
@@ -29,6 +29,8 @@ import java.util.function.Function;
  * {@link CoordinatorStore} can be used for sharing some information among {@link
  * OperatorCoordinator} instances. Motivating example is/was combining/aggregating latest watermark
  * emitted by different sources in order to do the watermark alignment.
+ *
+ * <p>Implementations of this interface must ensure that all operations are atomic.
  */
 @ThreadSafe
 @Internal
@@ -41,5 +43,7 @@ public interface CoordinatorStore {
 
     Object computeIfPresent(Object key, BiFunction<Object, Object, Object> remappingFunction);
 
+    Object compute(Object key, BiFunction<Object, Object, Object> mappingFunction);
+
     <R> R apply(Object key, Function<Object, R> consumer);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStoreImpl.java
index 72daf833e1a..6956d94006c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStoreImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStoreImpl.java
@@ -54,6 +54,11 @@ public class CoordinatorStoreImpl implements CoordinatorStore {
         return store.computeIfPresent(key, remappingFunction);
     }
 
+    @Override
+    public Object compute(Object key, BiFunction<Object, Object, Object> mappingFunction) {
+        return store.compute(key, mappingFunction);
+    }
+
     @Override
     public <R> R apply(Object key, Function<Object, R> consumer) {
         return consumer.apply(store.get(key));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index c4083f9d173..842b768050d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -225,13 +225,36 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
         runInEventLoop(() -> enumerator.start(), "starting the SplitEnumerator.");
 
         if (coordinatorListeningID != null) {
-            if (coordinatorStore.containsKey(coordinatorListeningID)) {
-                // The coordinator will be recreated after global failover. It should be registered
-                // again replacing the previous one.
-                coordinatorStore.computeIfPresent(coordinatorListeningID, (id, origin) -> this);
-            } else {
-                coordinatorStore.putIfAbsent(coordinatorListeningID, this);
-            }
+            coordinatorStore.compute(
+                    coordinatorListeningID,
+                    (key, oldValue) -> {
+                        // The value for a listener ID can be a source coordinator listening to an
+                        // event, or an event waiting to be retrieved
+                        if (oldValue == null || oldValue instanceof OperatorCoordinator) {
+                            // The coordinator has not registered or needs to be recreated after
+                            // global failover.
+                            return this;
+                        } else {
+                            checkState(
+                                    oldValue instanceof OperatorEvent,
+                                    "The existing value for "
+                                            + coordinatorStore
+                                            + "is expected to be an operator event, but it is in fact "
+                                            + oldValue);
+                            LOG.info(
+                                    "Handling event {} received before the source coordinator with ID {} is registered",
+                                    oldValue,
+                                    coordinatorListeningID);
+                            handleEventFromOperator(0, 0, (OperatorEvent) oldValue);
+
+                            // Since for non-global failover the coordinator will not be recreated
+                            // and for global failover both the sender and receiver need to restart,
+                            // the coordinator will receive the event only once.
+                            // As the event has been processed, it can be removed safely and there's
+                            // no need to register the coordinator for further events as well.
+                            return null;
+                        }
+                    });
         }
     }
 
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java
index 50b10768765..bc0b6219e37 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCo
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.table.connector.source.DynamicFilteringData;
 import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator
@@ -68,8 +70,7 @@ public class DynamicFilteringDataCollectorOperatorCoordinator
     public void close() throws Exception {}
 
     @Override
-    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
-            throws Exception {
+    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
         DynamicFilteringData currentData =
                 ((DynamicFilteringEvent) ((SourceEventWrapper) event).getSourceEvent()).getData();
         if (receivedFilteringData == null) {
@@ -92,20 +93,45 @@ public class DynamicFilteringDataCollectorOperatorCoordinator
         }
 
         for (String listenerID : dynamicFilteringDataListenerIDs) {
-            // Push event to listening source coordinators.
-            OperatorCoordinator listener = (OperatorCoordinator) coordinatorStore.get(listenerID);
-            if (listener == null) {
-                throw new IllegalStateException(
-                        "Dynamic filtering data listener is missing: " + listenerID);
-            } else {
-                LOG.info(
-                        "Distributing event {} to source coordinator with ID {}",
-                        event,
-                        listenerID);
-                // Subtask index and attempt number is not necessary for handling
-                // DynamicFilteringEvent.
-                listener.handleEventFromOperator(0, 0, event);
-            }
+            coordinatorStore.compute(
+                    listenerID,
+                    (key, oldValue) -> {
+                        // The value for a listener ID can be a source coordinator listening to an
+                        // event, or an event waiting to be retrieved
+                        if (oldValue == null || oldValue instanceof OperatorEvent) {
+                            // If the listener has not been registered, or after a global failover
+                            // without cleanup the store, we simply update it to the latest value.
+                            // The listener coordinator would retrieve the event once it's started.
+                            LOG.info(
+                                    "Updating event {} before the source coordinator with ID {} is registered",
+                                    event,
+                                    listenerID);
+                            return event;
+                        } else {
+                            checkState(
+                                    oldValue instanceof OperatorCoordinator,
+                                    "The existing value for "
+                                            + listenerID
+                                            + "is expected to be an operator coordinator, but it is in fact "
+                                            + oldValue);
+                            LOG.info(
+                                    "Distributing event {} to source coordinator with ID {}",
+                                    event,
+                                    listenerID);
+                            try {
+                                // Subtask index and attempt number is not necessary for handling
+                                // DynamicFilteringEvent.
+                                ((OperatorCoordinator) oldValue)
+                                        .handleEventFromOperator(0, 0, event);
+                            } catch (Exception e) {
+                                ExceptionUtils.rethrow(e);
+                            }
+
+                            // Dynamic filtering event is expected to be sent only once. So after
+                            // the coordinator is notified, it can be removed from the store.
+                            return null;
+                        }
+                    });
         }
     }