Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/27 07:50:42 UTC

[GitHub] [flink] pltbkd opened a new pull request, #20374: [FLINK-TBD][source] Runtime support for Dynamic filtering.

pltbkd opened a new pull request, #20374:
URL: https://github.com/apache/flink/pull/20374

   …ator to listen to events from other coordinators.
   
   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   Under draft, TBD.
   
   *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
     - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
     - *Deployments RPC transmits only the blob storage reference*
     - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (100MB)*
     - *Extended integration test for recovery after master (JobManager) failure*
     - *Added test that validates that TaskInfo is transferred only once across recoveries*
     - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
     - The serializers: (yes / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   




[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r937325378


##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorTest.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DynamicFilteringDataCollectorOperator}. */
+class DynamicFilteringDataCollectorOperatorTest {

Review Comment:
   With the changes above, the operator no longer needs to check for unsupported types, so I think we don't need this test now.





[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r936527373


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -141,6 +144,24 @@ private int hash(RowData row) {
         return Objects.hash(Arrays.stream(fieldGetters).map(g -> g.getFieldOrNull(row)).toArray());
     }
 
+    public boolean equals(DynamicFilteringData another) {

Review Comment:
   Might change it to a static method outside of DynamicFilteringData if we do not want it to be Object#equals.
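   
   For illustration, a sketch of what such a static helper could look like (hypothetical name and shape; assumes the existing isFiltering()/getRowType()/getData() accessors):
   
   ```java
   // Hypothetical static helper, kept outside DynamicFilteringData so that
   // Object#equals stays untouched. Sketch only, not the PR's implementation.
   public static boolean isEqual(DynamicFilteringData a, DynamicFilteringData b) {
       if (a == b) {
           return true;
       }
       if (a == null || b == null) {
           return false;
       }
       if (a.isFiltering() != b.isFiltering()
               || !a.getRowType().equals(b.getRowType())) {
           return false;
       }
       // Compare the collected rows; a full implementation would compare
       // projected field values rather than rely on RowData equality.
       Collection<RowData> rowsA = a.getData();
       Collection<RowData> rowsB = b.getData();
       return rowsA.size() == rowsB.size() && rowsA.containsAll(rowsB);
   }
   ```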





[GitHub] [flink] pltbkd commented on pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on PR #20374:
URL: https://github.com/apache/flink/pull/20374#issuecomment-1206161705

   @flinkbot run azure




[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r935239040


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java:
##########
@@ -82,12 +83,18 @@ public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
                         context,
                         splitSerializer,
                         context.isConcurrentExecutionAttemptsSupported());
-        return new SourceCoordinator<>(
+        SourceCoordinator<SplitT, ?> coordinator = new SourceCoordinator<>(
                 operatorName,
                 source,
                 sourceCoordinatorContext,
                 context.getCoordinatorStore(),
                 alignmentParams);
+        coordinator.setCoordinatorListeningID(coordinatorListeningID);
+        return coordinator;
+    }
+
+    public void setCoordinatorListeningID(String coordinatorListeningID) {

Review Comment:
   Might make `coordinatorListeningID` a constructor parameter?
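   
   For illustration, the constructor-parameter variant could look like this (a sketch assuming the provider's existing fields; the ID is @Nullable since not every source listens):
   
   ```java
   // Sketch: thread the listening ID through the constructor instead of a setter.
   public SourceCoordinatorProvider(
           String operatorName,
           OperatorID operatorID,
           Source<?, SplitT, ?> source,
           int numWorkerThreads,
           WatermarkAlignmentParams alignmentParams,
           @Nullable String coordinatorListeningID) {
       super(operatorID);
       this.operatorName = operatorName;
       this.source = source;
       this.numWorkerThreads = numWorkerThreads;
       this.alignmentParams = alignmentParams;
       this.coordinatorListeningID = coordinatorListeningID;
   }
   ```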



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -423,6 +433,10 @@ private void runInEventLoop(
                 });
     }
 
+    public void setCoordinatorListeningID(String coordinatorListeningID) {

Review Comment:
   Might make `coordinatorListeningID` a constructor parameter?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java:
##########
@@ -43,6 +43,7 @@ Licensed to the Apache Software Foundation (ASF) under one
     private final Source<?, SplitT, ?> source;
     private final int numWorkerThreads;
     private final WatermarkAlignmentParams alignmentParams;
+    private String coordinatorListeningID;

Review Comment:
   `@Nullable`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -111,6 +111,12 @@ Licensed to the Apache Software Foundation (ASF) under one
     /** A flag marking whether the coordinator has started. */
     private boolean started;
 
+    /**
+     * An ID under which the coordinator will register itself in the coordinator store. Other
+     * coordinators may send events to this coordinator by this ID.
+     */
+    private String coordinatorListeningID;

Review Comment:
   `@Nullable`?
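   
   For example (and likewise for the provider field above):
   
   ```java
   @Nullable private String coordinatorListeningID;
   ```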



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+    private final List<byte[]> serializedData;
+
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, RowData> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = typeInfo;
+        this.rowType = rowType;
+        this.isFiltering = isFiltering;
+        this.serializedData = isFiltering ? serializedData : Collections.emptyList();

Review Comment:
   Is it necessary to make this distinction?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+    private final List<byte[]> serializedData;
+
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, RowData> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = typeInfo;
+        this.rowType = rowType;
+        this.isFiltering = isFiltering;
+        this.serializedData = isFiltering ? serializedData : Collections.emptyList();
+    }
+
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    public Collection<RowData> getData() {

Review Comment:
   `@VisibleForTesting` ?
   
   And the two methods might be moved to the end of the file.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+    private final List<byte[]> serializedData;
+
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, RowData> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = typeInfo;
+        this.rowType = rowType;
+        this.isFiltering = isFiltering;
+        this.serializedData = isFiltering ? serializedData : Collections.emptyList();
+    }
+
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    public RowType getRowType() {

Review Comment:
   This method is not used now; is it necessary for the future?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+    private final List<byte[]> serializedData;
+
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, RowData> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = typeInfo;
+        this.rowType = rowType;
+        this.isFiltering = isFiltering;
+        this.serializedData = isFiltering ? serializedData : Collections.emptyList();
+    }
+
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    public Collection<RowData> getData() {
+        prepare();
+        return dataMap.values();
+    }
+
+    public boolean contains(RowData row) {
+        if (!isFiltering) {
+            return true;
+        } else if (row.getArity() != rowType.getFieldCount()) {
+            throw new TableException("The arity of RowData is different");
+        } else {
+            prepare();
+            RowData rowData = dataMap.get(hash(row));

Review Comment:
   Could the hash conflict in some cases?
   
   If so, we might lose some partitions.
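   
   For reference, a collision-tolerant variant could bucket rows per hash value instead of mapping each hash to a single row (a sketch assuming the existing hash() and fieldGetters; fieldsMatch is a hypothetical helper):
   
   ```java
   private transient Map<Integer, List<RowData>> dataMap;
   
   public boolean contains(RowData row) {
       // Filtering-disabled and arity checks elided for brevity.
       prepare();
       List<RowData> candidates = dataMap.getOrDefault(hash(row), Collections.emptyList());
       return candidates.stream().anyMatch(candidate -> fieldsMatch(candidate, row));
   }
   
   // Hypothetical helper comparing the projected field values of two rows,
   // so that a hash collision between distinct keys cannot drop a partition.
   private boolean fieldsMatch(RowData a, RowData b) {
       return Arrays.stream(fieldGetters)
               .allMatch(g -> Objects.equals(g.getFieldOrNull(a), g.getFieldOrNull(b)));
   }
   ```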



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+    private final List<byte[]> serializedData;
+
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, RowData> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = typeInfo;
+        this.rowType = rowType;
+        this.isFiltering = isFiltering;
+        this.serializedData = isFiltering ? serializedData : Collections.emptyList();
+    }
+
+    public boolean isFiltering() {

Review Comment:
   `@VisibleForTesting` ?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that support dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+    private transient boolean eventSent;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = dynamicFilteringFieldType;
+        this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices;
+        this.threshold = threshold;
+        this.operatorEventGateway = operatorEventGateway;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+        this.currentSize = 0L;
+        this.eventSent = false;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (exceedThreshold()) {
+            return;
+        }
+
+        RowData value = element.getValue();
+        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+        for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+            LogicalType type = dynamicFilteringFieldType.getTypeAt(i);
+            int index = dynamicFilteringFieldIndices.get(i);
+            switch (type.getTypeRoot()) {
+                case INTEGER:
+                    rowData.setField(i, value.getInt(index));
+                    break;
+                case BIGINT:
+                    rowData.setField(i, value.getLong(index));
+                    break;
+                case VARCHAR:
+                    rowData.setField(i, value.getString(index));
+                    break;
+                default:
+                    throw new UnsupportedOperationException();

Review Comment:
   nit: Add some message
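   
   For example (hypothetical wording):
   
   ```java
   default:
       throw new UnsupportedOperationException(
               "Unsupported type for dynamic filtering: " + type);
   ```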
   
   



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that support dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+    private transient boolean eventSent;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = dynamicFilteringFieldType;
+        this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices;
+        this.threshold = threshold;
+        this.operatorEventGateway = operatorEventGateway;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+        this.currentSize = 0L;
+        this.eventSent = false;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (exceedThreshold()) {
+            return;
+        }
+
+        RowData value = element.getValue();
+        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+        for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+            LogicalType type = dynamicFilteringFieldType.getTypeAt(i);
+            int index = dynamicFilteringFieldIndices.get(i);
+            switch (type.getTypeRoot()) {
+                case INTEGER:
+                    rowData.setField(i, value.getInt(index));
+                    break;
+                case BIGINT:
+                    rowData.setField(i, value.getLong(index));
+                    break;
+                case VARCHAR:
+                    rowData.setField(i, value.getString(index));
+                    break;
+                default:
+                    throw new UnsupportedOperationException();
+            }
+        }
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review Comment:
   Wrapped with try-with-resources?
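   
   A sketch of the try-with-resources form (ByteArrayOutputStream#close is a no-op, so this mainly documents the stream's lifetime):
   
   ```java
   try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
       DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
       serializer.serialize(rowData, wrapper);
       if (buffer.add(baos.toByteArray())) {
           currentSize += baos.size();
       }
   }
   ```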



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that support dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+    private transient boolean eventSent;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = dynamicFilteringFieldType;

Review Comment:
   nit: add checkNotNull when possible
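   
   e.g., with org.apache.flink.util.Preconditions (sketch):
   
   ```java
   this.dynamicFilteringFieldType = checkNotNull(dynamicFilteringFieldType);
   this.dynamicFilteringFieldIndices = checkNotNull(dynamicFilteringFieldIndices);
   this.threshold = threshold;
   this.operatorEventGateway = checkNotNull(operatorEventGateway);
   ```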



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that support dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+    private transient boolean eventSent;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = dynamicFilteringFieldType;
+        this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices;
+        this.threshold = threshold;
+        this.operatorEventGateway = operatorEventGateway;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+        this.currentSize = 0L;
+        this.eventSent = false;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (exceedThreshold()) {
+            return;
+        }
+
+        RowData value = element.getValue();
+        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+        for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+            LogicalType type = dynamicFilteringFieldType.getTypeAt(i);
+            int index = dynamicFilteringFieldIndices.get(i);
+            switch (type.getTypeRoot()) {
+                case INTEGER:
+                    rowData.setField(i, value.getInt(index));
+                    break;
+                case BIGINT:
+                    rowData.setField(i, value.getLong(index));
+                    break;
+                case VARCHAR:
+                    rowData.setField(i, value.getString(index));
+                    break;
+                default:
+                    throw new UnsupportedOperationException();
+            }
+        }
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+        serializer.serialize(rowData, wrapper);
+        boolean duplicated = !buffer.add(baos.toByteArray());

Review Comment:
   Why is this line needed? The byte array is always newly created, and byte[] does not override the hashCode/equals methods.
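   
   (For context: the buffer is a TreeSet built with BytePrimitiveArrayComparator, so membership is decided by comparing array contents, not by byte[] identity or hashCode. A minimal illustration:)
   
   ```java
   Set<byte[]> buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
   buffer.add(new byte[] {1, 2, 3});
   boolean duplicated = !buffer.add(new byte[] {1, 2, 3}); // true: equal by content
   ```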



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator
+ * collects {@link DynamicFilteringEvent} and then redistributes it to the listening source coordinators.
+ */
+public class DynamicFilteringDataCollectorOperatorCoordinator
+        implements OperatorCoordinator, CoordinationRequestHandler {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class);
+
+    private final CoordinatorStore coordinatorStore;
+    private final List<String> dynamicFilteringDataListenerIDs;
+
+    private boolean hasReceivedFilteringData;
+
+    public DynamicFilteringDataCollectorOperatorCoordinator(

Review Comment:
   nit: checkNotNull when possible



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that support dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+    private transient boolean eventSent;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = dynamicFilteringFieldType;
+        this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices;
+        this.threshold = threshold;
+        this.operatorEventGateway = operatorEventGateway;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+        this.currentSize = 0L;
+        this.eventSent = false;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (exceedThreshold()) {
+            return;
+        }
+
+        RowData value = element.getValue();
+        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+        for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+            LogicalType type = dynamicFilteringFieldType.getTypeAt(i);
+            int index = dynamicFilteringFieldIndices.get(i);
+            switch (type.getTypeRoot()) {
+                case INTEGER:
+                    rowData.setField(i, value.getInt(index));
+                    break;
+                case BIGINT:
+                    rowData.setField(i, value.getLong(index));
+                    break;
+                case VARCHAR:
+                    rowData.setField(i, value.getString(index));
+                    break;
+                default:
+                    throw new UnsupportedOperationException();
+            }
+        }
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+        serializer.serialize(rowData, wrapper);
+        boolean duplicated = !buffer.add(baos.toByteArray());
+        if (duplicated) {
+            return;
+        }
+        currentSize += baos.size();
+
+        if (exceedThreshold()) {
+            // Send empty filtering data and disable this operator by leaving currentSize unchanged
+            sendEvent();
+            buffer.clear();
+            LOG.info(

Review Comment:
   nit: change to warn?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that support dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+    private transient boolean eventSent;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = dynamicFilteringFieldType;
+        this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices;
+        this.threshold = threshold;
+        this.operatorEventGateway = operatorEventGateway;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+        this.currentSize = 0L;
+        this.eventSent = false;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (exceedThreshold()) {
+            return;
+        }
+
+        RowData value = element.getValue();
+        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+        for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+            LogicalType type = dynamicFilteringFieldType.getTypeAt(i);
+            int index = dynamicFilteringFieldIndices.get(i);
+            switch (type.getTypeRoot()) {
+                case INTEGER:
+                    rowData.setField(i, value.getInt(index));
+                    break;
+                case BIGINT:
+                    rowData.setField(i, value.getLong(index));
+                    break;
+                case VARCHAR:
+                    rowData.setField(i, value.getString(index));
+                    break;
+                default:
+                    throw new UnsupportedOperationException();
+            }
+        }
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+        serializer.serialize(rowData, wrapper);
+        boolean duplicated = !buffer.add(baos.toByteArray());
+        if (duplicated) {
+            return;
+        }
+        currentSize += baos.size();
+
+        if (exceedThreshold()) {
+            // Send an empty filtering data and disable self by leaving the currentSize unchanged
+            sendEvent();

Review Comment:
   This seems unnecessary. Could we do it directly in finish() and remove the `eventSent` flag?
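   For reference, a rough sketch of that simplification (hypothetical, not the PR's code): drop the `eventSent` flag, have `processElement` only clear the buffer when the threshold is exceeded, and send exactly once in `finish()`:

   ```java
   @Override
   public void finish() throws Exception {
       // currentSize was left unchanged when the threshold was exceeded, so
       // exceedThreshold() still reports true here and sendEvent() emits an
       // empty DynamicFilteringData; otherwise the collected data is sent.
       sendEvent();
   }
   ```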



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that support dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+    private transient boolean eventSent;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = dynamicFilteringFieldType;
+        this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices;
+        this.threshold = threshold;
+        this.operatorEventGateway = operatorEventGateway;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+        this.currentSize = 0L;
+        this.eventSent = false;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (exceedThreshold()) {
+            return;
+        }
+
+        RowData value = element.getValue();
+        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+        for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+            LogicalType type = dynamicFilteringFieldType.getTypeAt(i);
+            int index = dynamicFilteringFieldIndices.get(i);
+            switch (type.getTypeRoot()) {
+                case INTEGER:
+                    rowData.setField(i, value.getInt(index));
+                    break;
+                case BIGINT:
+                    rowData.setField(i, value.getLong(index));
+                    break;
+                case VARCHAR:
+                    rowData.setField(i, value.getString(index));
+                    break;
+                default:
+                    throw new UnsupportedOperationException();
+            }
+        }
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+        serializer.serialize(rowData, wrapper);
+        boolean duplicated = !buffer.add(baos.toByteArray());
+        if (duplicated) {
+            return;
+        }
+        currentSize += baos.size();
+
+        if (exceedThreshold()) {
+            // Send an empty filtering data and disable self by leaving the currentSize unchanged
+            sendEvent();
+            buffer.clear();
+            LOG.info(
+                    "Collected data size exceeds the threshold, {} > {}, dynamic filtering is disabled.",
+                    currentSize,
+                    threshold);
+        }
+    }
+
+    private boolean exceedThreshold() {
+        return threshold > 0 && currentSize > threshold;
+    }
+
+    @Override
+    public void finish() throws Exception {
+        if (!eventSent) {
+            LOG.info(
+                    "Finish collecting. {} bytes in {} rows are collected. sending the data.",
+                    currentSize,
+                    buffer.size());
+            sendEvent();
+        }
+    }
+
+    private void sendEvent() {
+        if (eventSent) {
+            return;
+        }
+
+        final DynamicFilteringData dynamicFilteringData;
+        if (exceedThreshold()) {
+            dynamicFilteringData =
+                    new DynamicFilteringData(
+                            typeInfo, dynamicFilteringFieldType, new ArrayList<>(), false);

Review Comment:
   nit: Collections.emptyList()
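   i.e.:

   ```java
   dynamicFilteringData =
           new DynamicFilteringData(
                   typeInfo, dynamicFilteringFieldType, Collections.emptyList(), false);
   ```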



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator
+ * collects {@link DynamicFilteringEvent} then redistributes to listening source coordinators.
+ */
+public class DynamicFilteringDataCollectorOperatorCoordinator
+        implements OperatorCoordinator, CoordinationRequestHandler {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class);
+
+    private final CoordinatorStore coordinatorStore;
+    private final List<String> dynamicFilteringDataListenerIDs;
+
+    private boolean hasReceivedFilteringData;
+
+    public DynamicFilteringDataCollectorOperatorCoordinator(
+            Context context, List<String> dynamicFilteringDataListenerIDs) {
+        this.coordinatorStore = context.getCoordinatorStore();
+        this.dynamicFilteringDataListenerIDs = dynamicFilteringDataListenerIDs;
+    }
+
+    @Override
+    public void start() throws Exception {}
+
+    @Override
+    public void close() throws Exception {}
+
+    @Override
+    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
+            throws Exception {
+        // Since there might be speculative execution, the dynamic filtering data collector
+        // operator may be executed for multiple attempts; we only keep the first notification.
+        if (hasReceivedFilteringData) {
+            return;
+        }
+
+        for (String listenerID : dynamicFilteringDataListenerIDs) {
+            // Push event to listening source coordinators.
+            OperatorCoordinator listener = (OperatorCoordinator) coordinatorStore.get(listenerID);
+            if (listener == null) {
+                LOG.warn("Dynamic filtering data listener is missing: {}", listenerID);

Review Comment:
   Would this be marked as an error?
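   For illustration, if a missing listener should fail hard instead of only logging, a hypothetical alternative would be:

   ```java
   if (listener == null) {
       throw new IllegalStateException(
               "Dynamic filtering data listener is missing: " + listenerID);
   }
   ```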



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorFactory.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** The factory class for {@link DynamicFilteringDataCollectorOperator}. */
+public class DynamicFilteringDataCollectorOperatorFactory
+        extends AbstractStreamOperatorFactory<Object>
+        implements CoordinatedOperatorFactory<Object> {
+
+    private final Set<String> dynamicFilteringDataListenerIDs = new HashSet<>();
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+
+    public DynamicFilteringDataCollectorOperatorFactory(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold) {
+        this.dynamicFilteringFieldType = dynamicFilteringFieldType;

Review Comment:
   nit: checkNotNull when possible
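   e.g., as `DynamicFilteringDataCollectorOperator` already does:

   ```java
   this.dynamicFilteringFieldType = checkNotNull(dynamicFilteringFieldType);
   this.dynamicFilteringFieldIndices = checkNotNull(dynamicFilteringFieldIndices);
   ```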



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperator.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * ExecutionOrderEnforcerOperator has two inputs, one of which is a source, and the other is the
+ * dependent upstream. It enforces that the input source is executed after the dependent input is
+ * finished. Everything passed from the inputs is forwarded to the output, though typically the
+ * dependent input should not send anything.
+ *
+ * <p>The operator must be chained with the source, which is ensured by the {@link

Review Comment:
   This might not be very accurate, since users have ways to disable chaining. We might change it to say that the enforcer does not work if it is not chained with the source, or modify the last paragraph to say that it enforces the execution order only when chained.
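   For example, the note could be reworded along these lines (just a sketch):

   ```java
   /**
    * <p>Note: the enforcer can only guarantee the execution order when it is chained with the
    * source. If chaining is disabled, the order of the two inputs is not enforced.
    */
   ```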



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] godfreyhe commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r939508279


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+
+    public static final Set<LogicalTypeRoot> SUPPORTED_TYPES;
+
+    static {
+        Set<LogicalTypeRoot> supported = new HashSet<>();
+        supported.add(LogicalTypeRoot.INTEGER);
+        supported.add(LogicalTypeRoot.BIGINT);
+        supported.add(LogicalTypeRoot.SMALLINT);
+        supported.add(LogicalTypeRoot.TINYINT);
+        supported.add(LogicalTypeRoot.VARCHAR);
+        supported.add(LogicalTypeRoot.CHAR);
+        supported.add(LogicalTypeRoot.DATE);
+        supported.add(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+        supported.add(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+        supported.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+        SUPPORTED_TYPES = Collections.unmodifiableSet(supported);
+    }
+
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+
+    /** The list should be sorted and distinct. */
+    private final List<byte[]> serializedData;
+
+    /** Whether the data actually does filter. If false, everything is considered contained. */
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, List<RowData>> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = checkNotNull(typeInfo);
+        this.rowType = checkNotNull(rowType);
+        this.serializedData = checkNotNull(serializedData);
+        this.isFiltering = isFiltering;
+    }
+
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    public boolean contains(RowData row) {
+        if (!isFiltering) {
+            return true;
+        } else if (row.getArity() != rowType.getFieldCount()) {
+            throw new TableException("The arity of RowData is different");
+        } else {
+            prepare();
+            List<RowData> mayMatchRowData = dataMap.get(hash(row));
+            if (mayMatchRowData == null) {
+                return false;
+            }
+            for (RowData mayMatch : mayMatchRowData) {
+                if (matchRow(row, mayMatch)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    private boolean matchRow(RowData row, RowData mayMatch) {
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            if (!Objects.equals(
+                    fieldGetters[i].getFieldOrNull(row),
+                    fieldGetters[i].getFieldOrNull(mayMatch))) {

Review Comment:
   For Date, Time, and Timestamp, the internal representation is int/long, but the external representation is java.sql.Date, java.sql.Time, and java.sql.Timestamp. We should make sure they can be compared. Please also add some tests in DynamicFilteringDataTest.
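   To illustrate the representation difference for DATE (a sketch; the conversion shown is an assumption about how a probe row would have to be built):

   ```java
   import org.apache.flink.table.data.GenericRowData;

   import java.time.LocalDate;

   public class DateRepresentationExample {
       public static void main(String[] args) {
           // DATE is stored in RowData as an int (days since epoch), not java.sql.Date.
           GenericRowData filterRow = new GenericRowData(1);
           filterRow.setField(0, (int) LocalDate.of(2022, 7, 27).toEpochDay());

           // A row built from java.sql.Date must be converted to the same internal
           // representation before the Objects.equals-based matching can succeed.
           GenericRowData probeRow = new GenericRowData(1);
           probeRow.setField(
                   0, (int) java.sql.Date.valueOf("2022-07-27").toLocalDate().toEpochDay());

           System.out.println(filterRow.getInt(0) == probeRow.getInt(0)); // true
       }
   }
   ```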





[GitHub] [flink] godfreyhe commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r937308705


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that support dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = checkNotNull(dynamicFilteringFieldType);
+        this.dynamicFilteringFieldIndices = checkNotNull(dynamicFilteringFieldIndices);
+        this.threshold = threshold;
+        this.operatorEventGateway = checkNotNull(operatorEventGateway);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+        this.currentSize = 0L;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (exceedThreshold()) {
+            return;
+        }
+
+        RowData value = element.getValue();
+        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+        for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+            LogicalType type = dynamicFilteringFieldType.getTypeAt(i);
+            int index = dynamicFilteringFieldIndices.get(i);
+            switch (type.getTypeRoot()) {
+                case INTEGER:
+                    rowData.setField(i, value.getInt(index));
+                    break;
+                case BIGINT:
+                    rowData.setField(i, value.getLong(index));
+                    break;
+                case VARCHAR:
+                    rowData.setField(i, value.getString(index));
+                    break;
+                default:

Review Comment:
   We should support more types, such as CHAR, SMALLINT, TINYINT, etc. It would be better to define the supported types in `DynamicFilteringData`, so that DynamicPartitionPruningRule can skip queries that contain unsupported types.
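   For instance, the switch could grow cases like these (a sketch; the getters follow the `RowData` interface):

   ```java
   case CHAR:
   case VARCHAR:
       rowData.setField(i, value.getString(index));
       break;
   case SMALLINT:
       rowData.setField(i, value.getShort(index));
       break;
   case TINYINT:
       rowData.setField(i, value.getByte(index));
       break;
   ```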



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorTest.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DynamicFilteringDataCollectorOperator}. */
+class DynamicFilteringDataCollectorOperatorTest {

Review Comment:
   add some tests for unsupported types
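   Something along these lines, perhaps (a hypothetical sketch; the test name and the choice of DOUBLE as the unsupported type are illustrative):

   ```java
   @Test
   void testUnsupportedFieldType() throws Exception {
       RowType rowType = RowType.of(new DoubleType());
       DynamicFilteringDataCollectorOperator operator =
               new DynamicFilteringDataCollectorOperator(
                       rowType, Collections.singletonList(0), -1L, new MockOperatorEventGateway());
       try (OneInputStreamOperatorTestHarness<RowData, Object> harness =
               new OneInputStreamOperatorTestHarness<>(operator)) {
           harness.open();
           // DOUBLE is not handled by the operator's type switch, so processing an
           // element is expected to fail with UnsupportedOperationException.
           assertThatThrownBy(
                           () ->
                                   harness.processElement(
                                           new StreamRecord<>(GenericRowData.of(1.0d))))
                   .isInstanceOf(UnsupportedOperationException.class);
       }
   }
   ```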



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+
+    /** The list should be sorted and distinct. */
+    private final List<byte[]> serializedData;
+
+    private final boolean isFiltering;

Review Comment:
   please add some comments about this field
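   e.g. something like:

   ```java
   /** Whether the data actually does filter. If false, everything is considered contained. */
   private final boolean isFiltering;
   ```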



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+
+    /** The list should be sorted and distinct. */
+    private final List<byte[]> serializedData;
+
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, List<RowData>> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = checkNotNull(typeInfo);
+        this.rowType = checkNotNull(rowType);
+        this.serializedData = checkNotNull(serializedData);
+        this.isFiltering = isFiltering;
+    }
+
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    public boolean contains(RowData row) {
+        if (!isFiltering) {
+            return true;
+        } else if (row.getArity() != rowType.getFieldCount()) {
+            throw new TableException("The arity of RowData is different");
+        } else {
+            prepare();
+            List<RowData> mayMatchRowData = dataMap.get(hash(row));
+            if (mayMatchRowData == null) {
+                return false;
+            }
+            for (RowData mayMatch : mayMatchRowData) {
+                if (matchRow(row, mayMatch)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    private boolean matchRow(RowData row, RowData mayMatch) {
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            if (!Objects.equals(
+                    fieldGetters[i].getFieldOrNull(row),
+                    fieldGetters[i].getFieldOrNull(mayMatch))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void prepare() {
+        if (!prepared) {
+            synchronized (this) {
+                if (!prepared) {
+                    doPrepare();
+                    prepared = true;
+                }
+            }
+        }
+    }
+
+    private void doPrepare() {
+        this.dataMap = new HashMap<>();
+        if (isFiltering) {
+            this.fieldGetters =
+                    IntStream.range(0, rowType.getFieldCount())
+                            .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
+                            .toArray(RowData.FieldGetter[]::new);
+
+            TypeSerializer<RowData> serializer = typeInfo.createSerializer(new ExecutionConfig());
+            for (byte[] bytes : serializedData) {
+                try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+                        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {
+                    RowData partition = serializer.deserialize(inView);
+                    List<RowData> partitions =
+                            dataMap.computeIfAbsent(hash(partition), k -> new ArrayList<>());
+                    partitions.add(partition);
+                } catch (Exception e) {
+                    throw new TableException("Unable to deserialize the value.", e);
+                }
+            }
+        }
+    }
+
+    private int hash(RowData row) {
+        return Objects.hash(Arrays.stream(fieldGetters).map(g -> g.getFieldOrNull(row)).toArray());
+    }
+
+    public static boolean isEqual(DynamicFilteringData data, DynamicFilteringData another) {

Review Comment:
   implement the `equals` method directly?
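   i.e., roughly (sketch only; `hashCode` would need to be overridden consistently as well):

   ```java
   @Override
   public boolean equals(Object o) {
       if (this == o) {
           return true;
       }
       if (!(o instanceof DynamicFilteringData)) {
           return false;
       }
       DynamicFilteringData that = (DynamicFilteringData) o;
       if (isFiltering != that.isFiltering
               || !typeInfo.equals(that.typeInfo)
               || !rowType.equals(that.rowType)
               || serializedData.size() != that.serializedData.size()) {
           return false;
       }
       for (int i = 0; i < serializedData.size(); i++) {
           if (!Arrays.equals(serializedData.get(i), that.serializedData.get(i))) {
               return false;
           }
       }
       return true;
   }
   ```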



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+
+    /** The list should be sorted and distinct. */
+    private final List<byte[]> serializedData;
+
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, List<RowData>> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = checkNotNull(typeInfo);
+        this.rowType = checkNotNull(rowType);
+        this.serializedData = checkNotNull(serializedData);
+        this.isFiltering = isFiltering;
+    }
+
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    public boolean contains(RowData row) {
+        if (!isFiltering) {
+            return true;
+        } else if (row.getArity() != rowType.getFieldCount()) {
+            throw new TableException("The arity of RowData is different");
+        } else {
+            prepare();
+            List<RowData> mayMatchRowData = dataMap.get(hash(row));
+            if (mayMatchRowData == null) {
+                return false;
+            }
+            for (RowData mayMatch : mayMatchRowData) {
+                if (matchRow(row, mayMatch)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    private boolean matchRow(RowData row, RowData mayMatch) {
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            if (!Objects.equals(
+                    fieldGetters[i].getFieldOrNull(row),
+                    fieldGetters[i].getFieldOrNull(mayMatch))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void prepare() {
+        if (!prepared) {
+            synchronized (this) {
+                if (!prepared) {
+                    doPrepare();
+                    prepared = true;
+                }
+            }
+        }
+    }
+
+    private void doPrepare() {
+        this.dataMap = new HashMap<>();
+        if (isFiltering) {
+            this.fieldGetters =
+                    IntStream.range(0, rowType.getFieldCount())
+                            .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
+                            .toArray(RowData.FieldGetter[]::new);
+
+            TypeSerializer<RowData> serializer = typeInfo.createSerializer(new ExecutionConfig());
+            for (byte[] bytes : serializedData) {
+                try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+                        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {
+                    RowData partition = serializer.deserialize(inView);
+                    List<RowData> partitions =
+                            dataMap.computeIfAbsent(hash(partition), k -> new ArrayList<>());
+                    partitions.add(partition);
+                } catch (Exception e) {
+                    throw new TableException("Unable to deserialize the value.", e);
+                }
+            }
+        }
+    }
+
+    private int hash(RowData row) {
+        return Objects.hash(Arrays.stream(fieldGetters).map(g -> g.getFieldOrNull(row)).toArray());
+    }
+
+    public static boolean isEqual(DynamicFilteringData data, DynamicFilteringData another) {
+        if (data == null) {
+            return another == null;
+        }
+        if (another == null
+                || (data.isFiltering != another.isFiltering)
+                || !data.typeInfo.equals(another.typeInfo)
+                || !data.rowType.equals(another.rowType)
+                || data.serializedData.size() != another.serializedData.size()) {
+            return false;
+        }
+
+        BytePrimitiveArrayComparator comparator = new BytePrimitiveArrayComparator(true);
+        for (int i = 0; i < data.serializedData.size(); i++) {
+            if (comparator.compare(data.serializedData.get(i), another.serializedData.get(i))
+                    != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @VisibleForTesting
+    public Collection<RowData> getData() {

Review Comment:
   this method is not only used for testing, but can also be used for advanced filtering for which the `contains` method cannot meet the requirement



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinatorTest.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DynamicFilteringDataCollectorOperatorCoordinator}. */
+class DynamicFilteringDataCollectorOperatorCoordinatorTest {

Review Comment:
   add some tests for the failover case





[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r937383257


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -26,30 +26,49 @@
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.ByteArrayInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Data for dynamic filtering. */
 public class DynamicFilteringData implements Serializable {
+
+    public static final Set<LogicalTypeRoot> SUPPORTED_TYPES;
+
+    static {
+        Set<LogicalTypeRoot> supported = new HashSet<>();
+        supported.add(LogicalTypeRoot.INTEGER);
+        supported.add(LogicalTypeRoot.BIGINT);
+        supported.add(LogicalTypeRoot.SMALLINT);
+        supported.add(LogicalTypeRoot.TINYINT);
+        supported.add(LogicalTypeRoot.VARCHAR);
+        supported.add(LogicalTypeRoot.CHAR);
+        SUPPORTED_TYPES = Collections.unmodifiableSet(supported);

Review Comment:
   DATE, TIME_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, TIMESTAMP_WITHOUT_TIME_ZONE are added. 
   In fact, since the field values are acquired and used to form a GenericRowData in the Hive source in the same way as in the collector operator, I suppose all types are supported at the runtime level as long as the types are exactly the same. The supported types set only depends on what types are supported at the planner level.





[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r937388343


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -26,30 +26,49 @@
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.ByteArrayInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Data for dynamic filtering. */
 public class DynamicFilteringData implements Serializable {
+
+    public static final Set<LogicalTypeRoot> SUPPORTED_TYPES;
+
+    static {
+        Set<LogicalTypeRoot> supported = new HashSet<>();
+        supported.add(LogicalTypeRoot.INTEGER);
+        supported.add(LogicalTypeRoot.BIGINT);
+        supported.add(LogicalTypeRoot.SMALLINT);
+        supported.add(LogicalTypeRoot.TINYINT);
+        supported.add(LogicalTypeRoot.VARCHAR);
+        supported.add(LogicalTypeRoot.CHAR);
+        SUPPORTED_TYPES = Collections.unmodifiableSet(supported);

Review Comment:
   My apologies. I was mistaken about how the Hive source acquires the data. It seems that we can support all types that the Hive table spec can describe (as strings) and that can be converted to the runtime type, but we need to enumerate these types in the Hive source.





[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r937316474


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+
+    /** The list should be sorted and distinct. */
+    private final List<byte[]> serializedData;
+
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, List<RowData>> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = checkNotNull(typeInfo);
+        this.rowType = checkNotNull(rowType);
+        this.serializedData = checkNotNull(serializedData);
+        this.isFiltering = isFiltering;
+    }
+
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    public boolean contains(RowData row) {
+        if (!isFiltering) {
+            return true;
+        } else if (row.getArity() != rowType.getFieldCount()) {
+            throw new TableException("The arity of RowData is different");
+        } else {
+            prepare();
+            List<RowData> mayMatchRowData = dataMap.get(hash(row));
+            if (mayMatchRowData == null) {
+                return false;
+            }
+            for (RowData mayMatch : mayMatchRowData) {
+                if (matchRow(row, mayMatch)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    private boolean matchRow(RowData row, RowData mayMatch) {
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            if (!Objects.equals(
+                    fieldGetters[i].getFieldOrNull(row),
+                    fieldGetters[i].getFieldOrNull(mayMatch))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void prepare() {
+        if (!prepared) {
+            synchronized (this) {
+                if (!prepared) {
+                    doPrepare();
+                    prepared = true;
+                }
+            }
+        }
+    }
+
+    private void doPrepare() {
+        this.dataMap = new HashMap<>();
+        if (isFiltering) {
+            this.fieldGetters =
+                    IntStream.range(0, rowType.getFieldCount())
+                            .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
+                            .toArray(RowData.FieldGetter[]::new);
+
+            TypeSerializer<RowData> serializer = typeInfo.createSerializer(new ExecutionConfig());
+            for (byte[] bytes : serializedData) {
+                try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+                        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {
+                    RowData partition = serializer.deserialize(inView);
+                    List<RowData> partitions =
+                            dataMap.computeIfAbsent(hash(partition), k -> new ArrayList<>());
+                    partitions.add(partition);
+                } catch (Exception e) {
+                    throw new TableException("Unable to deserialize the value.", e);
+                }
+            }
+        }
+    }
+
+    private int hash(RowData row) {
+        return Objects.hash(Arrays.stream(fieldGetters).map(g -> g.getFieldOrNull(row)).toArray());
+    }
+
+    public static boolean isEqual(DynamicFilteringData data, DynamicFilteringData another) {

Review Comment:
   As the data structure may contain a large amount of data, computing the hash code over all of it can be costly. So I suppose we'd better not override hashCode, nor equals; callers can use the static helper instead, as sketched below.
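   A small sketch of such a caller-side comparison (the surrounding method is hypothetical):
   
   ```java
   // Hypothetical caller-side consistency check: isEqual compares the sorted,
   // distinct serialized byte arrays, avoiding hashing all deserialized rows.
   static void checkConsistent(DynamicFilteringData previous, DynamicFilteringData received) {
       if (previous != null && !DynamicFilteringData.isEqual(previous, received)) {
           throw new IllegalStateException("Dynamic filtering data changed between attempts.");
       }
   }
   ```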



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+
+    /** The list should be sorted and distinct. */
+    private final List<byte[]> serializedData;
+
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, List<RowData>> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = checkNotNull(typeInfo);
+        this.rowType = checkNotNull(rowType);
+        this.serializedData = checkNotNull(serializedData);
+        this.isFiltering = isFiltering;
+    }
+
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    public boolean contains(RowData row) {
+        if (!isFiltering) {
+            return true;
+        } else if (row.getArity() != rowType.getFieldCount()) {
+            throw new TableException("The arity of RowData is different");
+        } else {
+            prepare();
+            List<RowData> mayMatchRowData = dataMap.get(hash(row));
+            if (mayMatchRowData == null) {
+                return false;
+            }
+            for (RowData mayMatch : mayMatchRowData) {
+                if (matchRow(row, mayMatch)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    private boolean matchRow(RowData row, RowData mayMatch) {
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            if (!Objects.equals(
+                    fieldGetters[i].getFieldOrNull(row),
+                    fieldGetters[i].getFieldOrNull(mayMatch))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void prepare() {
+        if (!prepared) {
+            synchronized (this) {
+                if (!prepared) {
+                    doPrepare();
+                    prepared = true;
+                }
+            }
+        }
+    }
+
+    private void doPrepare() {
+        this.dataMap = new HashMap<>();
+        if (isFiltering) {
+            this.fieldGetters =
+                    IntStream.range(0, rowType.getFieldCount())
+                            .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
+                            .toArray(RowData.FieldGetter[]::new);
+
+            TypeSerializer<RowData> serializer = typeInfo.createSerializer(new ExecutionConfig());
+            for (byte[] bytes : serializedData) {
+                try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+                        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {
+                    RowData partition = serializer.deserialize(inView);
+                    List<RowData> partitions =
+                            dataMap.computeIfAbsent(hash(partition), k -> new ArrayList<>());
+                    partitions.add(partition);
+                } catch (Exception e) {
+                    throw new TableException("Unable to deserialize the value.", e);
+                }
+            }
+        }
+    }
+
+    private int hash(RowData row) {
+        return Objects.hash(Arrays.stream(fieldGetters).map(g -> g.getFieldOrNull(row)).toArray());
+    }
+
+    public static boolean isEqual(DynamicFilteringData data, DynamicFilteringData another) {
+        if (data == null) {
+            return another == null;
+        }
+        if (another == null
+                || (data.isFiltering != another.isFiltering)
+                || !data.typeInfo.equals(another.typeInfo)
+                || !data.rowType.equals(another.rowType)
+                || data.serializedData.size() != another.serializedData.size()) {
+            return false;
+        }
+
+        BytePrimitiveArrayComparator comparator = new BytePrimitiveArrayComparator(true);
+        for (int i = 0; i < data.serializedData.size(); i++) {
+            if (comparator.compare(data.serializedData.get(i), another.serializedData.get(i))
+                    != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @VisibleForTesting
+    public Collection<RowData> getData() {

Review Comment:
   So far the method is not used in production code, including the dynamic filtering implementation of the Hive source. I'd prefer to retain the annotation for now; we can remove it when the method is actually used.




[GitHub] [flink] gaoyunhaii commented on pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on PR #20374:
URL: https://github.com/apache/flink/pull/20374#issuecomment-1205073985

   @flinkbot run azure



[GitHub] [flink] pltbkd commented on pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on PR #20374:
URL: https://github.com/apache/flink/pull/20374#issuecomment-1204737427

   Hi @godfreyhe ,
   Thanks for the suggestions!
   I believe I've resolved most of the comments, and the handling for the failover case is coming soon. Would you please take another look?



[GitHub] [flink] flinkbot commented on pull request #20374: [FLINK-TBD][source] Runtime support for Dynamic filtering.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20374:
URL: https://github.com/apache/flink/pull/20374#issuecomment-1196391834

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5299234f3aa791504caee82695bf207eb5971dc0",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5299234f3aa791504caee82695bf207eb5971dc0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5299234f3aa791504caee82695bf207eb5971dc0 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r939518327


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+
+    public static final Set<LogicalTypeRoot> SUPPORTED_TYPES;
+
+    static {
+        Set<LogicalTypeRoot> supported = new HashSet<>();
+        supported.add(LogicalTypeRoot.INTEGER);
+        supported.add(LogicalTypeRoot.BIGINT);
+        supported.add(LogicalTypeRoot.SMALLINT);
+        supported.add(LogicalTypeRoot.TINYINT);
+        supported.add(LogicalTypeRoot.VARCHAR);
+        supported.add(LogicalTypeRoot.CHAR);
+        supported.add(LogicalTypeRoot.DATE);
+        supported.add(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+        supported.add(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+        supported.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+        SUPPORTED_TYPES = Collections.unmodifiableSet(supported);
+    }
+
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+
+    /** The list should be sorted and distinct. */
+    private final List<byte[]> serializedData;
+
+    /** Whether the data actually does filter. If false, everything is considered contained. */
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, List<RowData>> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = checkNotNull(typeInfo);
+        this.rowType = checkNotNull(rowType);
+        this.serializedData = checkNotNull(serializedData);
+        this.isFiltering = isFiltering;
+    }
+
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    public boolean contains(RowData row) {
+        if (!isFiltering) {
+            return true;
+        } else if (row.getArity() != rowType.getFieldCount()) {
+            throw new TableException("The arity of RowData is different");
+        } else {
+            prepare();
+            List<RowData> mayMatchRowData = dataMap.get(hash(row));
+            if (mayMatchRowData == null) {
+                return false;
+            }
+            for (RowData mayMatch : mayMatchRowData) {
+                if (matchRow(row, mayMatch)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    private boolean matchRow(RowData row, RowData mayMatch) {
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            if (!Objects.equals(
+                    fieldGetters[i].getFieldOrNull(row),
+                    fieldGetters[i].getFieldOrNull(mayMatch))) {

Review Comment:
   In addition, "users" here means the users of this data class, rather than Flink users. So I suppose the behavior of those users should be under our control.




[GitHub] [flink] gaoyunhaii commented on pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on PR #20374:
URL: https://github.com/apache/flink/pull/20374#issuecomment-1207254955

   @flinkbot run azure



[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r936467997


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator
+ * collects {@link DynamicFilteringEvent} then redistributes to listening source coordinators.
+ */
+public class DynamicFilteringDataCollectorOperatorCoordinator
+        implements OperatorCoordinator, CoordinationRequestHandler {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class);
+
+    private final CoordinatorStore coordinatorStore;
+    private final List<String> dynamicFilteringDataListenerIDs;
+
+    private boolean hasReceivedFilteringData;
+
+    public DynamicFilteringDataCollectorOperatorCoordinator(
+            Context context, List<String> dynamicFilteringDataListenerIDs) {
+        this.coordinatorStore = checkNotNull(context.getCoordinatorStore());
+        this.dynamicFilteringDataListenerIDs = checkNotNull(dynamicFilteringDataListenerIDs);
+    }
+
+    @Override
+    public void start() throws Exception {}
+
+    @Override
+    public void close() throws Exception {}
+
+    @Override
+    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
+            throws Exception {
+        // Since there might be speculative execution, once the dynamic filter collectors operator
+        // has been executed for multiple attempts, we only keep the first notification.
+        if (hasReceivedFilteringData) {
+            return;
+        }
+
+        for (String listenerID : dynamicFilteringDataListenerIDs) {
+            // Push event to listening source coordinators.
+            OperatorCoordinator listener = (OperatorCoordinator) coordinatorStore.get(listenerID);
+            if (listener == null) {
+                LOG.error("Dynamic filtering data listener is missing: {}", listenerID);

Review Comment:
   Not about the log level, but perhaps we should directly throw an exception in this case?
   
   Logically, if a DPP operator is created, there must be at least one corresponding fact source.
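   A minimal sketch of the suggested change inside `handleEventFromOperator`, replacing the null check quoted above:
   
   ```java
   // Fail fast instead of logging when a registered listener coordinator is missing.
   OperatorCoordinator listener = (OperatorCoordinator) coordinatorStore.get(listenerID);
   if (listener == null) {
       throw new IllegalStateException(
               "Dynamic filtering data listener is missing: " + listenerID);
   }
   ```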




[GitHub] [flink] godfreyhe commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r937312617


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that supports dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = checkNotNull(dynamicFilteringFieldType);
+        this.dynamicFilteringFieldIndices = checkNotNull(dynamicFilteringFieldIndices);
+        this.threshold = threshold;
+        this.operatorEventGateway = checkNotNull(operatorEventGateway);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+        this.currentSize = 0L;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (exceedThreshold()) {
+            return;
+        }
+
+        RowData value = element.getValue();
+        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+        for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+            LogicalType type = dynamicFilteringFieldType.getTypeAt(i);
+            int index = dynamicFilteringFieldIndices.get(i);
+            switch (type.getTypeRoot()) {
+                case INTEGER:
+                    rowData.setField(i, value.getInt(index));
+                    break;
+                case BIGINT:
+                    rowData.setField(i, value.getLong(index));
+                    break;
+                case VARCHAR:
+                    rowData.setField(i, value.getString(index));
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported data type for dynamic filtering:" + type);
+            }
+        }
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+            serializer.serialize(rowData, wrapper);
+            boolean duplicated = !buffer.add(baos.toByteArray());
+            if (duplicated) {
+                return;
+            }
+
+            currentSize += baos.size();
+        }
+
+        if (exceedThreshold()) {
+            // Clear the filtering data and disable self by leaving the currentSize unchanged
+            buffer.clear();
+            LOG.warn(
+                    "Collected data size exceeds the threshold, {} > {}, dynamic filtering is disabled.",
+                    currentSize,
+                    threshold);
+        }
+    }
+
+    private boolean exceedThreshold() {
+        return threshold > 0 && currentSize > threshold;
+    }
+
+    @Override
+    public void finish() throws Exception {
+        if (exceedThreshold()) {
+            LOG.info(
+                    "Finish collecting. {} bytes are collected which exceeds the threshold {}. Sending empty data.",
+                    currentSize,
+                    threshold);
+        } else {
+            LOG.info(
+                    "Finish collecting. {} bytes in {} rows are collected. Sending the data.",
+                    currentSize,
+                    buffer.size());
+        }
+        sendEvent();

Review Comment:
   What if we do not send any event when `exceedThreshold()` is true?
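   For context, a plausible shape of `sendEvent()` (its body is not shown in this diff; the `DynamicFilteringEvent` constructor shape is an assumption): an event with `isFiltering = false` is still sent when the threshold is exceeded, so listening source coordinators are unblocked and simply skip filtering instead of waiting forever.
   
   ```java
   // Sketch under the above assumptions: always send an event, but mark it
   // as non-filtering when the collected data exceeded the threshold.
   private void sendEvent() {
       final DynamicFilteringData dynamicFilteringData;
       if (exceedThreshold()) {
           dynamicFilteringData =
                   new DynamicFilteringData(
                           typeInfo, dynamicFilteringFieldType, Collections.emptyList(), false);
       } else {
           dynamicFilteringData =
                   new DynamicFilteringData(
                           typeInfo, dynamicFilteringFieldType, new ArrayList<>(buffer), true);
       }
       operatorEventGateway.sendEventToCoordinator(
               new SourceEventWrapper(new DynamicFilteringEvent(dynamicFilteringData)));
   }
   ```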




[GitHub] [flink] godfreyhe commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r937368986


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+
+    /** The list should be sorted and distinct. */
+    private final List<byte[]> serializedData;
+
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, List<RowData>> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = checkNotNull(typeInfo);
+        this.rowType = checkNotNull(rowType);
+        this.serializedData = checkNotNull(serializedData);
+        this.isFiltering = isFiltering;
+    }
+
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    public boolean contains(RowData row) {
+        if (!isFiltering) {
+            return true;
+        } else if (row.getArity() != rowType.getFieldCount()) {
+            throw new TableException("The arity of RowData is different");
+        } else {
+            prepare();
+            List<RowData> mayMatchRowData = dataMap.get(hash(row));
+            if (mayMatchRowData == null) {
+                return false;
+            }
+            for (RowData mayMatch : mayMatchRowData) {
+                if (matchRow(row, mayMatch)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    private boolean matchRow(RowData row, RowData mayMatch) {
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            if (!Objects.equals(
+                    fieldGetters[i].getFieldOrNull(row),
+                    fieldGetters[i].getFieldOrNull(mayMatch))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void prepare() {
+        if (!prepared) {
+            synchronized (this) {
+                if (!prepared) {
+                    doPrepare();
+                    prepared = true;
+                }
+            }
+        }
+    }
+
+    private void doPrepare() {
+        this.dataMap = new HashMap<>();
+        if (isFiltering) {
+            this.fieldGetters =
+                    IntStream.range(0, rowType.getFieldCount())
+                            .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
+                            .toArray(RowData.FieldGetter[]::new);
+
+            TypeSerializer<RowData> serializer = typeInfo.createSerializer(new ExecutionConfig());
+            for (byte[] bytes : serializedData) {
+                try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+                        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {
+                    RowData partition = serializer.deserialize(inView);
+                    List<RowData> partitions =
+                            dataMap.computeIfAbsent(hash(partition), k -> new ArrayList<>());
+                    partitions.add(partition);
+                } catch (Exception e) {
+                    throw new TableException("Unable to deserialize the value.", e);
+                }
+            }
+        }
+    }
+
+    private int hash(RowData row) {
+        return Objects.hash(Arrays.stream(fieldGetters).map(g -> g.getFieldOrNull(row)).toArray());
+    }
+
+    public static boolean isEqual(DynamicFilteringData data, DynamicFilteringData another) {
+        if (data == null) {
+            return another == null;
+        }
+        if (another == null
+                || (data.isFiltering != another.isFiltering)
+                || !data.typeInfo.equals(another.typeInfo)
+                || !data.rowType.equals(another.rowType)
+                || data.serializedData.size() != another.serializedData.size()) {
+            return false;
+        }
+
+        BytePrimitiveArrayComparator comparator = new BytePrimitiveArrayComparator(true);
+        for (int i = 0; i < data.serializedData.size(); i++) {
+            if (comparator.compare(data.serializedData.get(i), another.serializedData.get(i))
+                    != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @VisibleForTesting
+    public Collection<RowData> getData() {

Review Comment:
   I think it should be a public interface. Some lakehouse connectors have statistics info in the partition spec, so a partition can be skipped not only based on the partition value but also based on the statistics. If these connectors want to support DPP, they should not need to change the Flink framework code. A sketch of such usage follows below.
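   A hypothetical connector could combine `getData()` with its own partition statistics along these lines (the `PartitionStatistics` type and `statsFor` lookup are assumptions for illustration, not Flink APIs):
   
   ```java
   // Hypothetical connector-side pruning: keep a partition only if some filter
   // row may match it, judged by connector-owned statistics rather than equality.
   boolean keepPartition(RowData partitionValue, DynamicFilteringData filteringData) {
       if (!filteringData.isFiltering()) {
           return true;
       }
       PartitionStatistics stats = statsFor(partitionValue); // connector-specific lookup
       for (RowData filterRow : filteringData.getData()) {
           if (stats.mayMatch(filterRow)) {
               return true;
           }
       }
       return false;
   }
   ```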




[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r936198465


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator
+ * collects {@link DynamicFilteringEvent} then redistributes to listening source coordinators.
+ */
+public class DynamicFilteringDataCollectorOperatorCoordinator
+        implements OperatorCoordinator, CoordinationRequestHandler {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class);
+
+    private final CoordinatorStore coordinatorStore;
+    private final List<String> dynamicFilteringDataListenerIDs;
+
+    private boolean hasReceivedFilteringData;
+
+    public DynamicFilteringDataCollectorOperatorCoordinator(
+            Context context, List<String> dynamicFilteringDataListenerIDs) {
+        this.coordinatorStore = context.getCoordinatorStore();
+        this.dynamicFilteringDataListenerIDs = dynamicFilteringDataListenerIDs;
+    }
+
+    @Override
+    public void start() throws Exception {}
+
+    @Override
+    public void close() throws Exception {}
+
+    @Override
+    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
+            throws Exception {
+        // Since there might be speculative execution, once the dynamic filter collectors operator
+        // has been executed for multiple attempts, we only keep the first notification.
+        if (hasReceivedFilteringData) {
+            return;
+        }
+
+        for (String listenerID : dynamicFilteringDataListenerIDs) {
+            // Push event to listening source coordinators.
+            OperatorCoordinator listener = (OperatorCoordinator) coordinatorStore.get(listenerID);
+            if (listener == null) {
+                LOG.warn("Dynamic filtering data listener is missing: {}", listenerID);

Review Comment:
   Maybe it's better to be an error. The exception would not make the result incorrect, but dynamic filtering would become ineffective. So it's not a critical error, but it is indeed an error. I'll modify the log level.




[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r936334758


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperator.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * ExecutionOrderEnforcerOperator has two inputs, one of which is a source, and the other is the
+ * dependent upstream. It enforces that the input source is executed after the dependent input is
+ * finished. Everything passed from the inputs is forwarded to the output, though typically the
+ * dependent input should not send anything.
+ *
+ * <p>The operator must be chained with the source, which is generally ensured by the {@link
+ * ExecutionOrderEnforcerOperatorFactory}. If chaining is explicitly disabled, the enforcer can not
+ * work as expected.
+ *
+ * <p>The operator is used only for dynamic filtering at present.
+ */
+public class ExecutionOrderEnforcerOperator<IN> extends AbstractStreamOperatorV2<IN>
+        implements MultipleInputStreamOperator<IN> {
+
+    public ExecutionOrderEnforcerOperator(StreamOperatorParameters<IN> parameters) {
+        super(parameters, 2);
+    }
+
+    @Override
+    public List<Input> getInputs() {
+        return Arrays.asList(new ForwardingInput<>(output), new ForwardingInput<>(output));
+    }
+
+    private static class ForwardingInput<IN> implements Input<IN> {

Review Comment:
   Although this operator is logically only used in batch mode, the processing of watermarks / watermark statuses / latency markers here is not right.
   
   We might change the Input to be a subclass of `AbstractInput`, which already handles these events correctly; see the sketch below.
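   A minimal sketch of the suggested change, assuming `ForwardingInput` keeps its role of forwarding records:
   
   ```java
   // AbstractInput already propagates watermarks, watermark statuses and
   // latency markers correctly, so only record forwarding is left to implement.
   private static class ForwardingInput<IN> extends AbstractInput<IN, IN> {
   
       ForwardingInput(AbstractStreamOperatorV2<IN> owner, int inputId) {
           super(owner, inputId);
       }
   
       @Override
       public void processElement(StreamRecord<IN> element) throws Exception {
           output.collect(element);
       }
   }
   ```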



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator
+ * collects {@link DynamicFilteringEvent} then redistributes to listening source coordinators.
+ */
+public class DynamicFilteringDataCollectorOperatorCoordinator
+        implements OperatorCoordinator, CoordinationRequestHandler {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class);
+
+    private final CoordinatorStore coordinatorStore;
+    private final List<String> dynamicFilteringDataListenerIDs;
+
+    private boolean hasReceivedFilteringData;
+
+    public DynamicFilteringDataCollectorOperatorCoordinator(
+            Context context, List<String> dynamicFilteringDataListenerIDs) {
+        this.coordinatorStore = checkNotNull(context.getCoordinatorStore());
+        this.dynamicFilteringDataListenerIDs = checkNotNull(dynamicFilteringDataListenerIDs);
+    }
+
+    @Override
+    public void start() throws Exception {}
+
+    @Override
+    public void close() throws Exception {}
+
+    @Override
+    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
+            throws Exception {
+        // Since there might be speculative execution, once the dynamic filter collectors operator
+        // has been executed for multiple attempts, we only keep the first notification.
+        if (hasReceivedFilteringData) {

Review Comment:
   If there is a global failover, all the tasks will be restarted, but we might have a problem here since the flag would not be cleared. We might make the provider of this operator coordinator extend `RecreateOnResetOperatorCoordinator.Provider`.
   
   If the DPP task is restarted, it's also possible that the filtering data changes. In that case, we have to ensure the filtering data sent to the source coordinator and to the join operators are the same. However, in consideration of speculative execution, we cannot easily know which version is the chosen one. Thus, for now, we may need to ask users to disable DPP in this case. In the long run, we may consider restarting sources in some way once the data changes.
   
   In summary, we may:
   1. Use RecreateOnResetOperatorCoordinator to clear the state on global failover.
   2. When receiving repeated data, compare it to the previous one; if it is not the same, throw an exception for now (see the sketch after this list).
   3. Create a follow-up issue to optimize this behavior (restarting sources, or updating partitions if the source is not fully finished) in the future.
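   A minimal sketch of point 2 (`extractData` and `distributeToListeners` are hypothetical helpers):
   
   ```java
   // Keep the first received data; on a repeated event after failover or a
   // speculative attempt, verify it matches what was already distributed.
   private DynamicFilteringData receivedFilteringData; // replaces the boolean flag
   
   @Override
   public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
           throws Exception {
       DynamicFilteringData data = extractData(event);
       if (receivedFilteringData == null) {
           receivedFilteringData = data;
           distributeToListeners(event);
       } else if (!DynamicFilteringData.isEqual(receivedFilteringData, data)) {
           throw new IllegalStateException(
                   "DynamicFilteringData is recomputed with a different result after restart.");
       }
   }
   ```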
   
   
   
   




[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r936504568


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator
+ * collects {@link DynamicFilteringEvent} then redistributes to listening source coordinators.
+ */
+public class DynamicFilteringDataCollectorOperatorCoordinator
+        implements OperatorCoordinator, CoordinationRequestHandler {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class);
+
+    private final CoordinatorStore coordinatorStore;
+    private final List<String> dynamicFilteringDataListenerIDs;
+
+    private boolean hasReceivedFilteringData;
+
+    public DynamicFilteringDataCollectorOperatorCoordinator(
+            Context context, List<String> dynamicFilteringDataListenerIDs) {
+        this.coordinatorStore = checkNotNull(context.getCoordinatorStore());
+        this.dynamicFilteringDataListenerIDs = checkNotNull(dynamicFilteringDataListenerIDs);
+    }
+
+    @Override
+    public void start() throws Exception {}
+
+    @Override
+    public void close() throws Exception {}
+
+    @Override
+    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
+            throws Exception {
+        // Since there might be speculative execution, once the dynamic filter collectors operator
+        // has been executed for multiple attempts, we only keep the first notification.
+        if (hasReceivedFilteringData) {

Review Comment:
   Failover handling can indeed be complex for this coordinator and the receiver (in the next PR). I agree that we aren't yet ready to handle it gracefully, so I'll follow the suggestion, as it should be the safest way for now. 





[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r937383257


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -26,30 +26,49 @@
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.ByteArrayInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Data for dynamic filtering. */
 public class DynamicFilteringData implements Serializable {
+
+    public static final Set<LogicalTypeRoot> SUPPORTED_TYPES;
+
+    static {
+        Set<LogicalTypeRoot> supported = new HashSet<>();
+        supported.add(LogicalTypeRoot.INTEGER);
+        supported.add(LogicalTypeRoot.BIGINT);
+        supported.add(LogicalTypeRoot.SMALLINT);
+        supported.add(LogicalTypeRoot.TINYINT);
+        supported.add(LogicalTypeRoot.VARCHAR);
+        supported.add(LogicalTypeRoot.CHAR);
+        SUPPORTED_TYPES = Collections.unmodifiableSet(supported);

Review Comment:
   DATE, TIME_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, and TIMESTAMP_WITHOUT_TIME_ZONE have been added. 





[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r937427051


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -26,30 +26,49 @@
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.ByteArrayInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Data for dynamic filtering. */
 public class DynamicFilteringData implements Serializable {
+
+    public static final Set<LogicalTypeRoot> SUPPORTED_TYPES;
+
+    static {
+        Set<LogicalTypeRoot> supported = new HashSet<>();
+        supported.add(LogicalTypeRoot.INTEGER);
+        supported.add(LogicalTypeRoot.BIGINT);
+        supported.add(LogicalTypeRoot.SMALLINT);
+        supported.add(LogicalTypeRoot.TINYINT);
+        supported.add(LogicalTypeRoot.VARCHAR);
+        supported.add(LogicalTypeRoot.CHAR);
+        SUPPORTED_TYPES = Collections.unmodifiableSet(supported);

Review Comment:
   It seems that we can't support TIMESTAMP_WITH_TIME_ZONE at present. I'm removing it from the supported types.





[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r936212347


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+    private final List<byte[]> serializedData;
+
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, RowData> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = typeInfo;
+        this.rowType = rowType;
+        this.isFiltering = isFiltering;
+        this.serializedData = isFiltering ? serializedData : Collections.emptyList();
+    }
+
+    public boolean isFiltering() {

Review Comment:
   It will also be necessary for future work.
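
   For illustration, a minimal sketch of how a source connector might consume this flag when pruning partitions — `Partition` and `toRowData` are hypothetical stand-ins for connector-side concepts, not part of this PR:

       // Hypothetical connector-side helper: keep a partition only if the
       // collected DynamicFilteringData says it may be relevant.
       List<Partition> prunePartitions(List<Partition> partitions, DynamicFilteringData data) {
           if (!data.isFiltering()) {
               // Non-filtering data: everything is considered contained.
               return partitions;
           }
           List<Partition> kept = new ArrayList<>();
           for (Partition partition : partitions) {
               // toRowData is assumed to build an internal RowData matching data.getRowType().
               RowData key = partition.toRowData(data.getRowType());
               if (data.contains(key)) {
                   kept.add(partition);
               }
           }
           return kept;
       }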





[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r936197852


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that supports dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+    private transient boolean eventSent;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = dynamicFilteringFieldType;
+        this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices;
+        this.threshold = threshold;
+        this.operatorEventGateway = operatorEventGateway;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+        this.currentSize = 0L;
+        this.eventSent = false;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (exceedThreshold()) {
+            return;
+        }
+
+        RowData value = element.getValue();
+        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+        for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+            LogicalType type = dynamicFilteringFieldType.getTypeAt(i);
+            int index = dynamicFilteringFieldIndices.get(i);
+            switch (type.getTypeRoot()) {
+                case INTEGER:
+                    rowData.setField(i, value.getInt(index));
+                    break;
+                case BIGINT:
+                    rowData.setField(i, value.getLong(index));
+                    break;
+                case VARCHAR:
+                    rowData.setField(i, value.getString(index));
+                    break;
+                default:
+                    throw new UnsupportedOperationException();
+            }
+        }
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+        serializer.serialize(rowData, wrapper);
+        boolean duplicated = !buffer.add(baos.toByteArray());
+        if (duplicated) {
+            return;
+        }
+        currentSize += baos.size();
+
+        if (exceedThreshold()) {
+            // Send an empty filtering data and disable self by leaving the currentSize unchanged
+            sendEvent();

Review Comment:
   An empty event has no effect on the source, so it seems we don't need to send the event eagerly, though it's still better to clear the buffer eagerly. I'll remove the sending here, as well as the flag.





[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r936194502


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+    private final List<byte[]> serializedData;
+
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, RowData> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = typeInfo;
+        this.rowType = rowType;
+        this.isFiltering = isFiltering;
+        this.serializedData = isFiltering ? serializedData : Collections.emptyList();
+    }
+
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    public Collection<RowData> getData() {
+        prepare();
+        return dataMap.values();
+    }
+
+    public boolean contains(RowData row) {
+        if (!isFiltering) {
+            return true;
+        } else if (row.getArity() != rowType.getFieldCount()) {
+            throw new TableException("The arity of RowData is different");
+        } else {
+            prepare();
+            RowData rowData = dataMap.get(hash(row));

Review Comment:
   This seems to be a bug. I'll fix it.
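
   To spell out the bug: the map was keyed by the row hash alone (Map<Integer, RowData>), so two distinct rows that happen to share a hash would collide. A sketch of the collision-safe lookup, in line with the fix visible in the later diff — bucket by hash, then compare fields:

       // Bucket rows by hash code; different rows can share a hash,
       // so a lookup must still compare field values per candidate.
       private transient Map<Integer, List<RowData>> dataMap;

       public boolean contains(RowData row) {
           prepare();
           List<RowData> candidates = dataMap.get(hash(row));
           if (candidates == null) {
               return false;
           }
           for (RowData candidate : candidates) {
               if (matchRow(row, candidate)) { // field-by-field equality via FieldGetters
                   return true;
               }
           }
           return false;
       }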





[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r939518251


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+
+    public static final Set<LogicalTypeRoot> SUPPORTED_TYPES;
+
+    static {
+        Set<LogicalTypeRoot> supported = new HashSet<>();
+        supported.add(LogicalTypeRoot.INTEGER);
+        supported.add(LogicalTypeRoot.BIGINT);
+        supported.add(LogicalTypeRoot.SMALLINT);
+        supported.add(LogicalTypeRoot.TINYINT);
+        supported.add(LogicalTypeRoot.VARCHAR);
+        supported.add(LogicalTypeRoot.CHAR);
+        supported.add(LogicalTypeRoot.DATE);
+        supported.add(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+        supported.add(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+        supported.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+        SUPPORTED_TYPES = Collections.unmodifiableSet(supported);
+    }
+
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+
+    /** The list should be sorted and distinct. */
+    private final List<byte[]> serializedData;
+
+    /** Whether the data actually does filter. If false, everything is considered contained. */
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, List<RowData>> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = checkNotNull(typeInfo);
+        this.rowType = checkNotNull(rowType);
+        this.serializedData = checkNotNull(serializedData);
+        this.isFiltering = isFiltering;
+    }
+
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    public boolean contains(RowData row) {
+        if (!isFiltering) {
+            return true;
+        } else if (row.getArity() != rowType.getFieldCount()) {
+            throw new TableException("The arity of RowData is different");
+        } else {
+            prepare();
+            List<RowData> mayMatchRowData = dataMap.get(hash(row));
+            if (mayMatchRowData == null) {
+                return false;
+            }
+            for (RowData mayMatch : mayMatchRowData) {
+                if (matchRow(row, mayMatch)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    private boolean matchRow(RowData row, RowData mayMatch) {
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            if (!Objects.equals(
+                    fieldGetters[i].getFieldOrNull(row),
+                    fieldGetters[i].getFieldOrNull(mayMatch))) {

Review Comment:
   Values in the building rows and the testing rows are all expected to be Flink internal types, which should be guaranteed by the builders and users. I suppose that's not the responsibility of DynamicFilteringData itself, but maybe I can add some comments to make this clear to all users.
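
   One possible shape for such a note, as a class-level Javadoc addition (the wording below is a suggestion, not taken from the PR):

       /**
        * Data for dynamic filtering.
        *
        * <p>NOTE: Rows used to build this data and rows probed via {@code contains(RowData)}
        * must both use Flink internal data structures (e.g. StringData for CHAR/VARCHAR
        * fields). This is the caller's responsibility; mixing internal and external types
        * silently breaks the equality check used for matching.
        */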





[GitHub] [flink] gaoyunhaii commented on pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on PR #20374:
URL: https://github.com/apache/flink/pull/20374#issuecomment-1204703556

   @flinkbot run azure




[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r937324901


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that supports dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = checkNotNull(dynamicFilteringFieldType);
+        this.dynamicFilteringFieldIndices = checkNotNull(dynamicFilteringFieldIndices);
+        this.threshold = threshold;
+        this.operatorEventGateway = checkNotNull(operatorEventGateway);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+        this.currentSize = 0L;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (exceedThreshold()) {
+            return;
+        }
+
+        RowData value = element.getValue();
+        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+        for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+            LogicalType type = dynamicFilteringFieldType.getTypeAt(i);
+            int index = dynamicFilteringFieldIndices.get(i);
+            switch (type.getTypeRoot()) {
+                case INTEGER:
+                    rowData.setField(i, value.getInt(index));
+                    break;
+                case BIGINT:
+                    rowData.setField(i, value.getLong(index));
+                    break;
+                case VARCHAR:
+                    rowData.setField(i, value.getString(index));
+                    break;
+                default:

Review Comment:
   I suppose we can add a static field describing all the supported types as a Set.
   Besides, since the setters of GenericRowData don't care about the type, I suppose we can create FieldGetters that read values from the input rowType and set them directly on the GenericRowData we create here, regardless of the type. The code would be much cleaner and we could support more types easily. 
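
   A sketch of that refactoring, relying on the existing RowData.createFieldGetter utility (this mirrors the suggestion; the final code may differ):

       // Built once in open(): one getter per dynamic-filtering field, typed by the
       // field's LogicalType and positioned by its index in the input row.
       private transient RowData.FieldGetter[] fieldGetters;

       // in open():
       fieldGetters = new RowData.FieldGetter[dynamicFilteringFieldIndices.size()];
       for (int i = 0; i < dynamicFilteringFieldIndices.size(); i++) {
           fieldGetters[i] =
                   RowData.createFieldGetter(
                           dynamicFilteringFieldType.getTypeAt(i),
                           dynamicFilteringFieldIndices.get(i));
       }

       // in processElement(): no per-type switch needed, since
       // GenericRowData#setField accepts any internal value.
       GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
       for (int i = 0; i < fieldGetters.length; i++) {
           rowData.setField(i, fieldGetters[i].getFieldOrNull(value));
       }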





[GitHub] [flink] godfreyhe commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r937365940


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -26,30 +26,49 @@
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.ByteArrayInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Data for dynamic filtering. */
 public class DynamicFilteringData implements Serializable {
+
+    public static final Set<LogicalTypeRoot> SUPPORTED_TYPES;
+
+    static {
+        Set<LogicalTypeRoot> supported = new HashSet<>();
+        supported.add(LogicalTypeRoot.INTEGER);
+        supported.add(LogicalTypeRoot.BIGINT);
+        supported.add(LogicalTypeRoot.SMALLINT);
+        supported.add(LogicalTypeRoot.TINYINT);
+        supported.add(LogicalTypeRoot.VARCHAR);
+        supported.add(LogicalTypeRoot.CHAR);
+        SUPPORTED_TYPES = Collections.unmodifiableSet(supported);

Review Comment:
   I think date/time/timestamp can be supported as well.





[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r936494305


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator
+ * collects {@link DynamicFilteringEvent} then redistributes to listening source coordinators.
+ */
+public class DynamicFilteringDataCollectorOperatorCoordinator
+        implements OperatorCoordinator, CoordinationRequestHandler {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class);
+
+    private final CoordinatorStore coordinatorStore;
+    private final List<String> dynamicFilteringDataListenerIDs;
+
+    private boolean hasReceivedFilteringData;
+
+    public DynamicFilteringDataCollectorOperatorCoordinator(
+            Context context, List<String> dynamicFilteringDataListenerIDs) {
+        this.coordinatorStore = checkNotNull(context.getCoordinatorStore());
+        this.dynamicFilteringDataListenerIDs = checkNotNull(dynamicFilteringDataListenerIDs);
+    }
+
+    @Override
+    public void start() throws Exception {}
+
+    @Override
+    public void close() throws Exception {}
+
+    @Override
+    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
+            throws Exception {
+        // Since there might be speculative execution, once the dynamic filter collectors operator
+        // has been executed for multiple attempts, we only keep the first notification.
+        if (hasReceivedFilteringData) {
+            return;
+        }
+
+        for (String listenerID : dynamicFilteringDataListenerIDs) {
+            // Push event to listening source coordinators.
+            OperatorCoordinator listener = (OperatorCoordinator) coordinatorStore.get(listenerID);
+            if (listener == null) {
+                LOG.error("Dynamic filtering data listener is missing: {}", listenerID);

Review Comment:
   Yes, it can't be anything but a bug indeed. I'll make it throw an exception.
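
   A sketch of that change in handleEventFromOperator (exception type and wording are assumptions):

       OperatorCoordinator listener = (OperatorCoordinator) coordinatorStore.get(listenerID);
       if (listener == null) {
           // A missing listener can only be a wiring bug; fail loudly instead of logging.
           throw new IllegalStateException(
                   "Dynamic filtering data listener is missing: " + listenerID);
       }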





[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r936526464


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java:
##########
@@ -67,17 +69,32 @@ public void close() throws Exception {}
     @Override
     public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
             throws Exception {
-        // Since there might be speculative execution, once the dynamic filter collectors operator
-        // has been executed for multiple attempts, we only keep the first notification.
-        if (hasReceivedFilteringData) {
-            return;
+        DynamicFilteringData currentData =
+                ((DynamicFilteringEvent) ((SourceEventWrapper) event).getSourceEvent()).getData();
+        if (receivedFilteringData == null) {
+            receivedFilteringData = currentData;
+        } else {
+            // Since there might be speculative execution or failover, we may receive multiple
+            // notifications, and we can't tell for sure which one is valid for further processing.
+            if (receivedFilteringData.equals(currentData)) {
+                // If the notifications contain exactly the same data, everything is alright, and
+                // we don't need to send the event again.
+                return;
+            } else {
+                // In case the mismatching of the source filtering result and the dim data, which
+                // may leads to incorrect result, trigger global failover for fully recomputing.
+                throw new IllegalStateException(

Review Comment:
   We might tip users to disable DPP in this case. 
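
   For instance, the message could point users at the switch directly — a sketch, assuming 'table.optimizer.dynamic-filtering.enabled' is the intended configuration option:

       throw new IllegalStateException(
               "DynamicFilteringData is recomputed and the new result differs from the "
                       + "previously received one, which may lead to incorrect results. "
                       + "As a workaround, dynamic partition pruning can be disabled via "
                       + "'table.optimizer.dynamic-filtering.enabled' = 'false'.");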





[GitHub] [flink] godfreyhe closed pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
godfreyhe closed pull request #20374: [FLINK-28709] Implement dynamic filtering operators
URL: https://github.com/apache/flink/pull/20374




[GitHub] [flink] godfreyhe commented on pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on PR #20374:
URL: https://github.com/apache/flink/pull/20374#issuecomment-1207181809

   @flinkbot run azure
   




[GitHub] [flink] gaoyunhaii commented on pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on PR #20374:
URL: https://github.com/apache/flink/pull/20374#issuecomment-1207299221

   @flinkbot run azure




[GitHub] [flink] pltbkd commented on pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on PR #20374:
URL: https://github.com/apache/flink/pull/20374#issuecomment-1203772817

   Hi @gaoyunhaii,
   Thanks for the review! All comments so far should have been addressed and resolved.




[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r936196209


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that supports dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+    private transient boolean eventSent;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = dynamicFilteringFieldType;
+        this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices;
+        this.threshold = threshold;
+        this.operatorEventGateway = operatorEventGateway;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+        this.currentSize = 0L;
+        this.eventSent = false;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (exceedThreshold()) {
+            return;
+        }
+
+        RowData value = element.getValue();
+        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+        for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+            LogicalType type = dynamicFilteringFieldType.getTypeAt(i);
+            int index = dynamicFilteringFieldIndices.get(i);
+            switch (type.getTypeRoot()) {
+                case INTEGER:
+                    rowData.setField(i, value.getInt(index));
+                    break;
+                case BIGINT:
+                    rowData.setField(i, value.getLong(index));
+                    break;
+                case VARCHAR:
+                    rowData.setField(i, value.getString(index));
+                    break;
+                default:
+                    throw new UnsupportedOperationException();
+            }
+        }
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+        serializer.serialize(rowData, wrapper);
+        boolean duplicated = !buffer.add(baos.toByteArray());

Review Comment:
   The buffer is a TreeSet with a byte[] comparator, so different byte[] instances with the same content will be deduplicated as well. 
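
   A tiny self-contained check of that property: two distinct byte[] instances with equal content collapse into a single entry, because the TreeSet orders by content rather than identity.

       import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;

       import java.util.Set;
       import java.util.TreeSet;

       public class ByteArrayDedupDemo {
           public static void main(String[] args) {
               // Ordered by array content, not by object identity.
               Set<byte[]> buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
               buffer.add(new byte[] {1, 2, 3});
               boolean duplicated = !buffer.add(new byte[] {1, 2, 3}); // distinct instance, same bytes
               System.out.println(duplicated);    // true
               System.out.println(buffer.size()); // 1
           }
       }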





[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r936194342


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+    private final List<byte[]> serializedData;
+
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, RowData> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = typeInfo;
+        this.rowType = rowType;
+        this.isFiltering = isFiltering;
+        this.serializedData = isFiltering ? serializedData : Collections.emptyList();
+    }
+
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    public RowType getRowType() {

Review Comment:
   Yes, this is necessary for building a row for comparison.





[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r937326667


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that supports dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = checkNotNull(dynamicFilteringFieldType);
+        this.dynamicFilteringFieldIndices = checkNotNull(dynamicFilteringFieldIndices);
+        this.threshold = threshold;
+        this.operatorEventGateway = checkNotNull(operatorEventGateway);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+        this.currentSize = 0L;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (exceedThreshold()) {
+            return;
+        }
+
+        RowData value = element.getValue();
+        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+        for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+            LogicalType type = dynamicFilteringFieldType.getTypeAt(i);
+            int index = dynamicFilteringFieldIndices.get(i);
+            switch (type.getTypeRoot()) {
+                case INTEGER:
+                    rowData.setField(i, value.getInt(index));
+                    break;
+                case BIGINT:
+                    rowData.setField(i, value.getLong(index));
+                    break;
+                case VARCHAR:
+                    rowData.setField(i, value.getString(index));
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported data type for dynamic filtering: " + type);
+            }
+        }
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+            serializer.serialize(rowData, wrapper);
+            boolean duplicated = !buffer.add(baos.toByteArray());
+            if (duplicated) {
+                return;
+            }
+
+            currentSize += baos.size();
+        }
+
+        if (exceedThreshold()) {
+            // Clear the filtering data and disable this operator by leaving currentSize unchanged.
+            buffer.clear();
+            LOG.warn(
+                    "Collected data size exceeds the threshold, {} > {}, dynamic filtering is disabled.",
+                    currentSize,
+                    threshold);
+        }
+    }
+
+    private boolean exceedThreshold() {
+        return threshold > 0 && currentSize > threshold;
+    }
+
+    @Override
+    public void finish() throws Exception {
+        if (exceedThreshold()) {
+            LOG.info(
+                    "Finished collecting. {} bytes were collected, which exceeds the threshold {}. Sending empty data.",
+                    currentSize,
+                    threshold);
+        } else {
+            LOG.info(
+                    "Finished collecting. {} bytes in {} rows were collected. Sending the data.",
+                    currentSize,
+                    buffer.size());
+        }
+        sendEvent();

Review Comment:
   In case different attempts of the collector compute different results, one of which exceeds the threshold, I think it's better to send the event even when the threshold is exceeded; otherwise the coordinator may not know about the problem and can't trigger a failover to recompute the data.
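   For illustration, a minimal sketch of the suggested behavior, assuming a DynamicFilteringData constructor that takes the serialized rows plus an isFiltering flag (an assumption about the in-review API, not its final signature):

       private void sendEvent() {
           final DynamicFilteringData data;
           if (exceedThreshold()) {
               // Still notify the coordinator, but mark the payload as
               // "not filtering" so inconsistent attempts can be detected.
               data = new DynamicFilteringData(
                       typeInfo, dynamicFilteringFieldType, Collections.emptyList(), false);
           } else {
               data = new DynamicFilteringData(
                       typeInfo, dynamicFilteringFieldType, new ArrayList<>(buffer), true);
           }
           operatorEventGateway.sendEventToCoordinator(
                   new SourceEventWrapper(new DynamicFilteringEvent(data)));
       }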




[GitHub] [flink] pltbkd commented on a diff in pull request #20374: [FLINK-28709] Implement dynamic filtering operators

Posted by GitBox <gi...@apache.org>.
pltbkd commented on code in PR #20374:
URL: https://github.com/apache/flink/pull/20374#discussion_r937324901


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that support dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = checkNotNull(dynamicFilteringFieldType);
+        this.dynamicFilteringFieldIndices = checkNotNull(dynamicFilteringFieldIndices);
+        this.threshold = threshold;
+        this.operatorEventGateway = checkNotNull(operatorEventGateway);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+        this.currentSize = 0L;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (exceedThreshold()) {
+            return;
+        }
+
+        RowData value = element.getValue();
+        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+        for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+            LogicalType type = dynamicFilteringFieldType.getTypeAt(i);
+            int index = dynamicFilteringFieldIndices.get(i);
+            switch (type.getTypeRoot()) {
+                case INTEGER:
+                    rowData.setField(i, value.getInt(index));
+                    break;
+                case BIGINT:
+                    rowData.setField(i, value.getLong(index));
+                    break;
+                case VARCHAR:
+                    rowData.setField(i, value.getString(index));
+                    break;
+                default:

Review Comment:
   I suppose we can add a static method that returns all the supported types as a set for this usage, e.g. Set<LogicalTypeRoot> getSupportedTypes().
   Besides, since the setters of GenericRowData don't care about the type, I suppose we can create FieldGetters that read the values from the input rowType and set them directly on the GenericRowData we create here, regardless of the type. The code would be much cleaner, and we could support more types easily.
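   A rough sketch of the FieldGetter-based variant described above, assuming RowData.createFieldGetter is used to build one getter per projected field (the field and variable names are illustrative):

       private transient RowData.FieldGetter[] fieldGetters;

       // in open(): one getter per projected field, typed by the filtering row type
       fieldGetters = new RowData.FieldGetter[dynamicFilteringFieldIndices.size()];
       for (int i = 0; i < fieldGetters.length; i++) {
           fieldGetters[i] = RowData.createFieldGetter(
                   dynamicFilteringFieldType.getTypeAt(i),
                   dynamicFilteringFieldIndices.get(i));
       }

       // in processElement(): no per-type switch is needed anymore
       GenericRowData rowData = new GenericRowData(fieldGetters.length);
       for (int i = 0; i < fieldGetters.length; i++) {
           rowData.setField(i, fieldGetters[i].getFieldOrNull(value));
       }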


