You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/08/07 02:33:50 UTC
[flink] 02/03: [FLINK-28709][table] Introduce DynamicFilteringData and the DynamicFilteringDataCollectorOperator to build and distribute the data
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3a8e71e286d1d097fe6cfc8b7ff2125a5108f334
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Thu Jul 28 18:27:02 2022 +0800
[FLINK-28709][table] Introduce DynamicFilteringData and the DynamicFilteringDataCollectorOperator to build and distribute the data
This closes #20374
---
.../connector/source/DynamicFilteringData.java | 196 +++++++++++++++++++++
.../connector/source/DynamicFilteringEvent.java | 45 +++++
.../DynamicFilteringDataCollectorOperator.java | 178 +++++++++++++++++++
...cFilteringDataCollectorOperatorCoordinator.java | 154 ++++++++++++++++
...namicFilteringDataCollectorOperatorFactory.java | 100 +++++++++++
...teringDataCollectorOperatorCoordinatorTest.java | 113 ++++++++++++
.../DynamicFilteringDataCollectorOperatorTest.java | 123 +++++++++++++
.../dynamicfiltering/DynamicFilteringDataTest.java | 138 +++++++++++++++
8 files changed, 1047 insertions(+)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java
new file mode 100644
index 00000000000..0a81bffdd4c
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Data for dynamic filtering.
+ *
+ * <p>The filtering rows are carried as a list of serialized byte arrays. They are deserialized
+ * lazily, on the first call to {@link #contains(RowData)} or {@link #getData()}, into a hash index
+ * that is then used for lookups.
+ */
+public class DynamicFilteringData implements Serializable {
+
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+
+    /**
+     * Serialized rows for filtering. The types of the row values must be Flink internal data type,
+     * i.e. type returned by the FieldGetter. The list should be sorted and distinct.
+     */
+    private final List<byte[]> serializedData;
+
+    /** Whether the data actually does filter. If false, everything is considered contained. */
+    private final boolean isFiltering;
+
+    // The transient state below is built once by prepare(). The volatile flag is written only
+    // after dataMap and fieldGetters are assigned, so a reader observing prepared == true also
+    // observes the built state.
+    private transient volatile boolean prepared = false;
+    /** Deserialized rows, bucketed by {@link #hash(RowData)}. */
+    private transient Map<Integer, List<RowData>> dataMap;
+    /** One getter per field of {@link #rowType}; only initialized when {@link #isFiltering}. */
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    /**
+     * Creates dynamic filtering data.
+     *
+     * @param typeInfo type information used to create the serializer for {@code serializedData}
+     * @param rowType the row type of the filtering rows
+     * @param serializedData the serialized filtering rows, expected to be sorted and distinct
+     * @param isFiltering whether the data actually filters; if false, {@link #contains(RowData)}
+     *     always returns true
+     */
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = checkNotNull(typeInfo);
+        this.rowType = checkNotNull(rowType);
+        this.serializedData = checkNotNull(serializedData);
+        this.isFiltering = isFiltering;
+    }
+
+    /** Returns whether the data actually filters. */
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    /** Returns the row type of the filtering rows. */
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    /**
+     * Returns true if the dynamic filtering data contains the specific row.
+     *
+     * @param row the row to be tested. Types of the row values must be Flink internal data type,
+     *     i.e. type returned by the FieldGetter.
+     * @return true if the dynamic filtering data contains the specific row, or if the data does
+     *     not filter at all
+     * @throws TableException if the arity of {@code row} differs from the field count of the row
+     *     type, or if the serialized data cannot be deserialized
+     */
+    public boolean contains(RowData row) {
+        if (!isFiltering) {
+            return true;
+        } else if (row.getArity() != rowType.getFieldCount()) {
+            throw new TableException("The arity of RowData is different");
+        } else {
+            prepare();
+            List<RowData> mayMatchRowData = dataMap.get(hash(row));
+            if (mayMatchRowData == null) {
+                return false;
+            }
+            for (RowData mayMatch : mayMatchRowData) {
+                if (matchRow(row, mayMatch)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    /**
+     * Compares two rows field by field. {@link Objects#deepEquals} is used so that array-valued
+     * fields (e.g. {@code byte[]} for BINARY/VARBINARY) are compared by content rather than by
+     * reference; for all other values it behaves like {@link Objects#equals}.
+     */
+    private boolean matchRow(RowData row, RowData mayMatch) {
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            if (!Objects.deepEquals(
+                    fieldGetters[i].getFieldOrNull(row),
+                    fieldGetters[i].getFieldOrNull(mayMatch))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Builds the lookup state once, using double-checked locking on the volatile flag. */
+    private void prepare() {
+        if (!prepared) {
+            synchronized (this) {
+                if (!prepared) {
+                    doPrepare();
+                    prepared = true;
+                }
+            }
+        }
+    }
+
+    /**
+     * Deserializes {@link #serializedData} into {@link #dataMap}. When not filtering, the map is
+     * left empty and no field getters are created.
+     *
+     * @throws TableException if a row cannot be deserialized
+     */
+    private void doPrepare() {
+        this.dataMap = new HashMap<>();
+        if (isFiltering) {
+            this.fieldGetters =
+                    IntStream.range(0, rowType.getFieldCount())
+                            .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
+                            .toArray(RowData.FieldGetter[]::new);
+
+            TypeSerializer<RowData> serializer = typeInfo.createSerializer(new ExecutionConfig());
+            for (byte[] bytes : serializedData) {
+                try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+                        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {
+                    RowData partition = serializer.deserialize(inView);
+                    List<RowData> partitions =
+                            dataMap.computeIfAbsent(hash(partition), k -> new ArrayList<>());
+                    partitions.add(partition);
+                } catch (Exception e) {
+                    throw new TableException("Unable to deserialize the value.", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Hashes a row over all fields of {@link #rowType}. Must stay consistent with {@link
+     * #matchRow}: {@link Arrays#deepHashCode} hashes array-valued fields by content and is
+     * otherwise identical to {@link Objects#hash}.
+     */
+    private int hash(RowData row) {
+        return Arrays.deepHashCode(
+                Arrays.stream(fieldGetters).map(g -> g.getFieldOrNull(row)).toArray());
+    }
+
+    /**
+     * Returns whether two instances carry the same data. They must agree on the filtering flag,
+     * type information, and row type, and their serialized rows must be equal pairwise in list
+     * order. Two nulls are considered equal.
+     */
+    public static boolean isEqual(DynamicFilteringData data, DynamicFilteringData another) {
+        if (data == null) {
+            return another == null;
+        }
+        if (another == null
+                || (data.isFiltering != another.isFiltering)
+                || !data.typeInfo.equals(another.typeInfo)
+                || !data.rowType.equals(another.rowType)
+                || data.serializedData.size() != another.serializedData.size()) {
+            return false;
+        }
+
+        BytePrimitiveArrayComparator comparator = new BytePrimitiveArrayComparator(true);
+        for (int i = 0; i < data.serializedData.size(); i++) {
+            if (comparator.compare(data.serializedData.get(i), another.serializedData.get(i))
+                    != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Returns the deserialized filtering rows; empty when the data does not filter. */
+    @VisibleForTesting
+    public Collection<RowData> getData() {
+        prepare();
+        return dataMap.values().stream().flatMap(List::stream).collect(Collectors.toList());
+    }
+
+    @Override
+    public String toString() {
+        return "DynamicFilteringData{"
+                + "isFiltering="
+                + isFiltering
+                + ", data size="
+                + serializedData.size()
+                + '}';
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringEvent.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringEvent.java
new file mode 100644
index 00000000000..dcc826e661d
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A source event that transports a {@link DynamicFilteringData} for dynamic filtering. It is sent
+ * by the DynamicFilteringDataCollectorOperator to the enumerator of a source that supports dynamic
+ * filtering, relayed through the DynamicFilteringDataCollectorOperatorCoordinator and the
+ * SourceCoordinator.
+ */
+public class DynamicFilteringEvent implements SourceEvent {
+
+    /** The carried filtering data; never null. */
+    private final DynamicFilteringData data;
+
+    /**
+     * Creates an event carrying the given data.
+     *
+     * @param data the filtering data to transport; must not be null
+     */
+    public DynamicFilteringEvent(DynamicFilteringData data) {
+        this.data = checkNotNull(data);
+    }
+
+    /** Returns the transported {@link DynamicFilteringData}. */
+    public DynamicFilteringData getData() {
+        return data;
+    }
+
+    @Override
+    public String toString() {
+        return "DynamicFilteringEvent{data=" + data + '}';
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java
new file mode 100644
index 00000000000..1d42dd08f7f
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that supports dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+ implements OneInputStreamOperator<RowData, Object> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+ private final RowType dynamicFilteringFieldType;
+ private final List<Integer> dynamicFilteringFieldIndices;
+ private final long threshold;
+ private final OperatorEventGateway operatorEventGateway;
+
+ private transient TypeInformation<RowData> typeInfo;
+ private transient TypeSerializer<RowData> serializer;
+
+ private transient Set<byte[]> buffer;
+ private transient long currentSize;
+ private transient FieldGetter[] fieldGetters;
+
+ public DynamicFilteringDataCollectorOperator(
+ RowType dynamicFilteringFieldType,
+ List<Integer> dynamicFilteringFieldIndices,
+ long threshold,
+ OperatorEventGateway operatorEventGateway) {
+ this.dynamicFilteringFieldType = checkNotNull(dynamicFilteringFieldType);
+ this.dynamicFilteringFieldIndices = checkNotNull(dynamicFilteringFieldIndices);
+ this.threshold = threshold;
+ this.operatorEventGateway = checkNotNull(operatorEventGateway);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+ this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+ this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+ this.currentSize = 0L;
+ this.fieldGetters =
+ IntStream.range(0, dynamicFilteringFieldIndices.size())
+ .mapToObj(
+ i ->
+ RowData.createFieldGetter(
+ dynamicFilteringFieldType.getTypeAt(i),
+ dynamicFilteringFieldIndices.get(i)))
+ .toArray(FieldGetter[]::new);
+ }
+
+ @Override
+ public void processElement(StreamRecord<RowData> element) throws Exception {
+ if (exceedThreshold()) {
+ return;
+ }
+
+ RowData value = element.getValue();
+ GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+ for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+ rowData.setField(i, fieldGetters[i].getFieldOrNull(value));
+ }
+
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+ serializer.serialize(rowData, wrapper);
+ boolean duplicated = !buffer.add(baos.toByteArray());
+ if (duplicated) {
+ return;
+ }
+
+ currentSize += baos.size();
+ }
+
+ if (exceedThreshold()) {
+ // Clear the filtering data and disable self by leaving the currentSize unchanged
+ buffer.clear();
+ LOG.warn(
+ "Collected data size exceeds the threshold, {} > {}, dynamic filtering is disabled.",
+ currentSize,
+ threshold);
+ }
+ }
+
+ private boolean exceedThreshold() {
+ return threshold > 0 && currentSize > threshold;
+ }
+
+ @Override
+ public void finish() throws Exception {
+ if (exceedThreshold()) {
+ LOG.info(
+ "Finish collecting. {} bytes are collected which exceeds the threshold {}. Sending empty data.",
+ currentSize,
+ threshold);
+ } else {
+ LOG.info(
+ "Finish collecting. {} bytes in {} rows are collected. Sending the data.",
+ currentSize,
+ buffer.size());
+ }
+ sendEvent();
+ }
+
+ private void sendEvent() {
+ final DynamicFilteringData dynamicFilteringData;
+ if (exceedThreshold()) {
+ dynamicFilteringData =
+ new DynamicFilteringData(
+ typeInfo, dynamicFilteringFieldType, Collections.emptyList(), false);
+ } else {
+ dynamicFilteringData =
+ new DynamicFilteringData(
+ typeInfo, dynamicFilteringFieldType, new ArrayList<>(buffer), true);
+ }
+
+ DynamicFilteringEvent event = new DynamicFilteringEvent(dynamicFilteringData);
+ operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (buffer != null) {
+ buffer.clear();
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java
new file mode 100644
index 00000000000..50b10768765
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. It receives the
+ * {@link DynamicFilteringEvent} sent by the operator and forwards it to the listening source
+ * coordinators, which are looked up by ID in the {@link CoordinatorStore}.
+ */
+public class DynamicFilteringDataCollectorOperatorCoordinator
+        implements OperatorCoordinator, CoordinationRequestHandler {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class);
+
+    private final CoordinatorStore coordinatorStore;
+    /** Keys under which the listening source coordinators are registered in the store. */
+    private final List<String> dynamicFilteringDataListenerIDs;
+
+    /** The first data received; any later event must carry equal data. */
+    private DynamicFilteringData receivedFilteringData;
+
+    public DynamicFilteringDataCollectorOperatorCoordinator(
+            Context context, List<String> dynamicFilteringDataListenerIDs) {
+        this.coordinatorStore = checkNotNull(context.getCoordinatorStore());
+        this.dynamicFilteringDataListenerIDs = checkNotNull(dynamicFilteringDataListenerIDs);
+    }
+
+    @Override
+    public void start() throws Exception {}
+
+    @Override
+    public void close() throws Exception {}
+
+    @Override
+    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
+            throws Exception {
+        SourceEventWrapper wrapper = (SourceEventWrapper) event;
+        DynamicFilteringEvent filteringEvent = (DynamicFilteringEvent) wrapper.getSourceEvent();
+        DynamicFilteringData currentData = filteringEvent.getData();
+
+        if (receivedFilteringData != null) {
+            // Speculative execution or failover may deliver several notifications, and we cannot
+            // tell for sure which one is valid for further processing.
+            if (!DynamicFilteringData.isEqual(receivedFilteringData, currentData)) {
+                // The source filtering result may no longer match the dimension data, which can
+                // lead to incorrect results, so trigger a global failover for full recomputation.
+                throw new IllegalStateException(
+                        "DynamicFilteringData is recomputed but not equal. "
+                                + "Triggering global failover in case the result is incorrect. "
+                                + " It's recommended to re-run the job with dynamic filtering disabled.");
+            }
+            // Identical data has already been distributed; do not send it again.
+            return;
+        }
+
+        receivedFilteringData = currentData;
+        distributeToListeners(event);
+    }
+
+    /**
+     * Pushes {@code event} to every listening source coordinator, in list order.
+     *
+     * @throws IllegalStateException if a listener is not registered in the coordinator store
+     */
+    private void distributeToListeners(OperatorEvent event) throws Exception {
+        for (String listenerID : dynamicFilteringDataListenerIDs) {
+            OperatorCoordinator listener = (OperatorCoordinator) coordinatorStore.get(listenerID);
+            if (listener == null) {
+                throw new IllegalStateException(
+                        "Dynamic filtering data listener is missing: " + listenerID);
+            }
+            LOG.info(
+                    "Distributing event {} to source coordinator with ID {}", event, listenerID);
+            // Subtask index and attempt number are not needed for handling a
+            // DynamicFilteringEvent.
+            listener.handleEventFromOperator(0, 0, event);
+        }
+    }
+
+    @Override
+    public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
+            CoordinationRequest request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void subtaskReset(int subtask, long checkpointId) {}
+
+    @Override
+    public void executionAttemptFailed(
+            int subtask, int attemptNumber, @Nullable Throwable reason) {}
+
+    @Override
+    public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {}
+
+    @Override
+    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
+            throws Exception {}
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {}
+
+    @Override
+    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData)
+            throws Exception {}
+
+    /** Provider for {@link DynamicFilteringDataCollectorOperatorCoordinator}. */
+    public static class Provider extends RecreateOnResetOperatorCoordinator.Provider {
+        private final List<String> dynamicFilteringDataListenerIDs;
+
+        public Provider(OperatorID operatorID, List<String> dynamicFilteringDataListenerIDs) {
+            super(operatorID);
+            this.dynamicFilteringDataListenerIDs = checkNotNull(dynamicFilteringDataListenerIDs);
+        }
+
+        @Override
+        protected OperatorCoordinator getCoordinator(Context context) throws Exception {
+            return new DynamicFilteringDataCollectorOperatorCoordinator(
+                    context, dynamicFilteringDataListenerIDs);
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorFactory.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorFactory.java
new file mode 100644
index 00000000000..e9ad35e9c9b
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The factory class for {@link DynamicFilteringDataCollectorOperator}. It also provides the
+ * coordinator that forwards the collected data to the registered listener IDs.
+ */
+public class DynamicFilteringDataCollectorOperatorFactory
+        extends AbstractStreamOperatorFactory<Object>
+        implements CoordinatedOperatorFactory<Object> {
+
+    /** IDs of the source coordinators that should receive the collected data. */
+    private final Set<String> dynamicFilteringDataListenerIDs = new HashSet<>();
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+
+    public DynamicFilteringDataCollectorOperatorFactory(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold) {
+        this.dynamicFilteringFieldType = checkNotNull(dynamicFilteringFieldType);
+        this.dynamicFilteringFieldIndices = checkNotNull(dynamicFilteringFieldIndices);
+        this.threshold = threshold;
+    }
+
+    @Override
+    public <T extends StreamOperator<Object>> T createStreamOperator(
+            StreamOperatorParameters<Object> parameters) {
+        final OperatorID operatorID = parameters.getStreamConfig().getOperatorID();
+        final OperatorEventDispatcher dispatcher = parameters.getOperatorEventDispatcher();
+        final OperatorEventGateway gateway = dispatcher.getOperatorEventGateway(operatorID);
+
+        final DynamicFilteringDataCollectorOperator collector =
+                new DynamicFilteringDataCollectorOperator(
+                        dynamicFilteringFieldType, dynamicFilteringFieldIndices, threshold, gateway);
+        collector.setup(
+                parameters.getContainingTask(),
+                parameters.getStreamConfig(),
+                parameters.getOutput());
+
+        // The caller chooses T, but this factory always builds a
+        // DynamicFilteringDataCollectorOperator, so the cast cannot be checked at compile time.
+        @SuppressWarnings("unchecked")
+        final T result = (T) collector;
+        return result;
+    }
+
+    /** Registers the ID of a source coordinator that should receive the collected data. */
+    public void registerDynamicFilteringDataListenerID(String id) {
+        this.dynamicFilteringDataListenerIDs.add(checkNotNull(id));
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+        return DynamicFilteringDataCollectorOperator.class;
+    }
+
+    @Override
+    public OperatorCoordinator.Provider getCoordinatorProvider(
+            String operatorName, OperatorID operatorID) {
+        final List<String> listenerIDs = new ArrayList<>(dynamicFilteringDataListenerIDs);
+        return new DynamicFilteringDataCollectorOperatorCoordinator.Provider(
+                operatorID, listenerIDs);
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinatorTest.java
new file mode 100644
index 00000000000..86b5638bac9
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinatorTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link DynamicFilteringDataCollectorOperatorCoordinator}. */
+class DynamicFilteringDataCollectorOperatorCoordinatorTest {
+
+    /** The first received event is forwarded as-is to every registered listener exactly once. */
+    @Test
+    void testRedistributeData() throws Exception {
+        MockOperatorCoordinatorContext context =
+                new MockOperatorCoordinatorContext(new OperatorID(), 1);
+        String listenerID1 = "test-listener-1";
+        String listenerID2 = "test-listener-2";
+
+        TestingOperatorCoordinator listener1 = new TestingOperatorCoordinator(context);
+        TestingOperatorCoordinator listener2 = new TestingOperatorCoordinator(context);
+        context.getCoordinatorStore().putIfAbsent(listenerID1, listener1);
+        context.getCoordinatorStore().putIfAbsent(listenerID2, listener2);
+
+        RowType rowType = RowType.of(new IntType());
+        OperatorEvent testEvent = dynamicFilteringEvent(rowType, Collections.emptyList());
+        try (DynamicFilteringDataCollectorOperatorCoordinator coordinator =
+                new DynamicFilteringDataCollectorOperatorCoordinator(
+                        context, Arrays.asList(listenerID1, listenerID2))) {
+            coordinator.handleEventFromOperator(0, 1, testEvent);
+        }
+
+        assertThat(listener1.getNextReceivedOperatorEvent()).isSameAs(testEvent);
+        assertThat(listener1.getNextReceivedOperatorEvent()).isNull();
+        assertThat(listener2.getNextReceivedOperatorEvent()).isSameAs(testEvent);
+        assertThat(listener2.getNextReceivedOperatorEvent()).isNull();
+    }
+
+    /**
+     * After a failover, a re-sent event carrying equal data is ignored, while an event carrying
+     * different data makes the coordinator throw.
+     */
+    @Test
+    void testTaskFailover() throws Exception {
+        MockOperatorCoordinatorContext context =
+                new MockOperatorCoordinatorContext(new OperatorID(), 1);
+        String listenerID = "test-listener-1";
+
+        TestingOperatorCoordinator listener = new TestingOperatorCoordinator(context);
+        context.getCoordinatorStore().putIfAbsent(listenerID, listener);
+
+        RowType rowType = RowType.of(new IntType());
+        try (DynamicFilteringDataCollectorOperatorCoordinator coordinator =
+                new DynamicFilteringDataCollectorOperatorCoordinator(
+                        context, Collections.singletonList(listenerID))) {
+            OperatorEvent testEvent =
+                    dynamicFilteringEvent(rowType, Collections.singletonList(new byte[] {1, 2}));
+            coordinator.handleEventFromOperator(0, 0, testEvent);
+            assertThat(listener.getNextReceivedOperatorEvent()).isSameAs(testEvent);
+
+            // failover happens
+            coordinator.executionAttemptFailed(0, 0, null);
+
+            OperatorEvent testEvent1 =
+                    dynamicFilteringEvent(rowType, Collections.singletonList(new byte[] {1, 2}));
+            coordinator.handleEventFromOperator(0, 1, testEvent1);
+            // testEvent1 contains the same data as testEvent, so nothing is sent again
+            assertThat(listener.getNextReceivedOperatorEvent()).isNull();
+
+            // failover happens again
+            coordinator.executionAttemptFailed(0, 1, null);
+
+            OperatorEvent testEvent2 =
+                    dynamicFilteringEvent(rowType, Collections.singletonList(new byte[] {1, 3}));
+            assertThatThrownBy(() -> coordinator.handleEventFromOperator(0, 2, testEvent2))
+                    .isInstanceOf(IllegalStateException.class);
+        }
+    }
+
+    /**
+     * Wraps {@code data} into a {@link SourceEventWrapper}. Non-empty data is marked as filtering,
+     * and empty data as non-filtering. The latter matches what {@link
+     * DynamicFilteringDataCollectorOperator} sends once its size threshold is exceeded.
+     */
+    private OperatorEvent dynamicFilteringEvent(RowType rowType, List<byte[]> data) {
+        return new SourceEventWrapper(
+                new DynamicFilteringEvent(
+                        new DynamicFilteringData(
+                                InternalTypeInfo.of(rowType), rowType, data, !data.isEmpty())));
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorTest.java
new file mode 100644
index 00000000000..0fc6a9b8b97
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DynamicFilteringDataCollectorOperator}. */
+class DynamicFilteringDataCollectorOperatorTest {
+
+    @Test
+    void testCollectDynamicFilteringData() throws Exception {
+        RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+        // Input rows carry an extra column at index 2 that is not part of the collected row.
+        List<Integer> indexes = Arrays.asList(0, 1, 3);
+        MockOperatorEventGateway gateway = new MockOperatorEventGateway();
+
+        DynamicFilteringDataCollectorOperator operator =
+                new DynamicFilteringDataCollectorOperator(rowType, indexes, -1, gateway);
+        ConcurrentLinkedQueue<Object> output;
+        try (OneInputStreamOperatorTestHarness<RowData, Object> harness =
+                new OneInputStreamOperatorTestHarness<>(operator)) {
+            output = harness.getOutput();
+            harness.setup();
+            harness.open();
+
+            // Three rows that are identical after projection, then one distinct row.
+            for (long i = 0L; i < 3L; i++) {
+                harness.processElement(rowData(1, 1L, 0, "a"), i);
+            }
+            harness.processElement(rowData(2, 1L, 0, null), 3L);
+
+            // operator.finish is called when closing
+        }
+
+        // The collector only reports to the coordinator; nothing is emitted downstream.
+        assertThat(output).isEmpty();
+
+        DynamicFilteringData data = getSentDynamicFilteringData(gateway);
+        assertThat(data.isFiltering()).isTrue();
+        // Duplicated rows are collected only once.
+        assertThat(data.getData()).hasSize(2);
+        assertThat(data.contains(rowData(1, 1L, "a"))).isTrue();
+        assertThat(data.contains(rowData(2, 1L, null))).isTrue();
+    }
+
+    @Test
+    void testExceedsThreshold() throws Exception {
+        RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+        List<Integer> indexes = Arrays.asList(0, 1, 3);
+        MockOperatorEventGateway gateway = new MockOperatorEventGateway();
+        // Can hold at most 2 rows
+        int threshold = 100;
+
+        DynamicFilteringDataCollectorOperator operator =
+                new DynamicFilteringDataCollectorOperator(rowType, indexes, threshold, gateway);
+        ConcurrentLinkedQueue<Object> output;
+        try (OneInputStreamOperatorTestHarness<RowData, Object> harness =
+                new OneInputStreamOperatorTestHarness<>(operator)) {
+            output = harness.getOutput();
+            harness.setup();
+            harness.open();
+
+            harness.processElement(rowData(1, 1L, 0, "a"), 1L);
+            harness.processElement(rowData(2, 1L, 0, "b"), 2L);
+            harness.processElement(rowData(3, 1L, 0, "c"), 3L);
+        }
+
+        assertThat(output).isEmpty();
+
+        // Once the threshold is exceeded, the data sent to the coordinator disables filtering.
+        DynamicFilteringData data = getSentDynamicFilteringData(gateway);
+        assertThat(data.isFiltering()).isFalse();
+    }
+
+    /**
+     * Asserts that exactly one event was sent through {@code gateway} and that it wraps a {@link
+     * DynamicFilteringEvent}.
+     *
+     * @param gateway the mock gateway the operator under test sent its events to
+     * @return the {@link DynamicFilteringData} carried by the single sent event
+     */
+    private static DynamicFilteringData getSentDynamicFilteringData(
+            MockOperatorEventGateway gateway) {
+        assertThat(gateway.getEventsSent()).hasSize(1);
+        OperatorEvent event = gateway.getEventsSent().get(0);
+        assertThat(event).isInstanceOf(SourceEventWrapper.class);
+        SourceEvent sourceEvent = ((SourceEventWrapper) event).getSourceEvent();
+        assertThat(sourceEvent).isInstanceOf(DynamicFilteringEvent.class);
+        return ((DynamicFilteringEvent) sourceEvent).getData();
+    }
+
+    /**
+     * Builds a {@link GenericRowData} from {@code values}, converting {@link String} values to
+     * {@link BinaryStringData} and keeping every other value (including {@code null}) as is.
+     */
+    private RowData rowData(Object... values) {
+        GenericRowData rowData = new GenericRowData(values.length);
+        for (int i = 0; i < values.length; ++i) {
+            Object value = values[i];
+            value = value instanceof String ? new BinaryStringData((String) value) : value;
+            rowData.setField(i, value);
+        }
+        return rowData;
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataTest.java
new file mode 100644
index 00000000000..7ecb43b89fb
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link DynamicFilteringData}. The tests are here since it's hard to test
+ * DynamicFilteringData without the support of flink-table-runtime.
+ */
+class DynamicFilteringDataTest {
+    @Test
+    void testContains() {
+        RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+        TypeInformation<RowData> rowTypeInfo = InternalTypeInfo.of(rowType);
+        List<RowData> buildRows = new ArrayList<>();
+        buildRows.add(rowData(1, 1L, "a"));
+        buildRows.add(rowData(2, 1L, null));
+        buildRows.add(rowData(1, null, "b"));
+        buildRows.add(rowData(null, 2L, "c"));
+        buildRows.add(rowData(0, 31L, "d"));
+
+        DynamicFilteringData data =
+                new DynamicFilteringData(
+                        rowTypeInfo, rowType, serializeAll(rowTypeInfo, buildRows), true);
+
+        // Every row the data was built from, including rows with null fields, must be found.
+        for (RowData r : buildRows) {
+            assertThat(data.contains(r)).isTrue();
+        }
+        assertThat(data.contains(rowData(0, 1L, "a"))).isFalse();
+        assertThat(data.contains(rowData(1, 1L, null))).isFalse();
+        // Has the same hash as (0, 31L, "d"), so a hash match alone must not count as a hit.
+        assertThat(data.contains(rowData(1, 0L, "d"))).isFalse();
+    }
+
+    @Test
+    void testNotFiltering() {
+        RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+        DynamicFilteringData data =
+                new DynamicFilteringData(
+                        InternalTypeInfo.of(rowType), rowType, Collections.emptyList(), false);
+        // Non-filtering data accepts any row, even though it carries no rows itself.
+        assertThat(data.contains(rowData(1, 1L, "a"))).isTrue();
+    }
+
+    @Test
+    void testAddHashConflictingData() {
+        RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+        TypeInformation<RowData> rowTypeInfo = InternalTypeInfo.of(rowType);
+        List<RowData> buildRows = new ArrayList<>();
+        buildRows.add(rowData(0, 31L, "d"));
+        // Has the same hash as (0, 31L, "d")
+        buildRows.add(rowData(1, 0L, "d"));
+
+        DynamicFilteringData data =
+                new DynamicFilteringData(
+                        rowTypeInfo, rowType, serializeAll(rowTypeInfo, buildRows), true);
+
+        // Both hash-conflicting rows must be retained and found.
+        for (RowData r : buildRows) {
+            assertThat(data.contains(r)).isTrue();
+        }
+    }
+
+    @Test
+    void testMismatchingRowDataArity() {
+        RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+        DynamicFilteringData data =
+                new DynamicFilteringData(
+                        InternalTypeInfo.of(rowType), rowType, Collections.emptyList(), true);
+        assertThatThrownBy(() -> data.contains(rowData(1, 1L)))
+                .isInstanceOf(TableException.class)
+                .hasMessage("The arity of RowData is different");
+    }
+
+    /**
+     * Builds a {@link GenericRowData} from {@code values}, converting {@link String} values to
+     * {@link BinaryStringData} and keeping every other value (including {@code null}) as is.
+     */
+    private RowData rowData(Object... values) {
+        GenericRowData rowData = new GenericRowData(values.length);
+        for (int i = 0; i < values.length; ++i) {
+            Object value = values[i];
+            value = value instanceof String ? new BinaryStringData((String) value) : value;
+            rowData.setField(i, value);
+        }
+        return rowData;
+    }
+
+    /** Serializes each of {@code rows} with {@link #serialize}, preserving their order. */
+    private List<byte[]> serializeAll(TypeInformation<RowData> typeInfo, List<RowData> rows) {
+        return rows.stream().map(r -> serialize(typeInfo, r)).collect(Collectors.toList());
+    }
+
+    /**
+     * Serializes {@code row} with a serializer created from {@code typeInfo}.
+     *
+     * @throws UncheckedIOException if serialization fails
+     */
+    private byte[] serialize(TypeInformation<RowData> typeInfo, RowData row) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try {
+            typeInfo.createSerializer(new ExecutionConfig())
+                    .serialize(row, new DataOutputViewStreamWrapper(baos));
+        } catch (IOException e) {
+            // Rethrow unchecked so this method can be used inside a lambda.
+            throw new UncheckedIOException(e);
+        }
+        return baos.toByteArray();
+    }
+}