Posted to commits@flink.apache.org by ga...@apache.org on 2022/09/29 05:13:33 UTC
[flink] branch master updated: [FLINK-29348][table] DynamicFilteringEvent can be stored in the CoordinatorStore if it's received before the listener source coordinator is started.
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7bae0ebb637 [FLINK-29348][table] DynamicFilteringEvent can be stored in the CoordinatorStore if it's received before the listener source coordinator is started.
7bae0ebb637 is described below
commit 7bae0ebb6379c175522bd903838bb3737fc6c65d
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Sep 27 15:49:29 2022 +0800
[FLINK-29348][table] DynamicFilteringEvent can be stored in the CoordinatorStore if it's received before the listener source coordinator is started.
This closes #20906.
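The fix closes a race between the coordinator that sends a DynamicFilteringEvent and the source coordinator that listens for it. The CoordinatorStore entry for a listener ID now holds either the registered coordinator or a pending event, and both sides replace their check-then-act sequences (containsKey/get followed by put) with a single atomic compute. The sketch below illustrates the store-or-deliver pattern with plain JDK types; StoreOrDeliverSketch, send and register are illustrative names, not Flink APIs.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.function.Consumer;

    // Minimal sketch (assumed names, not Flink classes) of the handshake this
    // commit introduces. An entry holds either the registered listener (a
    // Consumer<String> here) or an event (a String here) that arrived first.
    public class StoreOrDeliverSketch {
        private final ConcurrentMap<String, Object> store = new ConcurrentHashMap<>();

        // Sender side: deliver if the listener is registered, otherwise park
        // the event. compute() makes the check-then-act atomic, so the event
        // cannot be lost between a containsKey() check and a put().
        @SuppressWarnings("unchecked")
        public void send(String listenerId, String event) {
            store.compute(listenerId, (key, oldValue) -> {
                if (oldValue == null || oldValue instanceof String) {
                    return event; // no listener yet: store (or refresh) the event
                }
                ((Consumer<String>) oldValue).accept(event);
                return null; // delivered exactly once; drop the entry
            });
        }

        // Receiver side: register, or immediately consume an event that
        // arrived before registration.
        @SuppressWarnings("unchecked")
        public void register(String listenerId, Consumer<String> listener) {
            store.compute(listenerId, (key, oldValue) -> {
                if (oldValue == null || oldValue instanceof Consumer) {
                    return listener; // fresh registration, or re-registration after failover
                }
                listener.accept((String) oldValue); // handle the parked event
                return null;
            });
        }
    }

In the diff below, SourceCoordinator#start plays the register side and DynamicFilteringDataCollectorOperatorCoordinator plays the send side.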
---
.../flink/connectors/hive/HiveDialectITCase.java | 135 ------------
.../hive/HiveDynamicPartitionPruningITCase.java | 234 +++++++++++++++++++++
.../flink/table/catalog/hive/HiveTestUtils.java | 19 ++
.../operators/coordination/CoordinatorStore.java | 4 +
.../coordination/CoordinatorStoreImpl.java | 5 +
.../source/coordinator/SourceCoordinator.java | 37 +++-
...cFilteringDataCollectorOperatorCoordinator.java | 58 +++--
7 files changed, 334 insertions(+), 158 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 81487871c78..32548dd731a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.connectors.hive;
-import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable;
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.SqlDialect;
@@ -26,7 +25,6 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -37,7 +35,6 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.functions.hive.HiveGenericUDTFTest;
@@ -1287,138 +1284,6 @@ public class HiveDialectITCase {
}
}
- @Test
- public void testDynamicPartitionPruning() throws Exception {
- // src table
- tableEnv.executeSql("create table dim (x int,y string,z int)");
- tableEnv.executeSql("insert into dim values (1,'a',1),(2,'b',1),(3,'c',2)").await();
-
- // partitioned dest table
- tableEnv.executeSql("create table fact (a int, b bigint, c string) partitioned by (p int)");
- tableEnv.executeSql(
- "insert into fact partition (p=1) values (10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ")
- .await();
- tableEnv.executeSql(
- "insert into fact partition (p=2) values (20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ")
- .await();
- tableEnv.executeSql(
- "insert into fact partition (p=3) values (30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ")
- .await();
-
- System.out.println(
- tableEnv.explainSql(
- "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a"));
-
- tableEnv.getConfig().set(TaskManagerOptions.NUM_TASK_SLOTS, 4);
- tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
- tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
-
- String sql = "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a";
- String sqlSwapFactDim =
- "select a, b, c, p, x, y from dim, fact where x = p and z = 1 order by a";
-
- String expected =
- "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
- + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b]]";
-
- // Check dynamic partition pruning is working
- String plan = tableEnv.explainSql(sql);
- assertThat(plan).contains("DynamicFilteringDataCollector");
-
- plan = tableEnv.explainSql(sqlSwapFactDim);
- assertThat(plan).contains("DynamicFilteringDataCollector");
-
- // Validate results
- List<Row> results = queryResult(tableEnv.sqlQuery(sql));
- assertThat(results.toString()).isEqualTo(expected);
-
- results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim));
- assertThat(results.toString()).isEqualTo(expected);
-
- // Validate results with table statistics
- tableEnv.getCatalog(tableEnv.getCurrentCatalog())
- .get()
- .alterTableStatistics(
- new ObjectPath(tableEnv.getCurrentDatabase(), "dim"),
- new CatalogTableStatistics(3, -1, -1, -1),
- false);
-
- results = queryResult(tableEnv.sqlQuery(sql));
- assertThat(results.toString()).isEqualTo(expected);
-
- results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim));
- assertThat(results.toString()).isEqualTo(expected);
- }
-
- @Test
- public void testDynamicPartitionPruningOnTwoFactTables() throws Exception {
- tableEnv.executeSql("create table dim (x int,y string,z int)");
- tableEnv.executeSql("insert into dim values (1,'a',1),(2,'b',1),(3,'c',2)").await();
-
- // partitioned dest table
- tableEnv.executeSql("create table fact (a int, b bigint, c string) partitioned by (p int)");
- tableEnv.executeSql(
- "insert into fact partition (p=1) values (10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ")
- .await();
- tableEnv.executeSql(
- "insert into fact partition (p=2) values (20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ")
- .await();
- tableEnv.executeSql(
- "insert into fact partition (p=3) values (30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ")
- .await();
-
- // partitioned dest table
- tableEnv.executeSql(
- "create table fact2 (a int, b bigint, c string) partitioned by (p int)");
- tableEnv.executeSql(
- "insert into fact2 partition (p=1) values (40,100,'aaa'),(41,101,'bbb'),(42,102,'ccc') ")
- .await();
- tableEnv.executeSql(
- "insert into fact2 partition (p=2) values (50,200,'aaa'),(51,201,'bbb'),(52,202,'ccc') ")
- .await();
- tableEnv.executeSql(
- "insert into fact2 partition (p=3) values (60,300,'aaa'),(61,301,'bbb'),(62,302,'ccc') ")
- .await();
-
- tableEnv.getConfig().set(TaskManagerOptions.NUM_TASK_SLOTS, 4);
-
- tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
- tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
-
- // two fact sources share the same dynamic filter
- String sql =
- "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) "
- + "union all "
- + "(select a, b, c, p, x, y from fact2, dim where x = p and z = 1)) t order by a";
- String expected =
- "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
- + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], "
- + "+I[40, 100, aaa, 1, 1, a], +I[41, 101, bbb, 1, 1, a], +I[42, 102, ccc, 1, 1, a], "
- + "+I[50, 200, aaa, 2, 2, b], +I[51, 201, bbb, 2, 2, b], +I[52, 202, ccc, 2, 2, b]]";
-
- String plan = tableEnv.explainSql(sql);
- assertThat(plan).containsOnlyOnce("DynamicFilteringDataCollector(fields=[x])(reuse_id=");
-
- List<Row> results = queryResult(tableEnv.sqlQuery(sql));
- assertThat(results.toString()).isEqualTo(expected);
-
- // two fact sources use different dynamic filters
- String sql2 =
- "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) "
- + "union all "
- + "(select a, b, c, p, x, y from fact2, dim where x = p and z = 2)) t order by a";
- String expected2 =
- "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
- + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], "
- + "+I[60, 300, aaa, 3, 3, c], +I[61, 301, bbb, 3, 3, c], +I[62, 302, ccc, 3, 3, c]]";
-
- plan = tableEnv.explainSql(sql2);
- assertThat(plan).contains("DynamicFilteringDataCollector");
-
- results = queryResult(tableEnv.sqlQuery(sql2));
- assertThat(results.toString()).isEqualTo(expected2);
- }
-
private void verifyUnsupportedOperation(String ddl) {
assertThatThrownBy(() -> tableEnv.executeSql(ddl))
.isInstanceOf(ValidationException.class)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java
new file mode 100644
index 00000000000..dce66345c24
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicPartitionPruningITCase.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests the dynamic partition pruning optimization on Hive sources. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class HiveDynamicPartitionPruningITCase {
+
+ @RegisterExtension
+ private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(4)
+ .build());
+
+ @Parameter public boolean enableAdaptiveBatchScheduler;
+
+ @Parameters(name = "enableAdaptiveBatchScheduler={0}")
+ public static Collection<Boolean> parameters() {
+ return Arrays.asList(false, true);
+ }
+
+ private TableEnvironment tableEnv;
+ private HiveCatalog hiveCatalog;
+ private String warehouse;
+
+ @BeforeEach
+ public void setup() {
+ hiveCatalog = HiveTestUtils.createHiveCatalog();
+ hiveCatalog
+ .getHiveConf()
+ .setBoolVar(
+ HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES, false);
+ hiveCatalog.open();
+ warehouse = hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
+
+ if (enableAdaptiveBatchScheduler) {
+ tableEnv = HiveTestUtils.createTableEnvInBatchModeWithAdaptiveScheduler();
+ } else {
+ tableEnv = HiveTestUtils.createTableEnvInBatchMode();
+ }
+
+ tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+ tableEnv.useCatalog(hiveCatalog.getName());
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (hiveCatalog != null) {
+ hiveCatalog.close();
+ }
+ if (warehouse != null) {
+ FileUtils.deleteDirectoryQuietly(new File(warehouse));
+ }
+ }
+
+ @TestTemplate
+ public void testDynamicPartitionPruning() throws Exception {
+ // src table
+ tableEnv.executeSql("create table dim (x int,y string,z int)");
+ tableEnv.executeSql("insert into dim values (1,'a',1),(2,'b',1),(3,'c',2)").await();
+
+ // partitioned dest table
+ tableEnv.executeSql("create table fact (a int, b bigint, c string) partitioned by (p int)");
+ tableEnv.executeSql(
+ "insert into fact partition (p=1) values (10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ")
+ .await();
+ tableEnv.executeSql(
+ "insert into fact partition (p=2) values (20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ")
+ .await();
+ tableEnv.executeSql(
+ "insert into fact partition (p=3) values (30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ")
+ .await();
+
+ System.out.println(
+ tableEnv.explainSql(
+ "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a"));
+
+ tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+
+ String sql = "select a, b, c, p, x, y from fact, dim where x = p and z = 1 order by a";
+ String sqlSwapFactDim =
+ "select a, b, c, p, x, y from dim, fact where x = p and z = 1 order by a";
+
+ String expected =
+ "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
+ + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b]]";
+
+ // Check dynamic partition pruning is working
+ String plan = tableEnv.explainSql(sql);
+ assertThat(plan).contains("DynamicFilteringDataCollector");
+
+ plan = tableEnv.explainSql(sqlSwapFactDim);
+ assertThat(plan).contains("DynamicFilteringDataCollector");
+
+ // Validate results
+ List<Row> results = queryResult(tableEnv.sqlQuery(sql));
+ assertThat(results.toString()).isEqualTo(expected);
+
+ results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim));
+ assertThat(results.toString()).isEqualTo(expected);
+
+ // Validate results with table statistics
+ tableEnv.getCatalog(tableEnv.getCurrentCatalog())
+ .get()
+ .alterTableStatistics(
+ new ObjectPath(tableEnv.getCurrentDatabase(), "dim"),
+ new CatalogTableStatistics(3, -1, -1, -1),
+ false);
+
+ results = queryResult(tableEnv.sqlQuery(sql));
+ assertThat(results.toString()).isEqualTo(expected);
+
+ results = queryResult(tableEnv.sqlQuery(sqlSwapFactDim));
+ assertThat(results.toString()).isEqualTo(expected);
+ }
+
+ @TestTemplate
+ public void testDynamicPartitionPruningOnTwoFactTables() throws Exception {
+ tableEnv.executeSql("create table dim (x int,y string,z int)");
+ tableEnv.executeSql("insert into dim values (1,'a',1),(2,'b',1),(3,'c',2)").await();
+
+ // partitioned dest table
+ tableEnv.executeSql("create table fact (a int, b bigint, c string) partitioned by (p int)");
+ tableEnv.executeSql(
+ "insert into fact partition (p=1) values (10,100,'aaa'),(11,101,'bbb'),(12,102,'ccc') ")
+ .await();
+ tableEnv.executeSql(
+ "insert into fact partition (p=2) values (20,200,'aaa'),(21,201,'bbb'),(22,202,'ccc') ")
+ .await();
+ tableEnv.executeSql(
+ "insert into fact partition (p=3) values (30,300,'aaa'),(31,301,'bbb'),(32,302,'ccc') ")
+ .await();
+
+ // partitioned dest table
+ tableEnv.executeSql(
+ "create table fact2 (a int, b bigint, c string) partitioned by (p int)");
+ tableEnv.executeSql(
+ "insert into fact2 partition (p=1) values (40,100,'aaa'),(41,101,'bbb'),(42,102,'ccc') ")
+ .await();
+ tableEnv.executeSql(
+ "insert into fact2 partition (p=2) values (50,200,'aaa'),(51,201,'bbb'),(52,202,'ccc') ")
+ .await();
+ tableEnv.executeSql(
+ "insert into fact2 partition (p=3) values (60,300,'aaa'),(61,301,'bbb'),(62,302,'ccc') ")
+ .await();
+
+ tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+
+ // two fact sources share the same dynamic filter
+ String sql =
+ "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) "
+ + "union all "
+ + "(select a, b, c, p, x, y from fact2, dim where x = p and z = 1)) t order by a";
+ String expected =
+ "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
+ + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], "
+ + "+I[40, 100, aaa, 1, 1, a], +I[41, 101, bbb, 1, 1, a], +I[42, 102, ccc, 1, 1, a], "
+ + "+I[50, 200, aaa, 2, 2, b], +I[51, 201, bbb, 2, 2, b], +I[52, 202, ccc, 2, 2, b]]";
+
+ String plan = tableEnv.explainSql(sql);
+ assertThat(plan).containsOnlyOnce("DynamicFilteringDataCollector(fields=[x])(reuse_id=");
+
+ List<Row> results = queryResult(tableEnv.sqlQuery(sql));
+ assertThat(results.toString()).isEqualTo(expected);
+
+ // two fact sources use different dynamic filters
+ String sql2 =
+ "select * from ((select a, b, c, p, x, y from fact, dim where x = p and z = 1) "
+ + "union all "
+ + "(select a, b, c, p, x, y from fact2, dim where x = p and z = 2)) t order by a";
+ String expected2 =
+ "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], "
+ + "+I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], "
+ + "+I[60, 300, aaa, 3, 3, c], +I[61, 301, bbb, 3, 3, c], +I[62, 302, ccc, 3, 3, c]]";
+
+ plan = tableEnv.explainSql(sql2);
+ assertThat(plan).contains("DynamicFilteringDataCollector");
+
+ results = queryResult(tableEnv.sqlQuery(sql2));
+ assertThat(results.toString()).isEqualTo(expected2);
+ }
+
+ private static List<Row> queryResult(org.apache.flink.table.api.Table table) {
+ return CollectionUtil.iteratorToList(table.execute().collect());
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 65bd028c349..266b4097799 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -18,6 +18,9 @@
package org.apache.flink.table.catalog.hive;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.sql.parser.SqlPartitionUtils;
import org.apache.flink.sql.parser.hive.ddl.SqlAddHivePartitions;
import org.apache.flink.sql.parser.hive.impl.FlinkHiveSqlParserImpl;
@@ -158,6 +161,22 @@ public class HiveTestUtils {
return tableEnv;
}
+ public static TableEnvironment createTableEnvInBatchModeWithAdaptiveScheduler() {
+ EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
+ settings.getConfiguration()
+ .set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch);
+ settings.getConfiguration()
+ .set(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM, 4);
+ settings.getConfiguration()
+ .set(
+ JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK,
+ MemorySize.parse("150kb"));
+ settings.getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, -1);
+ TableEnvironment tableEnv = TableEnvironment.create(settings);
+ tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+ return tableEnv;
+ }
+
public static StreamTableEnvironment createTableEnvInStreamingMode(
StreamExecutionEnvironment env) {
return createTableEnvInStreamingMode(env, SqlDialect.DEFAULT);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStore.java
index ce0a5192602..90552e0adc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStore.java
@@ -29,6 +29,8 @@ import java.util.function.Function;
* {@link CoordinatorStore} can be used for sharing some information among {@link
* OperatorCoordinator} instances. Motivating example is/was combining/aggregating latest watermark
* emitted by different sources in order to do the watermark alignment.
+ *
+ * <p>Implementations of this interface must ensure that all operations are atomic.
*/
@ThreadSafe
@Internal
@@ -41,5 +43,7 @@ public interface CoordinatorStore {
Object computeIfPresent(Object key, BiFunction<Object, Object, Object> remappingFunction);
+ Object compute(Object key, BiFunction<Object, Object, Object> mappingFunction);
+
<R> R apply(Object key, Function<Object, R> consumer);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStoreImpl.java
index 72daf833e1a..6956d94006c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStoreImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CoordinatorStoreImpl.java
@@ -54,6 +54,11 @@ public class CoordinatorStoreImpl implements CoordinatorStore {
return store.computeIfPresent(key, remappingFunction);
}
+ @Override
+ public Object compute(Object key, BiFunction<Object, Object, Object> mappingFunction) {
+ return store.compute(key, mappingFunction);
+ }
+
@Override
public <R> R apply(Object key, Function<Object, R> consumer) {
return consumer.apply(store.get(key));
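CoordinatorStoreImpl delegates the new method to ConcurrentHashMap#compute, so the remapping function for a given key runs atomically, and its return value decides the entry's fate: a non-null result replaces the mapping, null removes it. Both call sites in this commit rely on the null-removes-entry behavior to drop an event once it has been delivered. A minimal demonstration against the plain JDK map (key and value literals are illustrative):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class ComputeContract {
        public static void main(String[] args) {
            ConcurrentMap<Object, Object> store = new ConcurrentHashMap<>();
            // A non-null result inserts or replaces the mapping...
            store.compute("listener-1", (k, old) -> old == null ? "pending-event" : old);
            System.out.println(store.get("listener-1")); // pending-event
            // ...while returning null removes it; both run under the map's
            // internal lock for that key, so no other thread can observe an
            // intermediate state.
            store.compute("listener-1", (k, old) -> null);
            System.out.println(store.containsKey("listener-1")); // false
        }
    }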
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index c4083f9d173..842b768050d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -225,13 +225,36 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
runInEventLoop(() -> enumerator.start(), "starting the SplitEnumerator.");
if (coordinatorListeningID != null) {
- if (coordinatorStore.containsKey(coordinatorListeningID)) {
- // The coordinator will be recreated after global failover. It should be registered
- // again replacing the previous one.
- coordinatorStore.computeIfPresent(coordinatorListeningID, (id, origin) -> this);
- } else {
- coordinatorStore.putIfAbsent(coordinatorListeningID, this);
- }
+ coordinatorStore.compute(
+ coordinatorListeningID,
+ (key, oldValue) -> {
+ // The value for a listener ID can be a source coordinator listening to an
+ // event, or an event waiting to be retrieved
+ if (oldValue == null || oldValue instanceof OperatorCoordinator) {
+ // The coordinator has not registered or needs to be recreated after
+ // global failover.
+ return this;
+ } else {
+ checkState(
+ oldValue instanceof OperatorEvent,
+ "The existing value for "
+ + coordinatorListeningID
+ + " is expected to be an operator event, but it is in fact "
+ + oldValue);
+ LOG.info(
+ "Handling event {} received before the source coordinator with ID {} is registered",
+ oldValue,
+ coordinatorListeningID);
+ handleEventFromOperator(0, 0, (OperatorEvent) oldValue);
+
+ // Since for non-global failover the coordinator will not be recreated
+ // and for global failover both the sender and receiver need to restart,
+ // the coordinator will receive the event only once.
+ // As the event has been processed, it can be removed safely, and there
+ // is no need to register the coordinator for further events.
+ return null;
+ }
+ });
}
}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java
index 50b10768765..bc0b6219e37 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCo
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.table.connector.source.DynamicFilteringData;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
/**
* The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator
@@ -68,8 +70,7 @@ public class DynamicFilteringDataCollectorOperatorCoordinator
public void close() throws Exception {}
@Override
- public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
- throws Exception {
+ public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
DynamicFilteringData currentData =
((DynamicFilteringEvent) ((SourceEventWrapper) event).getSourceEvent()).getData();
if (receivedFilteringData == null) {
@@ -92,20 +93,45 @@ public class DynamicFilteringDataCollectorOperatorCoordinator
}
for (String listenerID : dynamicFilteringDataListenerIDs) {
- // Push event to listening source coordinators.
- OperatorCoordinator listener = (OperatorCoordinator) coordinatorStore.get(listenerID);
- if (listener == null) {
- throw new IllegalStateException(
- "Dynamic filtering data listener is missing: " + listenerID);
- } else {
- LOG.info(
- "Distributing event {} to source coordinator with ID {}",
- event,
- listenerID);
- // Subtask index and attempt number is not necessary for handling
- // DynamicFilteringEvent.
- listener.handleEventFromOperator(0, 0, event);
- }
+ coordinatorStore.compute(
+ listenerID,
+ (key, oldValue) -> {
+ // The value for a listener ID can be a source coordinator listening to an
+ // event, or an event waiting to be retrieved
+ if (oldValue == null || oldValue instanceof OperatorEvent) {
+ // If the listener has not been registered, or a global failover happened
+ // without cleaning up the store, we simply store the latest event.
+ // The listening coordinator will retrieve it once it starts.
+ LOG.info(
+ "Updating event {} before the source coordinator with ID {} is registered",
+ event,
+ listenerID);
+ return event;
+ } else {
+ checkState(
+ oldValue instanceof OperatorCoordinator,
+ "The existing value for "
+ + listenerID
+ + "is expected to be an operator coordinator, but it is in fact "
+ + oldValue);
+ LOG.info(
+ "Distributing event {} to source coordinator with ID {}",
+ event,
+ listenerID);
+ try {
+ // Subtask index and attempt number are not necessary for handling
+ // DynamicFilteringEvent.
+ ((OperatorCoordinator) oldValue)
+ .handleEventFromOperator(0, 0, event);
+ } catch (Exception e) {
+ ExceptionUtils.rethrow(e);
+ }
+
+ // Dynamic filtering event is expected to be sent only once. So after
+ // the coordinator is notified, it can be removed from the store.
+ return null;
+ }
+ });
}
}