Posted to commits@flink.apache.org by go...@apache.org on 2022/08/07 02:33:48 UTC

[flink] branch master updated (fb95798b1c3 -> b0859789e77)

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from fb95798b1c3 [FLINK-28807] Honor schema lifecycle
     new 9bda6779562 [FLINK-28709][source] Introduce coordinatorListeningID in SourceCoordinator to listen to events from other coordinators
     new 3a8e71e286d [FLINK-28709][table] Introduce DynamicFilteringData and the DynamicFilteringDataCollectorOperator to build and distribute the data
     new b0859789e77 [FLINK-28709][table] Introduce ExecutionOrderEnforcerOperator to ensure the source with dynamic filtering is executed after the DynamicFilteringData is collected

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../source/coordinator/SourceCoordinator.java      |  23 ++-
 .../coordinator/SourceCoordinatorProvider.java     |   8 +-
 .../coordinator/SourceCoordinatorProviderTest.java |   3 +-
 .../source/coordinator/SourceCoordinatorTest.java  |  34 +++-
 .../coordinator/SourceCoordinatorTestBase.java     |   3 +-
 .../api/operators/SourceOperatorFactory.java       |  11 +-
 .../api/transformations/SourceTransformation.java  |  12 ++
 .../SourceTransformationTranslator.java            |   1 +
 .../connector/source/DynamicFilteringData.java     | 196 +++++++++++++++++++++
 .../connector/source/DynamicFilteringEvent.java    |  45 +++++
 .../DynamicFilteringDataCollectorOperator.java     | 178 +++++++++++++++++++
 ...cFilteringDataCollectorOperatorCoordinator.java | 154 ++++++++++++++++
 ...namicFilteringDataCollectorOperatorFactory.java | 100 +++++++++++
 .../ExecutionOrderEnforcerOperator.java            |  71 ++++++++
 .../ExecutionOrderEnforcerOperatorFactory.java     |  47 +++++
 ...teringDataCollectorOperatorCoordinatorTest.java | 113 ++++++++++++
 .../DynamicFilteringDataCollectorOperatorTest.java | 123 +++++++++++++
 .../dynamicfiltering/DynamicFilteringDataTest.java | 138 +++++++++++++++
 18 files changed, 1248 insertions(+), 12 deletions(-)
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringEvent.java
 create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java
 create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java
 create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorFactory.java
 create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperator.java
 create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperatorFactory.java
 create mode 100644 flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinatorTest.java
 create mode 100644 flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorTest.java
 create mode 100644 flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataTest.java


[flink] 03/03: [FLINK-28709][table] Introduce ExecutionOrderEnforcerOperator to ensure the source with dynamic filtering is executed after the DynamicFilteringData is collected

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b0859789e7733c73a21e600ec0d595ead730c59d
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Thu Jul 28 18:28:25 2022 +0800

    [FLINK-28709][table] Introduce ExecutionOrderEnforcerOperator to ensure the source with dynamic filtering is executed after the DynamicFilteringData is collected
    
    This closes #20374
---
 .../ExecutionOrderEnforcerOperator.java            | 71 ++++++++++++++++++++++
 .../ExecutionOrderEnforcerOperatorFactory.java     | 47 ++++++++++++++
 2 files changed, 118 insertions(+)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperator.java
new file mode 100644
index 00000000000..caa74ed3713
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.streaming.api.operators.AbstractInput;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * ExecutionOrderEnforcerOperator has two inputs, one of which is a source, and the other is the
+ * dependent upstream. It enforces that the input source is executed after the dependent input is
+ * finished. Everything passed from the inputs is forwarded to the output, though typically the
+ * dependent input should not send anything.
+ *
+ * <p>The operator must be chained with the source, which is generally ensured by the {@link
+ * ExecutionOrderEnforcerOperatorFactory}. If chaining is explicitly disabled, the enforcer cannot
+ * work as expected.
+ *
+ * <p>The operator is used only for dynamic filtering at present.
+ */
+public class ExecutionOrderEnforcerOperator<IN> extends AbstractStreamOperatorV2<IN>
+        implements MultipleInputStreamOperator<IN> {
+
+    public ExecutionOrderEnforcerOperator(StreamOperatorParameters<IN> parameters) {
+        super(parameters, 2);
+    }
+
+    @Override
+    public List<Input> getInputs() {
+        return Arrays.asList(
+                new ForwardingInput<>(this, 1, output), new ForwardingInput<>(this, 2, output));
+    }
+
+    private static class ForwardingInput<IN> extends AbstractInput<IN, IN> {
+        private final Output<StreamRecord<IN>> output;
+
+        public ForwardingInput(
+                AbstractStreamOperatorV2<IN> owner, int inputId, Output<StreamRecord<IN>> output) {
+            super(owner, inputId);
+            this.output = output;
+        }
+
+        @Override
+        public void processElement(StreamRecord<IN> element) throws Exception {
+            output.collect(element);
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperatorFactory.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperatorFactory.java
new file mode 100644
index 00000000000..291d16d2a46
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperatorFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+/**
+ * The factory class for {@link ExecutionOrderEnforcerOperator}. This is a simple operator factory
+ * whose chaining strategy is always ChainingStrategy.HEAD_WITH_SOURCES.
+ */
+public class ExecutionOrderEnforcerOperatorFactory<IN> extends AbstractStreamOperatorFactory<IN> {
+
+    @Override
+    public <T extends StreamOperator<IN>> T createStreamOperator(
+            StreamOperatorParameters<IN> parameters) {
+        return (T) new ExecutionOrderEnforcerOperator<>(parameters);
+    }
+
+    @Override
+    public ChainingStrategy getChainingStrategy() {
+        return ChainingStrategy.HEAD_WITH_SOURCES;
+    }
+
+    @Override
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+        return ExecutionOrderEnforcerOperator.class;
+    }
+}
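
For illustration, here is a minimal sketch (not part of this commit) of how the enforcer could
be wired up using Flink's existing MultipleInputTransformation API. The transformation variables
and the input order are assumptions; the actual planner-side wiring lands in follow-up commits.

    // Sketch only: `sourceTransform` is the dynamic-filtering source to be delayed,
    // `dependentTransform` produces the DynamicFilteringData. Which input id maps to
    // which side is an assumption here; both inputs are forwarded unchanged anyway.
    MultipleInputTransformation<RowData> enforcer =
            new MultipleInputTransformation<>(
                    "OrderEnforcer",
                    new ExecutionOrderEnforcerOperatorFactory<>(),
                    sourceTransform.getOutputType(),
                    sourceTransform.getParallelism());
    enforcer.addInput(sourceTransform);
    enforcer.addInput(dependentTransform);
    // The factory's HEAD_WITH_SOURCES chaining strategy lets the runtime chain the
    // enforcer with the source, which is what makes the ordering guarantee possible.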


[flink] 01/03: [FLINK-28709][source] Introduce coordinatorListeningID in SourceCoordinator to listen to events from other coordinators

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9bda67795628576aa4f161df6cb976ba71c3936d
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Jul 27 15:48:38 2022 +0800

    [FLINK-28709][source] Introduce coordinatorListeningID in SourceCoordinator to listen to events from other coordinators
    
    This closes #20374
---
 .../source/coordinator/SourceCoordinator.java      | 23 +++++++++++++--
 .../coordinator/SourceCoordinatorProvider.java     |  8 +++--
 .../coordinator/SourceCoordinatorProviderTest.java |  3 +-
 .../source/coordinator/SourceCoordinatorTest.java  | 34 ++++++++++++++++++----
 .../coordinator/SourceCoordinatorTestBase.java     |  3 +-
 .../api/operators/SourceOperatorFactory.java       | 11 ++++++-
 .../api/transformations/SourceTransformation.java  | 12 ++++++++
 .../SourceTransformationTranslator.java            |  1 +
 8 files changed, 83 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index eff63a023a9..c4083f9d173 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -111,6 +111,12 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
     /** A flag marking whether the coordinator has started. */
     private boolean started;
 
+    /**
+     * An ID under which the coordinator registers itself in the coordinator store. Other
+     * coordinators can send events to this coordinator via this ID.
+     */
+    @Nullable private final String coordinatorListeningID;
+
     public SourceCoordinator(
             String operatorName,
             Source<?, SplitT, EnumChkT> source,
@@ -121,7 +127,8 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
                 source,
                 context,
                 coordinatorStore,
-                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
+                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                null);
     }
 
     public SourceCoordinator(
@@ -129,13 +136,15 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
             Source<?, SplitT, EnumChkT> source,
             SourceCoordinatorContext<SplitT> context,
             CoordinatorStore coordinatorStore,
-            WatermarkAlignmentParams watermarkAlignmentParams) {
+            WatermarkAlignmentParams watermarkAlignmentParams,
+            @Nullable String coordinatorListeningID) {
         this.operatorName = operatorName;
         this.source = source;
         this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
         this.context = context;
         this.coordinatorStore = coordinatorStore;
         this.watermarkAlignmentParams = watermarkAlignmentParams;
+        this.coordinatorListeningID = coordinatorListeningID;
 
         if (watermarkAlignmentParams.isEnabled()) {
             if (context.isConcurrentExecutionAttemptsSupported()) {
@@ -214,6 +223,16 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
         // We rely on the single-threaded coordinator executor to guarantee
         // the other methods are invoked after the enumerator has started.
         runInEventLoop(() -> enumerator.start(), "starting the SplitEnumerator.");
+
+        if (coordinatorListeningID != null) {
+            if (coordinatorStore.containsKey(coordinatorListeningID)) {
+                // The coordinator will be recreated after global failover. It should be registered
+                // again, replacing the previous one.
+                coordinatorStore.computeIfPresent(coordinatorListeningID, (id, origin) -> this);
+            } else {
+                coordinatorStore.putIfAbsent(coordinatorListeningID, this);
+            }
+        }
     }
 
     @Override
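
For context, this registration is what lets other coordinators reach a SourceCoordinator through
the shared store. A minimal sketch of the sending side, mirroring what the
DynamicFilteringDataCollectorOperatorCoordinator in commit 02 does; `store` and `event` are
assumed to be in scope:

    // Sketch: deliver an OperatorEvent to a coordinator registered under an ID.
    OperatorCoordinator listener = (OperatorCoordinator) store.get("my-listening-id");
    if (listener != null) {
        // Subtask index and attempt number are irrelevant on this delivery path.
        listener.handleEventFromOperator(0, 0, event);
    }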
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 3575c13c974..c76a3ec1910 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -43,6 +43,7 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
     private final Source<?, SplitT, ?> source;
     private final int numWorkerThreads;
     private final WatermarkAlignmentParams alignmentParams;
+    @Nullable private final String coordinatorListeningID;
 
     /**
      * Construct the {@link SourceCoordinatorProvider}.
@@ -60,12 +61,14 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
             OperatorID operatorID,
             Source<?, SplitT, ?> source,
             int numWorkerThreads,
-            WatermarkAlignmentParams alignmentParams) {
+            WatermarkAlignmentParams alignmentParams,
+            @Nullable String coordinatorListeningID) {
         super(operatorID);
         this.operatorName = operatorName;
         this.source = source;
         this.numWorkerThreads = numWorkerThreads;
         this.alignmentParams = alignmentParams;
+        this.coordinatorListeningID = coordinatorListeningID;
     }
 
     @Override
@@ -87,7 +90,8 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
                 source,
                 sourceCoordinatorContext,
                 context.getCoordinatorStore(),
-                alignmentParams);
+                alignmentParams,
+                coordinatorListeningID);
     }
 
     /** A thread factory class that provides some helper methods. */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
index 5dd7196a82e..5452c4c0e06 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -55,7 +55,8 @@ public class SourceCoordinatorProviderTest {
                         OPERATOR_ID,
                         new MockSource(Boundedness.BOUNDED, NUM_SPLITS),
                         1,
-                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        null);
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index 03bd8a30c3b..73ef276d7fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
+import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
 import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
 import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
@@ -249,7 +250,8 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
                                 new EnumeratorCreatingSource<>(() -> splitEnumerator),
                                 context,
                                 new CoordinatorStoreImpl(),
-                                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED)) {
+                                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                                null)) {
 
             coordinator.start();
             waitUtil(
@@ -273,7 +275,8 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
                                 }),
                         context,
                         new CoordinatorStoreImpl(),
-                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        null);
 
         coordinator.start();
 
@@ -299,7 +302,8 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
                                 new EnumeratorCreatingSource<>(() -> splitEnumerator),
                                 context,
                                 new CoordinatorStoreImpl(),
-                                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED)) {
+                                WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                                null)) {
 
             coordinator.start();
             coordinator.handleEventFromOperator(1, 0, new SourceEventWrapper(new SourceEvent() {}));
@@ -382,7 +386,8 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
                         context.getOperatorId(),
                         source,
                         1,
-                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        null);
 
         final OperatorCoordinator coordinator = provider.getCoordinator(context);
         coordinator.start();
@@ -409,7 +414,8 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
                         context.getOperatorId(),
                         source,
                         1,
-                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        null);
 
         final OperatorCoordinator coordinator = provider.getCoordinator(context);
         coordinator.resetToCheckpoint(1L, createEmptyCheckpoint());
@@ -505,6 +511,24 @@ class SourceCoordinatorTest extends SourceCoordinatorTestBase {
         assertThat(events.get(5)).isInstanceOf(NoMoreSplitsEvent.class);
     }
 
+    @Test
+    public void testListeningEventsFromOtherCoordinators() throws Exception {
+        final String listeningID = "testListeningID";
+
+        CoordinatorStore store = new CoordinatorStoreImpl();
+        final SourceCoordinator<?, ?> coordinator =
+                new SourceCoordinator<>(
+                        OPERATOR_NAME,
+                        createMockSource(),
+                        context,
+                        store,
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        listeningID);
+        coordinator.start();
+
+        assertThat(store.get(listeningID)).isNotNull().isSameAs(coordinator);
+    }
+
     // ------------------------------------------------------------------------
     //  test helpers
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index fa52d1feae8..94d285b2a65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -204,7 +204,8 @@ abstract class SourceCoordinatorTestBase {
                 mockSource,
                 getNewSourceCoordinatorContext(),
                 new CoordinatorStoreImpl(),
-                watermarkAlignmentParams);
+                watermarkAlignmentParams,
+                null);
     }
 
     Source<Integer, MockSourceSplit, Set<MockSourceSplit>> createMockSource() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index 2bd945d5f4c..4c364beb2b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -34,6 +34,8 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
 import org.apache.flink.util.function.FunctionWithException;
 
+import javax.annotation.Nullable;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** The Factory class for {@link SourceOperator}. */
@@ -54,6 +56,8 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
     /** The number of worker threads for the source coordinator. */
     private final int numCoordinatorWorkerThread;
 
+    private @Nullable String coordinatorListeningID;
+
     public SourceOperatorFactory(
             Source<OUT, ?, ?> source, WatermarkStrategy<OUT> watermarkStrategy) {
         this(source, watermarkStrategy, true /* emit progressive watermarks */, 1);
@@ -81,6 +85,10 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
         return source.getBoundedness();
     }
 
+    public void setCoordinatorListeningID(@Nullable String coordinatorListeningID) {
+        this.coordinatorListeningID = coordinatorListeningID;
+    }
+
     @Override
     public <T extends StreamOperator<OUT>> T createStreamOperator(
             StreamOperatorParameters<OUT> parameters) {
@@ -128,7 +136,8 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
                 operatorID,
                 source,
                 numCoordinatorWorkerThread,
-                watermarkStrategy.getAlignmentParameters());
+                watermarkStrategy.getAlignmentParameters(),
+                coordinatorListeningID);
     }
 
     @SuppressWarnings("rawtypes")
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
index 49a9042f3ad..0ed6ccd8d82 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.List;
 
@@ -41,6 +43,7 @@ public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
     private final WatermarkStrategy<OUT> watermarkStrategy;
 
     private ChainingStrategy chainingStrategy = ChainingStrategy.DEFAULT_CHAINING_STRATEGY;
+    private @Nullable String coordinatorListeningID;
 
     /**
      * Creates a new {@code Transformation} with the given name, output type and parallelism.
@@ -94,4 +97,13 @@ public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
     public ChainingStrategy getChainingStrategy() {
         return chainingStrategy;
     }
+
+    public void setCoordinatorListeningID(@Nullable String coordinatorListeningID) {
+        this.coordinatorListeningID = coordinatorListeningID;
+    }
+
+    @Nullable
+    public String getCoordinatorListeningID() {
+        return coordinatorListeningID;
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
index 88f68d2d66e..af251755a77 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SourceTransformationTranslator.java
@@ -77,6 +77,7 @@ public class SourceTransformationTranslator<OUT, SplitT extends SourceSplit, Enu
                         emitProgressiveWatermarks);
 
         operatorFactory.setChainingStrategy(transformation.getChainingStrategy());
+        operatorFactory.setCoordinatorListeningID(transformation.getCoordinatorListeningID());
 
         streamGraph.addSource(
                 transformationId,
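
Taken together, the listening ID introduced in this commit flows from SourceTransformation through
SourceOperatorFactory and SourceCoordinatorProvider into SourceCoordinator. A minimal sketch of the
entry point, with a placeholder ID; the real value is supplied later by the dynamic filtering
planner:

    // Sketch: tag a source so its coordinator registers under a listening ID.
    // `sourceTransformation` is an existing SourceTransformation; the ID is a placeholder.
    sourceTransformation.setCoordinatorListeningID("dynamic-filtering-placeholder-id");
    // The translator copies the ID onto the SourceOperatorFactory, which hands it to the
    // SourceCoordinatorProvider, which finally passes it into the SourceCoordinator.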


[flink] 02/03: [FLINK-28709][table] Introduce DynamicFilteringData and the DynamicFilteringDataCollectorOperator to build and distribute the data

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3a8e71e286d1d097fe6cfc8b7ff2125a5108f334
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Thu Jul 28 18:27:02 2022 +0800

    [FLINK-28709][table] Introduce DynamicFilteringData and the DynamicFilteringDataCollectorOperator to build and distribute the data
    
    This closes #20374
---
 .../connector/source/DynamicFilteringData.java     | 196 +++++++++++++++++++++
 .../connector/source/DynamicFilteringEvent.java    |  45 +++++
 .../DynamicFilteringDataCollectorOperator.java     | 178 +++++++++++++++++++
 ...cFilteringDataCollectorOperatorCoordinator.java | 154 ++++++++++++++++
 ...namicFilteringDataCollectorOperatorFactory.java | 100 +++++++++++
 ...teringDataCollectorOperatorCoordinatorTest.java | 113 ++++++++++++
 .../DynamicFilteringDataCollectorOperatorTest.java | 123 +++++++++++++
 .../dynamicfiltering/DynamicFilteringDataTest.java | 138 +++++++++++++++
 8 files changed, 1047 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java
new file mode 100644
index 00000000000..0a81bffdd4c
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+
+    /**
+     * Serialized rows for filtering. The types of the row values must be Flink internal data
+     * types, i.e. the types returned by the FieldGetter. The list should be sorted and distinct.
+     */
+    private final List<byte[]> serializedData;
+
+    /** Whether the data actually does any filtering. If false, every row is considered contained. */
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, List<RowData>> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = checkNotNull(typeInfo);
+        this.rowType = checkNotNull(rowType);
+        this.serializedData = checkNotNull(serializedData);
+        this.isFiltering = isFiltering;
+    }
+
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    /**
+     * Returns true if the dynamic filtering data contains the specific row.
+     *
+     * @param row the row to be tested. Types of the row values must be Flink internal data
+     *     types, i.e. the types returned by the FieldGetter.
+     * @return true if the dynamic filtering data contains the specific row
+     */
+    public boolean contains(RowData row) {
+        if (!isFiltering) {
+            return true;
+        } else if (row.getArity() != rowType.getFieldCount()) {
+            throw new TableException("The arity of the RowData does not match the filtering row type.");
+        } else {
+            prepare();
+            List<RowData> mayMatchRowData = dataMap.get(hash(row));
+            if (mayMatchRowData == null) {
+                return false;
+            }
+            for (RowData mayMatch : mayMatchRowData) {
+                if (matchRow(row, mayMatch)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    private boolean matchRow(RowData row, RowData mayMatch) {
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            if (!Objects.equals(
+                    fieldGetters[i].getFieldOrNull(row),
+                    fieldGetters[i].getFieldOrNull(mayMatch))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void prepare() {
+        if (!prepared) {
+            synchronized (this) {
+                if (!prepared) {
+                    doPrepare();
+                    prepared = true;
+                }
+            }
+        }
+    }
+
+    private void doPrepare() {
+        this.dataMap = new HashMap<>();
+        if (isFiltering) {
+            this.fieldGetters =
+                    IntStream.range(0, rowType.getFieldCount())
+                            .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
+                            .toArray(RowData.FieldGetter[]::new);
+
+            TypeSerializer<RowData> serializer = typeInfo.createSerializer(new ExecutionConfig());
+            for (byte[] bytes : serializedData) {
+                try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+                        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {
+                    RowData partition = serializer.deserialize(inView);
+                    List<RowData> partitions =
+                            dataMap.computeIfAbsent(hash(partition), k -> new ArrayList<>());
+                    partitions.add(partition);
+                } catch (Exception e) {
+                    throw new TableException("Unable to deserialize the value.", e);
+                }
+            }
+        }
+    }
+
+    private int hash(RowData row) {
+        return Objects.hash(Arrays.stream(fieldGetters).map(g -> g.getFieldOrNull(row)).toArray());
+    }
+
+    public static boolean isEqual(DynamicFilteringData data, DynamicFilteringData another) {
+        if (data == null) {
+            return another == null;
+        }
+        if (another == null
+                || (data.isFiltering != another.isFiltering)
+                || !data.typeInfo.equals(another.typeInfo)
+                || !data.rowType.equals(another.rowType)
+                || data.serializedData.size() != another.serializedData.size()) {
+            return false;
+        }
+
+        BytePrimitiveArrayComparator comparator = new BytePrimitiveArrayComparator(true);
+        for (int i = 0; i < data.serializedData.size(); i++) {
+            if (comparator.compare(data.serializedData.get(i), another.serializedData.get(i))
+                    != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @VisibleForTesting
+    public Collection<RowData> getData() {
+        prepare();
+        return dataMap.values().stream().flatMap(List::stream).collect(Collectors.toList());
+    }
+
+    @Override
+    public String toString() {
+        return "DynamicFilteringData{"
+                + "isFiltering="
+                + isFiltering
+                + ", data size="
+                + serializedData.size()
+                + '}';
+    }
+}
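
A hedged sketch of how a DynamicFilteringData instance is built and probed, using only APIs that
appear in this patch; the single-column BIGINT row type, the values, and the omitted exception
handling are illustrative (InternalTypeInfo comes from flink-table-runtime, as used by the
collector operator below):

    // Sketch: build a one-column filtering data set and probe it with contains().
    RowType rowType = RowType.of(new BigIntType());
    TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);
    TypeSerializer<RowData> serializer = typeInfo.createSerializer(new ExecutionConfig());

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    serializer.serialize(GenericRowData.of(42L), new DataOutputViewStreamWrapper(baos));

    DynamicFilteringData data =
            new DynamicFilteringData(
                    typeInfo, rowType, Collections.singletonList(baos.toByteArray()), true);
    data.contains(GenericRowData.of(42L)); // true; rows are deserialized lazily via prepare()
    data.contains(GenericRowData.of(43L)); // false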
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringEvent.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringEvent.java
new file mode 100644
index 00000000000..dcc826e661d
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A source event to transport the {@link DynamicFilteringData} for dynamic filtering purposes.
+ * The event is sent by the DynamicFilteringDataCollectorOperator to the enumerator of a source
+ * supporting dynamic filtering, via the collector's operator coordinator and the SourceCoordinator.
+ */
+public class DynamicFilteringEvent implements SourceEvent {
+    private final DynamicFilteringData data;
+
+    public DynamicFilteringEvent(DynamicFilteringData data) {
+        this.data = checkNotNull(data);
+    }
+
+    public DynamicFilteringData getData() {
+        return data;
+    }
+
+    @Override
+    public String toString() {
+        return "DynamicFilteringEvent{" + "data=" + data + '}';
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java
new file mode 100644
index 00000000000..1d42dd08f7f
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that support dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+    private transient FieldGetter[] fieldGetters;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = checkNotNull(dynamicFilteringFieldType);
+        this.dynamicFilteringFieldIndices = checkNotNull(dynamicFilteringFieldIndices);
+        this.threshold = threshold;
+        this.operatorEventGateway = checkNotNull(operatorEventGateway);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+        this.currentSize = 0L;
+        this.fieldGetters =
+                IntStream.range(0, dynamicFilteringFieldIndices.size())
+                        .mapToObj(
+                                i ->
+                                        RowData.createFieldGetter(
+                                                dynamicFilteringFieldType.getTypeAt(i),
+                                                dynamicFilteringFieldIndices.get(i)))
+                        .toArray(FieldGetter[]::new);
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (exceedThreshold()) {
+            return;
+        }
+
+        RowData value = element.getValue();
+        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+        for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+            rowData.setField(i, fieldGetters[i].getFieldOrNull(value));
+        }
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+            serializer.serialize(rowData, wrapper);
+            boolean duplicated = !buffer.add(baos.toByteArray());
+            if (duplicated) {
+                return;
+            }
+
+            currentSize += baos.size();
+        }
+
+        if (exceedThreshold()) {
+            // Clear the filtering data and disable further collection by leaving currentSize unchanged
+            buffer.clear();
+            LOG.warn(
+                    "Collected data size exceeds the threshold, {} > {}, dynamic filtering is disabled.",
+                    currentSize,
+                    threshold);
+        }
+    }
+
+    private boolean exceedThreshold() {
+        return threshold > 0 && currentSize > threshold;
+    }
+
+    @Override
+    public void finish() throws Exception {
+        if (exceedThreshold()) {
+            LOG.info(
+                    "Finish collecting. {} bytes are collected which exceeds the threshold {}. Sending empty data.",
+                    currentSize,
+                    threshold);
+        } else {
+            LOG.info(
+                    "Finish collecting. {} bytes in {} rows are collected. Sending the data.",
+                    currentSize,
+                    buffer.size());
+        }
+        sendEvent();
+    }
+
+    private void sendEvent() {
+        final DynamicFilteringData dynamicFilteringData;
+        if (exceedThreshold()) {
+            dynamicFilteringData =
+                    new DynamicFilteringData(
+                            typeInfo, dynamicFilteringFieldType, Collections.emptyList(), false);
+        } else {
+            dynamicFilteringData =
+                    new DynamicFilteringData(
+                            typeInfo, dynamicFilteringFieldType, new ArrayList<>(buffer), true);
+        }
+
+        DynamicFilteringEvent event = new DynamicFilteringEvent(dynamicFilteringData);
+        operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (buffer != null) {
+            buffer.clear();
+        }
+    }
+}
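
The buffer above deduplicates rows by their serialized bytes rather than by RowData equality. A
minimal standalone sketch of that technique, reusing the same comparator:

    // Sketch: a TreeSet ordered by byte-wise comparison collapses equal
    // serialized rows into a single entry; add() returns false on duplicates.
    Set<byte[]> buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
    buffer.add(new byte[] {1, 2, 3});
    boolean duplicated = !buffer.add(new byte[] {1, 2, 3}); // true
    // A HashSet<byte[]> would not deduplicate, since arrays use identity equals/hashCode.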
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java
new file mode 100644
index 00000000000..50b10768765
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator
+ * collects the {@link DynamicFilteringEvent} and redistributes it to the listening source coordinators.
+ */
+public class DynamicFilteringDataCollectorOperatorCoordinator
+        implements OperatorCoordinator, CoordinationRequestHandler {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class);
+
+    private final CoordinatorStore coordinatorStore;
+    private final List<String> dynamicFilteringDataListenerIDs;
+
+    private DynamicFilteringData receivedFilteringData;
+
+    public DynamicFilteringDataCollectorOperatorCoordinator(
+            Context context, List<String> dynamicFilteringDataListenerIDs) {
+        this.coordinatorStore = checkNotNull(context.getCoordinatorStore());
+        this.dynamicFilteringDataListenerIDs = checkNotNull(dynamicFilteringDataListenerIDs);
+    }
+
+    @Override
+    public void start() throws Exception {}
+
+    @Override
+    public void close() throws Exception {}
+
+    @Override
+    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
+            throws Exception {
+        DynamicFilteringData currentData =
+                ((DynamicFilteringEvent) ((SourceEventWrapper) event).getSourceEvent()).getData();
+        if (receivedFilteringData == null) {
+            receivedFilteringData = currentData;
+        } else {
+            // Since there might be speculative execution or failover, we may receive multiple
+            // notifications, and we can't tell for sure which one is valid for further processing.
+            if (DynamicFilteringData.isEqual(receivedFilteringData, currentData)) {
+                // If the notifications contain exactly the same data, everything is alright, and
+                // we don't need to send the event again.
+                return;
+            } else {
+                // A mismatch between the source filtering result and the dim data may lead
+                // to incorrect results, so trigger a global failover to fully recompute.
+                throw new IllegalStateException(
+                        "DynamicFilteringData is recomputed but not equal. "
+                                + "Triggering global failover in case the result is incorrect. "
+                                + "It's recommended to re-run the job with dynamic filtering disabled.");
+            }
+        }
+
+        for (String listenerID : dynamicFilteringDataListenerIDs) {
+            // Push event to listening source coordinators.
+            OperatorCoordinator listener = (OperatorCoordinator) coordinatorStore.get(listenerID);
+            if (listener == null) {
+                throw new IllegalStateException(
+                        "Dynamic filtering data listener is missing: " + listenerID);
+            } else {
+                LOG.info(
+                        "Distributing event {} to source coordinator with ID {}",
+                        event,
+                        listenerID);
+                // Subtask index and attempt number are not necessary for handling
+                // DynamicFilteringEvent.
+                listener.handleEventFromOperator(0, 0, event);
+            }
+        }
+    }
+
+    @Override
+    public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
+            CoordinationRequest request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void subtaskReset(int subtask, long checkpointId) {}
+
+    @Override
+    public void executionAttemptFailed(
+            int subtask, int attemptNumber, @Nullable Throwable reason) {}
+
+    @Override
+    public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {}
+
+    @Override
+    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
+            throws Exception {}
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {}
+
+    @Override
+    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData)
+            throws Exception {}
+
+    /** Provider for {@link DynamicFilteringDataCollectorOperatorCoordinator}. */
+    public static class Provider extends RecreateOnResetOperatorCoordinator.Provider {
+        private final List<String> dynamicFilteringDataListenerIDs;
+
+        public Provider(OperatorID operatorID, List<String> dynamicFilteringDataListenerIDs) {
+            super(operatorID);
+            this.dynamicFilteringDataListenerIDs = checkNotNull(dynamicFilteringDataListenerIDs);
+        }
+
+        @Override
+        protected OperatorCoordinator getCoordinator(Context context) throws Exception {
+            return new DynamicFilteringDataCollectorOperatorCoordinator(
+                    context, dynamicFilteringDataListenerIDs);
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorFactory.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorFactory.java
new file mode 100644
index 00000000000..e9ad35e9c9b
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The factory class for {@link DynamicFilteringDataCollectorOperator}. */
+public class DynamicFilteringDataCollectorOperatorFactory
+        extends AbstractStreamOperatorFactory<Object>
+        implements CoordinatedOperatorFactory<Object> {
+
+    private final Set<String> dynamicFilteringDataListenerIDs = new HashSet<>();
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+
+    public DynamicFilteringDataCollectorOperatorFactory(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold) {
+        this.dynamicFilteringFieldType = checkNotNull(dynamicFilteringFieldType);
+        this.dynamicFilteringFieldIndices = checkNotNull(dynamicFilteringFieldIndices);
+        this.threshold = threshold;
+    }
+
+    @Override
+    public <T extends StreamOperator<Object>> T createStreamOperator(
+            StreamOperatorParameters<Object> parameters) {
+        final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
+        final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher();
+        OperatorEventGateway operatorEventGateway =
+                eventDispatcher.getOperatorEventGateway(operatorId);
+
+        DynamicFilteringDataCollectorOperator operator =
+                new DynamicFilteringDataCollectorOperator(
+                        dynamicFilteringFieldType,
+                        dynamicFilteringFieldIndices,
+                        threshold,
+                        operatorEventGateway);
+
+        operator.setup(
+                parameters.getContainingTask(),
+                parameters.getStreamConfig(),
+                parameters.getOutput());
+
+        // The factory is typed to Object, so the concrete operator must be cast to T;
+        // the cast is safe because the operator implements StreamOperator<Object>.
+        @SuppressWarnings("unchecked")
+        final T castedOperator = (T) operator;
+
+        return castedOperator;
+    }
+
+    public void registerDynamicFilteringDataListenerID(String id) {
+        this.dynamicFilteringDataListenerIDs.add(checkNotNull(id));
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+        return DynamicFilteringDataCollectorOperator.class;
+    }
+
+    @Override
+    public OperatorCoordinator.Provider getCoordinatorProvider(
+            String operatorName, OperatorID operatorID) {
+        return new DynamicFilteringDataCollectorOperatorCoordinator.Provider(
+                operatorID, new ArrayList<>(dynamicFilteringDataListenerIDs));
+    }
+}
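
A hedged usage sketch for the factory above; the field type, indices, threshold,
and listener ID are illustrative placeholders, not values taken from this patch:

    RowType filterType = RowType.of(new IntType(), new BigIntType());
    DynamicFilteringDataCollectorOperatorFactory factory =
            new DynamicFilteringDataCollectorOperatorFactory(
                    filterType,
                    Arrays.asList(0, 2),   // indices of the filtering fields in the input row
                    32L * 1024 * 1024);    // size threshold (in bytes, judging by the tests below)
    // Must match the ID the consuming coordinator listens on.
    factory.registerDynamicFilteringDataListenerID("dynamic-filtering-source-1");
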
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinatorTest.java
new file mode 100644
index 00000000000..86b5638bac9
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinatorTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link DynamicFilteringDataCollectorOperatorCoordinator}. */
+class DynamicFilteringDataCollectorOperatorCoordinatorTest {
+
+    @Test
+    void testRedistributeData() throws Exception {
+        MockOperatorCoordinatorContext context =
+                new MockOperatorCoordinatorContext(new OperatorID(), 1);
+        String listenerID1 = "test-listener-1";
+        String listenerID2 = "test-listener-2";
+
+        TestingOperatorCoordinator listener1 = new TestingOperatorCoordinator(context);
+        TestingOperatorCoordinator listener2 = new TestingOperatorCoordinator(context);
+        context.getCoordinatorStore().putIfAbsent(listenerID1, listener1);
+        context.getCoordinatorStore().putIfAbsent(listenerID2, listener2);
+
+        RowType rowType = RowType.of(new IntType());
+        OperatorEvent testEvent = dynamicFilteringEvent(rowType, Collections.emptyList());
+        try (DynamicFilteringDataCollectorOperatorCoordinator coordinator =
+                new DynamicFilteringDataCollectorOperatorCoordinator(
+                        context, Arrays.asList(listenerID1, listenerID2))) {
+            coordinator.handleEventFromOperator(0, 1, testEvent);
+        }
+
+        assertThat(listener1.getNextReceivedOperatorEvent()).isSameAs(testEvent);
+        assertThat(listener1.getNextReceivedOperatorEvent()).isNull();
+        assertThat(listener2.getNextReceivedOperatorEvent()).isSameAs(testEvent);
+        assertThat(listener2.getNextReceivedOperatorEvent()).isNull();
+    }
+
+    @Test
+    void testTaskFailover() throws Exception {
+        MockOperatorCoordinatorContext context =
+                new MockOperatorCoordinatorContext(new OperatorID(), 1);
+        String listenerID = "test-listener-1";
+
+        TestingOperatorCoordinator listener = new TestingOperatorCoordinator(context);
+        context.getCoordinatorStore().putIfAbsent(listenerID, listener);
+
+        RowType rowType = RowType.of(new IntType());
+        try (DynamicFilteringDataCollectorOperatorCoordinator coordinator =
+                new DynamicFilteringDataCollectorOperatorCoordinator(
+                        context, Collections.singletonList(listenerID))) {
+            OperatorEvent testEvent =
+                    dynamicFilteringEvent(rowType, Collections.singletonList(new byte[] {1, 2}));
+            coordinator.handleEventFromOperator(0, 0, testEvent);
+            assertThat(listener.getNextReceivedOperatorEvent()).isSameAs(testEvent);
+
+            // failover happens
+            coordinator.executionAttemptFailed(0, 0, null);
+
+            OperatorEvent testEvent1 =
+                    dynamicFilteringEvent(rowType, Collections.singletonList(new byte[] {1, 2}));
+            coordinator.handleEventFromOperator(0, 1, testEvent1);
+            // testEvent1 carries the same data as testEvent, so it is ignored and no event is re-sent
+            assertThat(listener.getNextReceivedOperatorEvent()).isNull();
+
+            // failover happens again
+            coordinator.executionAttemptFailed(0, 1, null);
+
+            OperatorEvent testEvent2 =
+                    dynamicFilteringEvent(rowType, Collections.singletonList(new byte[] {1, 3}));
+            assertThatThrownBy(() -> coordinator.handleEventFromOperator(0, 2, testEvent2))
+                    .isInstanceOf(IllegalStateException.class);
+        }
+    }
+
+    private OperatorEvent dynamicFilteringEvent(RowType rowType, List<byte[]> data) {
+        return new SourceEventWrapper(
+                new DynamicFilteringEvent(
+                        new DynamicFilteringData(
+                                InternalTypeInfo.of(rowType), rowType, data, data.isEmpty())));
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorTest.java
new file mode 100644
index 00000000000..0fc6a9b8b97
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DynamicFilteringDataCollectorOperator}. */
+class DynamicFilteringDataCollectorOperatorTest {
+
+    @Test
+    void testCollectDynamicFilteringData() throws Exception {
+        RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+        List<Integer> indexes = Arrays.asList(0, 1, 3);
+        MockOperatorEventGateway gateway = new MockOperatorEventGateway();
+
+        DynamicFilteringDataCollectorOperator operator =
+                new DynamicFilteringDataCollectorOperator(rowType, indexes, -1, gateway);
+        ConcurrentLinkedQueue<Object> output;
+        try (OneInputStreamOperatorTestHarness<RowData, Object> harness =
+                new OneInputStreamOperatorTestHarness<>(operator)) {
+            output = harness.getOutput();
+            harness.setup();
+            harness.open();
+
+            for (long i = 0L; i < 3L; i++) {
+                harness.processElement(rowData(1, 1L, 0, "a"), i);
+            }
+            harness.processElement(rowData(2, 1L, 0, null), 3L);
+
+            // operator.finish is called when closing
+        }
+
+        assertThat(output).isEmpty();
+
+        assertThat(gateway.getEventsSent()).hasSize(1);
+        OperatorEvent event = gateway.getEventsSent().get(0);
+        assertThat(event).isInstanceOf(SourceEventWrapper.class);
+        SourceEvent dynamicFilteringEvent = ((SourceEventWrapper) event).getSourceEvent();
+        assertThat(dynamicFilteringEvent).isInstanceOf(DynamicFilteringEvent.class);
+
+        DynamicFilteringData data = ((DynamicFilteringEvent) dynamicFilteringEvent).getData();
+        assertThat(data.isFiltering()).isTrue();
+        assertThat(data.getData()).hasSize(2);
+        assertThat(data.contains(rowData(1, 1L, "a"))).isTrue();
+        assertThat(data.contains(rowData(2, 1L, null))).isTrue();
+    }
+
+    @Test
+    void testExceedsThreshold() throws Exception {
+        RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+        List<Integer> indexes = Arrays.asList(0, 1, 3);
+        MockOperatorEventGateway gateway = new MockOperatorEventGateway();
+        // A threshold of 100 bytes holds at most 2 of the serialized rows below
+        int threshold = 100;
+
+        DynamicFilteringDataCollectorOperator operator =
+                new DynamicFilteringDataCollectorOperator(rowType, indexes, threshold, gateway);
+        try (OneInputStreamOperatorTestHarness<RowData, Object> harness =
+                new OneInputStreamOperatorTestHarness<>(operator)) {
+            harness.setup();
+            harness.open();
+
+            harness.processElement(rowData(1, 1L, 0, "a"), 1L);
+            harness.processElement(rowData(2, 1L, 0, "b"), 2L);
+            harness.processElement(rowData(3, 1L, 0, "c"), 3L);
+        }
+
+        assertThat(gateway.getEventsSent()).hasSize(1);
+        OperatorEvent event = gateway.getEventsSent().get(0);
+        assertThat(event).isInstanceOf(SourceEventWrapper.class);
+        SourceEvent dynamicFilteringEvent = ((SourceEventWrapper) event).getSourceEvent();
+        assertThat(dynamicFilteringEvent).isInstanceOf(DynamicFilteringEvent.class);
+        DynamicFilteringData data = ((DynamicFilteringEvent) dynamicFilteringEvent).getData();
+        assertThat(data.isFiltering()).isFalse();
+    }
+
+    private RowData rowData(Object... values) {
+        GenericRowData rowData = new GenericRowData(values.length);
+        for (int i = 0; i < values.length; ++i) {
+            Object value = values[i];
+            value = value instanceof String ? new BinaryStringData((String) value) : value;
+            rowData.setField(i, value);
+        }
+        return rowData;
+    }
+}
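
The two tests above also pin down the consumer-side contract: below the threshold
the event carries filtering data, above it the event degrades to a pass-through
(isFiltering() returns false). A sketch of how a receiving source might react;
partitions, toRowData, and keepAllPartitions are hypothetical names on the
receiving side:

    DynamicFilteringData data = ((DynamicFilteringEvent) sourceEvent).getData();
    if (!data.isFiltering()) {
        // Threshold exceeded: fall back to reading all partitions.
        keepAllPartitions();
    } else {
        // Keep only partitions whose key row is contained in the collected data.
        partitions.removeIf(p -> !data.contains(toRowData(p)));
    }
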
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataTest.java
new file mode 100644
index 00000000000..7ecb43b89fb
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link DynamicFilteringData}. The tests live in this module because they
+ * rely on {@link InternalTypeInfo} from flink-table-runtime.
+ */
+class DynamicFilteringDataTest {
+    @Test
+    void testContains() {
+        RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+        TypeInformation<RowData> rowTypeInfo = InternalTypeInfo.of(rowType);
+        List<RowData> buildRows = new ArrayList<>();
+        buildRows.add(rowData(1, 1L, "a"));
+        buildRows.add(rowData(2, 1L, null));
+        buildRows.add(rowData(1, null, "b"));
+        buildRows.add(rowData(null, 2L, "c"));
+        buildRows.add(rowData(0, 31L, "d"));
+        List<byte[]> serializedData =
+                buildRows.stream().map(r -> serialize(rowTypeInfo, r)).collect(Collectors.toList());
+
+        DynamicFilteringData data =
+                new DynamicFilteringData(rowTypeInfo, rowType, serializedData, true);
+
+        for (RowData r : buildRows) {
+            assertThat(data.contains(r)).isTrue();
+        }
+        assertThat(data.contains(rowData(0, 1L, "a"))).isFalse();
+        assertThat(data.contains(rowData(1, 1L, null))).isFalse();
+        // Has the same hash as (0, 31L, "d")
+        assertThat(data.contains(rowData(1, 0L, "d"))).isFalse();
+    }
+
+    @Test
+    void testNotFiltering() {
+        RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+        DynamicFilteringData data =
+                new DynamicFilteringData(
+                        InternalTypeInfo.of(rowType), rowType, Collections.emptyList(), false);
+        assertThat(data.contains(rowData(1, 1L, "a"))).isTrue();
+    }
+
+    @Test
+    void testAddHashConflictingData() {
+        RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+        TypeInformation<RowData> rowTypeInfo = InternalTypeInfo.of(rowType);
+        List<RowData> buildRows = new ArrayList<>();
+        buildRows.add(rowData(0, 31L, "d"));
+        // Has the same hash as (0, 31L, "d")
+        buildRows.add(rowData(1, 0L, "d"));
+
+        List<byte[]> serializedData =
+                buildRows.stream().map(r -> serialize(rowTypeInfo, r)).collect(Collectors.toList());
+
+        DynamicFilteringData data =
+                new DynamicFilteringData(rowTypeInfo, rowType, serializedData, true);
+
+        for (RowData r : buildRows) {
+            assertThat(data.contains(r)).isTrue();
+        }
+    }
+
+    @Test
+    void testMismatchingRowDataArity() {
+        RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+        DynamicFilteringData data =
+                new DynamicFilteringData(
+                        InternalTypeInfo.of(rowType), rowType, Collections.emptyList(), true);
+        assertThatThrownBy(() -> data.contains(rowData(1, 1L)))
+                .isInstanceOf(TableException.class)
+                .hasMessage("The arity of RowData is different");
+    }
+
+    private RowData rowData(Object... values) {
+        GenericRowData rowData = new GenericRowData(values.length);
+        for (int i = 0; i < values.length; ++i) {
+            Object value = values[i];
+            value = value instanceof String ? new BinaryStringData((String) value) : value;
+            rowData.setField(i, value);
+        }
+        return rowData;
+    }
+
+    private byte[] serialize(TypeInformation<RowData> typeInfo, RowData row) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try {
+            typeInfo.createSerializer(new ExecutionConfig())
+                    .serialize(row, new DataOutputViewStreamWrapper(baos));
+        } catch (IOException e) {
+            // Rethrow as an unchecked exception so this helper can be used in a lambda.
+            throw new RuntimeException(e);
+        }
+        return baos.toByteArray();
+    }
+}