You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/08/07 02:33:50 UTC

[flink] 02/03: [FLINK-28709][table] Introduce DynamicFilteringData and the DynamicFilteringDataCollectorOperator to build and distribute the data

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3a8e71e286d1d097fe6cfc8b7ff2125a5108f334
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Thu Jul 28 18:27:02 2022 +0800

    [FLINK-28709][table] Introduce DynamicFilteringData and the DynamicFilteringDataCollectorOperator to build and distribute the data
    
    This closes #20374
---
 .../connector/source/DynamicFilteringData.java     | 196 +++++++++++++++++++++
 .../connector/source/DynamicFilteringEvent.java    |  45 +++++
 .../DynamicFilteringDataCollectorOperator.java     | 178 +++++++++++++++++++
 ...cFilteringDataCollectorOperatorCoordinator.java | 154 ++++++++++++++++
 ...namicFilteringDataCollectorOperatorFactory.java | 100 +++++++++++
 ...teringDataCollectorOperatorCoordinatorTest.java | 113 ++++++++++++
 .../DynamicFilteringDataCollectorOperatorTest.java | 123 +++++++++++++
 .../dynamicfiltering/DynamicFilteringDataTest.java | 138 +++++++++++++++
 8 files changed, 1047 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java
new file mode 100644
index 00000000000..0a81bffdd4c
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Data for dynamic filtering. */
+public class DynamicFilteringData implements Serializable {
+
+    private final TypeInformation<RowData> typeInfo;
+    private final RowType rowType;
+
+    /**
+     * Serialized rows for filtering. The types of the row values must be Flink internal data types,
+     * i.e. the types returned by the FieldGetter. The list should be sorted and distinct.
+     */
+    private final List<byte[]> serializedData;
+
+    /** Whether the data actually does filter. If false, everything is considered contained. */
+    private final boolean isFiltering;
+
+    private transient volatile boolean prepared = false;
+    private transient Map<Integer, List<RowData>> dataMap;
+    private transient RowData.FieldGetter[] fieldGetters;
+
+    public DynamicFilteringData(
+            TypeInformation<RowData> typeInfo,
+            RowType rowType,
+            List<byte[]> serializedData,
+            boolean isFiltering) {
+        this.typeInfo = checkNotNull(typeInfo);
+        this.rowType = checkNotNull(rowType);
+        this.serializedData = checkNotNull(serializedData);
+        this.isFiltering = isFiltering;
+    }
+
+    public boolean isFiltering() {
+        return isFiltering;
+    }
+
+    public RowType getRowType() {
+        return rowType;
+    }
+
+    /**
+     * Returns true if the dynamic filtering data contains the specific row.
+     *
+     * @param row the row to be tested. Types of the row values must be Flink internal data types,
+     *     i.e. the types returned by the FieldGetter.
+     * @return true if the dynamic filtering data contains the specific row
+     */
+    public boolean contains(RowData row) {
+        if (!isFiltering) {
+            return true;
+        } else if (row.getArity() != rowType.getFieldCount()) {
+            throw new TableException("The arity of RowData is different");
+        } else {
+            prepare();
+            List<RowData> mayMatchRowData = dataMap.get(hash(row));
+            if (mayMatchRowData == null) {
+                return false;
+            }
+            for (RowData mayMatch : mayMatchRowData) {
+                if (matchRow(row, mayMatch)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    private boolean matchRow(RowData row, RowData mayMatch) {
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            if (!Objects.equals(
+                    fieldGetters[i].getFieldOrNull(row),
+                    fieldGetters[i].getFieldOrNull(mayMatch))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void prepare() {
+        if (!prepared) {
+            synchronized (this) {
+                if (!prepared) {
+                    doPrepare();
+                    prepared = true;
+                }
+            }
+        }
+    }
+
+    private void doPrepare() {
+        this.dataMap = new HashMap<>();
+        if (isFiltering) {
+            this.fieldGetters =
+                    IntStream.range(0, rowType.getFieldCount())
+                            .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
+                            .toArray(RowData.FieldGetter[]::new);
+
+            TypeSerializer<RowData> serializer = typeInfo.createSerializer(new ExecutionConfig());
+            for (byte[] bytes : serializedData) {
+                try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+                        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {
+                    RowData partition = serializer.deserialize(inView);
+                    List<RowData> partitions =
+                            dataMap.computeIfAbsent(hash(partition), k -> new ArrayList<>());
+                    partitions.add(partition);
+                } catch (Exception e) {
+                    throw new TableException("Unable to deserialize the value.", e);
+                }
+            }
+        }
+    }
+
+    private int hash(RowData row) {
+        return Objects.hash(Arrays.stream(fieldGetters).map(g -> g.getFieldOrNull(row)).toArray());
+    }
+
+    public static boolean isEqual(DynamicFilteringData data, DynamicFilteringData another) {
+        if (data == null) {
+            return another == null;
+        }
+        if (another == null
+                || (data.isFiltering != another.isFiltering)
+                || !data.typeInfo.equals(another.typeInfo)
+                || !data.rowType.equals(another.rowType)
+                || data.serializedData.size() != another.serializedData.size()) {
+            return false;
+        }
+
+        BytePrimitiveArrayComparator comparator = new BytePrimitiveArrayComparator(true);
+        for (int i = 0; i < data.serializedData.size(); i++) {
+            if (comparator.compare(data.serializedData.get(i), another.serializedData.get(i))
+                    != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @VisibleForTesting
+    public Collection<RowData> getData() {
+        prepare();
+        return dataMap.values().stream().flatMap(List::stream).collect(Collectors.toList());
+    }
+
+    @Override
+    public String toString() {
+        return "DynamicFilteringData{"
+                + "isFiltering="
+                + isFiltering
+                + ", data size="
+                + serializedData.size()
+                + '}';
+    }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringEvent.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringEvent.java
new file mode 100644
index 00000000000..dcc826e661d
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A source event to transport the {@link DynamicFilteringData} for dynamic filtering purpose. It
+ * is sent by the DynamicFilteringDataCollectorOperator to the enumerator of a source supporting
+ * dynamic filtering, via DynamicFilteringDataCollectorOperatorCoordinator and SourceCoordinator.
+ */
+public class DynamicFilteringEvent implements SourceEvent {
+    private final DynamicFilteringData data;
+
+    public DynamicFilteringEvent(DynamicFilteringData data) {
+        this.data = checkNotNull(data);
+    }
+
+    public DynamicFilteringData getData() {
+        return data;
+    }
+
+    @Override
+    public String toString() {
+        return "DynamicFilteringEvent{" + "data=" + data + '}';
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java
new file mode 100644
index 00000000000..1d42dd08f7f
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Operator to collect and build the {@link DynamicFilteringData} for sources that support dynamic
+ * filtering.
+ */
+public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
+        implements OneInputStreamOperator<RowData, Object> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);
+
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient TypeInformation<RowData> typeInfo;
+    private transient TypeSerializer<RowData> serializer;
+
+    private transient Set<byte[]> buffer;
+    private transient long currentSize;
+    private transient FieldGetter[] fieldGetters;
+
+    public DynamicFilteringDataCollectorOperator(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold,
+            OperatorEventGateway operatorEventGateway) {
+        this.dynamicFilteringFieldType = checkNotNull(dynamicFilteringFieldType);
+        this.dynamicFilteringFieldIndices = checkNotNull(dynamicFilteringFieldIndices);
+        this.threshold = threshold;
+        this.operatorEventGateway = checkNotNull(operatorEventGateway);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
+        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
+        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
+        this.currentSize = 0L;
+        this.fieldGetters =
+                IntStream.range(0, dynamicFilteringFieldIndices.size())
+                        .mapToObj(
+                                i ->
+                                        RowData.createFieldGetter(
+                                                dynamicFilteringFieldType.getTypeAt(i),
+                                                dynamicFilteringFieldIndices.get(i)))
+                        .toArray(FieldGetter[]::new);
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (exceedThreshold()) {
+            return;
+        }
+
+        RowData value = element.getValue();
+        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
+        for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) {
+            rowData.setField(i, fieldGetters[i].getFieldOrNull(value));
+        }
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
+            serializer.serialize(rowData, wrapper);
+            boolean duplicated = !buffer.add(baos.toByteArray());
+            if (duplicated) {
+                return;
+            }
+
+            currentSize += baos.size();
+        }
+
+        if (exceedThreshold()) {
+            // Clear the filtering data and disable self by leaving the currentSize unchanged
+            buffer.clear();
+            LOG.warn(
+                    "Collected data size exceeds the threshold, {} > {}, dynamic filtering is disabled.",
+                    currentSize,
+                    threshold);
+        }
+    }
+
+    private boolean exceedThreshold() {
+        return threshold > 0 && currentSize > threshold;
+    }
+
+    @Override
+    public void finish() throws Exception {
+        if (exceedThreshold()) {
+            LOG.info(
+                    "Finish collecting. {} bytes are collected which exceeds the threshold {}. Sending empty data.",
+                    currentSize,
+                    threshold);
+        } else {
+            LOG.info(
+                    "Finish collecting. {} bytes in {} rows are collected. Sending the data.",
+                    currentSize,
+                    buffer.size());
+        }
+        sendEvent();
+    }
+
+    private void sendEvent() {
+        final DynamicFilteringData dynamicFilteringData;
+        if (exceedThreshold()) {
+            dynamicFilteringData =
+                    new DynamicFilteringData(
+                            typeInfo, dynamicFilteringFieldType, Collections.emptyList(), false);
+        } else {
+            dynamicFilteringData =
+                    new DynamicFilteringData(
+                            typeInfo, dynamicFilteringFieldType, new ArrayList<>(buffer), true);
+        }
+
+        DynamicFilteringEvent event = new DynamicFilteringEvent(dynamicFilteringData);
+        operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (buffer != null) {
+            buffer.clear();
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java
new file mode 100644
index 00000000000..50b10768765
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator
+ * collects {@link DynamicFilteringEvent} then redistributes to listening source coordinators.
+ */
+public class DynamicFilteringDataCollectorOperatorCoordinator
+        implements OperatorCoordinator, CoordinationRequestHandler {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class);
+
+    private final CoordinatorStore coordinatorStore;
+    private final List<String> dynamicFilteringDataListenerIDs;
+
+    private DynamicFilteringData receivedFilteringData;
+
+    public DynamicFilteringDataCollectorOperatorCoordinator(
+            Context context, List<String> dynamicFilteringDataListenerIDs) {
+        this.coordinatorStore = checkNotNull(context.getCoordinatorStore());
+        this.dynamicFilteringDataListenerIDs = checkNotNull(dynamicFilteringDataListenerIDs);
+    }
+
+    @Override
+    public void start() throws Exception {}
+
+    @Override
+    public void close() throws Exception {}
+
+    @Override
+    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
+            throws Exception {
+        DynamicFilteringData currentData =
+                ((DynamicFilteringEvent) ((SourceEventWrapper) event).getSourceEvent()).getData();
+        if (receivedFilteringData == null) {
+            receivedFilteringData = currentData;
+        } else {
+            // Since there might be speculative execution or failover, we may receive multiple
+            // notifications, and we can't tell for sure which one is valid for further processing.
+            if (DynamicFilteringData.isEqual(receivedFilteringData, currentData)) {
+                // If the notifications contain exactly the same data, everything is alright, and
+                // we don't need to send the event again.
+                return;
+            } else {
+                // A mismatch between the source filtering result and the dim data may lead to
+                // incorrect results, so trigger a global failover for full recomputation.
+                throw new IllegalStateException(
+                        "DynamicFilteringData is recomputed but not equal. "
+                                + "Triggering global failover in case the result is incorrect. "
+                                + " It's recommended to re-run the job with dynamic filtering disabled.");
+            }
+        }
+
+        for (String listenerID : dynamicFilteringDataListenerIDs) {
+            // Push event to listening source coordinators.
+            OperatorCoordinator listener = (OperatorCoordinator) coordinatorStore.get(listenerID);
+            if (listener == null) {
+                throw new IllegalStateException(
+                        "Dynamic filtering data listener is missing: " + listenerID);
+            } else {
+                LOG.info(
+                        "Distributing event {} to source coordinator with ID {}",
+                        event,
+                        listenerID);
+                // Subtask index and attempt number are not necessary for handling
+                // DynamicFilteringEvent.
+                listener.handleEventFromOperator(0, 0, event);
+            }
+        }
+    }
+
+    @Override
+    public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
+            CoordinationRequest request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void subtaskReset(int subtask, long checkpointId) {}
+
+    @Override
+    public void executionAttemptFailed(
+            int subtask, int attemptNumber, @Nullable Throwable reason) {}
+
+    @Override
+    public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {}
+
+    @Override
+    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result)
+            throws Exception {}
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {}
+
+    @Override
+    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData)
+            throws Exception {}
+
+    /** Provider for {@link DynamicFilteringDataCollectorOperatorCoordinator}. */
+    public static class Provider extends RecreateOnResetOperatorCoordinator.Provider {
+        private final List<String> dynamicFilteringDataListenerIDs;
+
+        public Provider(OperatorID operatorID, List<String> dynamicFilteringDataListenerIDs) {
+            super(operatorID);
+            this.dynamicFilteringDataListenerIDs = checkNotNull(dynamicFilteringDataListenerIDs);
+        }
+
+        @Override
+        protected OperatorCoordinator getCoordinator(Context context) throws Exception {
+            return new DynamicFilteringDataCollectorOperatorCoordinator(
+                    context, dynamicFilteringDataListenerIDs);
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorFactory.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorFactory.java
new file mode 100644
index 00000000000..e9ad35e9c9b
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The factory class for {@link DynamicFilteringDataCollectorOperator}. */
+public class DynamicFilteringDataCollectorOperatorFactory
+        extends AbstractStreamOperatorFactory<Object>
+        implements CoordinatedOperatorFactory<Object> {
+
+    private final Set<String> dynamicFilteringDataListenerIDs = new HashSet<>();
+    private final RowType dynamicFilteringFieldType;
+    private final List<Integer> dynamicFilteringFieldIndices;
+    private final long threshold;
+
+    public DynamicFilteringDataCollectorOperatorFactory(
+            RowType dynamicFilteringFieldType,
+            List<Integer> dynamicFilteringFieldIndices,
+            long threshold) {
+        this.dynamicFilteringFieldType = checkNotNull(dynamicFilteringFieldType);
+        this.dynamicFilteringFieldIndices = checkNotNull(dynamicFilteringFieldIndices);
+        this.threshold = threshold;
+    }
+
+    @Override
+    public <T extends StreamOperator<Object>> T createStreamOperator(
+            StreamOperatorParameters<Object> parameters) {
+        final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
+        final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher();
+        OperatorEventGateway operatorEventGateway =
+                eventDispatcher.getOperatorEventGateway(operatorId);
+
+        DynamicFilteringDataCollectorOperator operator =
+                new DynamicFilteringDataCollectorOperator(
+                        dynamicFilteringFieldType,
+                        dynamicFilteringFieldIndices,
+                        threshold,
+                        operatorEventGateway);
+
+        operator.setup(
+                parameters.getContainingTask(),
+                parameters.getStreamConfig(),
+                parameters.getOutput());
+
+        // today's lunch is generics spaghetti
+        @SuppressWarnings("unchecked")
+        final T castedOperator = (T) operator;
+
+        return castedOperator;
+    }
+
+    public void registerDynamicFilteringDataListenerID(String id) {
+        this.dynamicFilteringDataListenerIDs.add(checkNotNull(id));
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+        return DynamicFilteringDataCollectorOperator.class;
+    }
+
+    @Override
+    public OperatorCoordinator.Provider getCoordinatorProvider(
+            String operatorName, OperatorID operatorID) {
+        return new DynamicFilteringDataCollectorOperatorCoordinator.Provider(
+                operatorID, new ArrayList<>(dynamicFilteringDataListenerIDs));
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinatorTest.java
new file mode 100644
index 00000000000..86b5638bac9
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinatorTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link DynamicFilteringDataCollectorOperatorCoordinator}. */
+class DynamicFilteringDataCollectorOperatorCoordinatorTest {
+
+    /**
+     * An event received from the operator is handed to every registered listener exactly once,
+     * and as the very same object.
+     */
+    @Test
+    void testRedistributeData() throws Exception {
+        final String firstId = "test-listener-1";
+        final String secondId = "test-listener-2";
+        final MockOperatorCoordinatorContext context = newContext();
+        final TestingOperatorCoordinator firstListener = registerListener(context, firstId);
+        final TestingOperatorCoordinator secondListener = registerListener(context, secondId);
+
+        final OperatorEvent sentEvent =
+                dynamicFilteringEvent(RowType.of(new IntType()), Collections.emptyList());
+        try (DynamicFilteringDataCollectorOperatorCoordinator coordinator =
+                new DynamicFilteringDataCollectorOperatorCoordinator(
+                        context, Arrays.asList(firstId, secondId))) {
+            coordinator.handleEventFromOperator(0, 1, sentEvent);
+        }
+
+        for (TestingOperatorCoordinator listener : Arrays.asList(firstListener, secondListener)) {
+            assertThat(listener.getNextReceivedOperatorEvent()).isSameAs(sentEvent);
+            assertThat(listener.getNextReceivedOperatorEvent()).isNull();
+        }
+    }
+
+    /**
+     * After an execution attempt fails, a new attempt that reports the same data does not cause
+     * another event to be forwarded, whereas a new attempt that reports different data is
+     * rejected with an {@link IllegalStateException}.
+     */
+    @Test
+    void testTaskFailover() throws Exception {
+        final String listenerId = "test-listener-1";
+        final MockOperatorCoordinatorContext context = newContext();
+        final TestingOperatorCoordinator listener = registerListener(context, listenerId);
+        final RowType rowType = RowType.of(new IntType());
+
+        try (DynamicFilteringDataCollectorOperatorCoordinator coordinator =
+                new DynamicFilteringDataCollectorOperatorCoordinator(
+                        context, Collections.singletonList(listenerId))) {
+            // First attempt: the event is forwarded to the listener as is.
+            final OperatorEvent initialEvent =
+                    dynamicFilteringEvent(rowType, Collections.singletonList(new byte[] {1, 2}));
+            coordinator.handleEventFromOperator(0, 0, initialEvent);
+            assertThat(listener.getNextReceivedOperatorEvent()).isSameAs(initialEvent);
+
+            // A failover happens.
+            coordinator.executionAttemptFailed(0, 0, null);
+
+            // The second attempt reports the same data as the first one, so nothing is sent to
+            // the listener this time.
+            final OperatorEvent sameDataEvent =
+                    dynamicFilteringEvent(rowType, Collections.singletonList(new byte[] {1, 2}));
+            coordinator.handleEventFromOperator(0, 1, sameDataEvent);
+            assertThat(listener.getNextReceivedOperatorEvent()).isNull();
+
+            // A failover happens again.
+            coordinator.executionAttemptFailed(0, 1, null);
+
+            // The third attempt reports different data, which the coordinator refuses.
+            final OperatorEvent differentDataEvent =
+                    dynamicFilteringEvent(rowType, Collections.singletonList(new byte[] {1, 3}));
+            assertThatThrownBy(
+                            () -> coordinator.handleEventFromOperator(0, 2, differentDataEvent))
+                    .isInstanceOf(IllegalStateException.class);
+        }
+    }
+
+    /** Creates a coordinator context for a fresh operator ID with parallelism 1. */
+    private static MockOperatorCoordinatorContext newContext() {
+        return new MockOperatorCoordinatorContext(new OperatorID(), 1);
+    }
+
+    /** Creates a listener coordinator and puts it into the context's store under the given ID. */
+    private static TestingOperatorCoordinator registerListener(
+            MockOperatorCoordinatorContext context, String listenerId) {
+        final TestingOperatorCoordinator listener = new TestingOperatorCoordinator(context);
+        context.getCoordinatorStore().putIfAbsent(listenerId, listener);
+        return listener;
+    }
+
+    /**
+     * Wraps the given serialized records into a {@link DynamicFilteringEvent} carried by a {@link
+     * SourceEventWrapper}. The last constructor argument of the data is {@code data.isEmpty()}.
+     */
+    private OperatorEvent dynamicFilteringEvent(RowType rowType, List<byte[]> data) {
+        final DynamicFilteringData filteringData =
+                new DynamicFilteringData(
+                        InternalTypeInfo.of(rowType), rowType, data, data.isEmpty());
+        return new SourceEventWrapper(new DynamicFilteringEvent(filteringData));
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorTest.java
new file mode 100644
index 00000000000..0fc6a9b8b97
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DynamicFilteringDataCollectorOperator}. */
+class DynamicFilteringDataCollectorOperatorTest {
+
+    @Test
+    void testCollectDynamicFilteringData() throws Exception {
+        RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+        List<Integer> indexes = Arrays.asList(0, 1, 3);
+        MockOperatorEventGateway gateway = new MockOperatorEventGateway();
+
+        DynamicFilteringDataCollectorOperator operator =
+                new DynamicFilteringDataCollectorOperator(rowType, indexes, -1, gateway);
+        ConcurrentLinkedQueue<Object> output;
+        try (OneInputStreamOperatorTestHarness<RowData, Object> harness =
+                new OneInputStreamOperatorTestHarness<>(operator)) {
+            output = harness.getOutput();
+            harness.setup();
+            harness.open();
+
+            for (long i = 0L; i < 3L; i++) {
+                harness.processElement(rowData(1, 1L, 0, "a"), i);
+            }
+            harness.processElement(rowData(2, 1L, 0, null), 3L);
+
+            // operator.finish is called when closing
+        }
+
+        assertThat(output).isEmpty();
+
+        assertThat(gateway.getEventsSent()).hasSize(1);
+        OperatorEvent event = gateway.getEventsSent().get(0);
+        assertThat(event).isInstanceOf(SourceEventWrapper.class);
+        SourceEvent dynamicFilteringEvent = ((SourceEventWrapper) event).getSourceEvent();
+        assertThat(dynamicFilteringEvent).isInstanceOf(DynamicFilteringEvent.class);
+
+        DynamicFilteringData data = ((DynamicFilteringEvent) dynamicFilteringEvent).getData();
+        assertThat(data.isFiltering()).isTrue();
+        assertThat(data.getData()).hasSize(2);
+        assertThat(data.contains(rowData(1, 1L, "a"))).isTrue();
+        assertThat(data.contains(rowData(2, 1L, null))).isTrue();
+    }
+
+    @Test
+    void testExceedsThreshold() throws Exception {
+        RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+        List<Integer> indexes = Arrays.asList(0, 1, 3);
+        MockOperatorEventGateway gateway = new MockOperatorEventGateway();
+        // Can hold at most 2 rows
+        int thresholds = 100;
+
+        DynamicFilteringDataCollectorOperator operator =
+                new DynamicFilteringDataCollectorOperator(rowType, indexes, thresholds, gateway);
+        try (OneInputStreamOperatorTestHarness<RowData, Object> harness =
+                new OneInputStreamOperatorTestHarness<>(operator)) {
+            harness.setup();
+            harness.open();
+
+            harness.processElement(rowData(1, 1L, 0, "a"), 1L);
+            harness.processElement(rowData(2, 1L, 0, "b"), 2L);
+            harness.processElement(rowData(3, 1L, 0, "c"), 3L);
+        }
+
+        assertThat(gateway.getEventsSent()).hasSize(1);
+        OperatorEvent event = gateway.getEventsSent().get(0);
+        assertThat(event).isInstanceOf(SourceEventWrapper.class);
+        SourceEvent dynamicFilteringEvent = ((SourceEventWrapper) event).getSourceEvent();
+        assertThat(dynamicFilteringEvent).isInstanceOf(DynamicFilteringEvent.class);
+        DynamicFilteringData data = ((DynamicFilteringEvent) dynamicFilteringEvent).getData();
+        assertThat(data.isFiltering()).isFalse();
+    }
+
+    private RowData rowData(Object... values) {
+        GenericRowData rowData = new GenericRowData(values.length);
+        for (int i = 0; i < values.length; ++i) {
+            Object value = values[i];
+            value = value instanceof String ? new BinaryStringData((String) value) : value;
+            rowData.setField(i, value);
+        }
+        return rowData;
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataTest.java
new file mode 100644
index 00000000000..7ecb43b89fb
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.dynamicfiltering;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link DynamicFilteringData}. They live in this module because testing
+ * DynamicFilteringData is hard without the support of flink-table-runtime.
+ */
+class DynamicFilteringDataTest {
+
+    /** Every build row is reported as contained; rows that differ in any field are not. */
+    @Test
+    void testContains() {
+        final List<RowData> buildRows = new ArrayList<>();
+        buildRows.add(rowData(1, 1L, "a"));
+        buildRows.add(rowData(2, 1L, null));
+        buildRows.add(rowData(1, null, "b"));
+        buildRows.add(rowData(null, 2L, "c"));
+        buildRows.add(rowData(0, 31L, "d"));
+
+        final DynamicFilteringData data = filteringDataOf(buildRows);
+
+        for (RowData buildRow : buildRows) {
+            assertThat(data.contains(buildRow)).isTrue();
+        }
+        assertThat(data.contains(rowData(0, 1L, "a"))).isFalse();
+        assertThat(data.contains(rowData(1, 1L, null))).isFalse();
+        // Has the same hash as (0, 31L, "d")
+        assertThat(data.contains(rowData(1, 0L, "d"))).isFalse();
+    }
+
+    /** Data created with the filtering flag off reports a row as contained although it is empty. */
+    @Test
+    void testNotFiltering() {
+        final DynamicFilteringData data = emptyData(false);
+        assertThat(data.contains(rowData(1, 1L, "a"))).isTrue();
+    }
+
+    /** Two build rows sharing a hash are both reported as contained. */
+    @Test
+    void testAddHashConflictingData() {
+        final List<RowData> buildRows = new ArrayList<>();
+        buildRows.add(rowData(0, 31L, "d"));
+        // Has the same hash as (0, 31L, "d")
+        buildRows.add(rowData(1, 0L, "d"));
+
+        final DynamicFilteringData data = filteringDataOf(buildRows);
+
+        for (RowData buildRow : buildRows) {
+            assertThat(data.contains(buildRow)).isTrue();
+        }
+    }
+
+    /** Probing with a row of two fields against a three-field row type fails. */
+    @Test
+    void testMismatchingRowDataArity() {
+        final DynamicFilteringData data = emptyData(true);
+        assertThatThrownBy(() -> data.contains(rowData(1, 1L)))
+                .isInstanceOf(TableException.class)
+                .hasMessage("The arity of RowData is different");
+    }
+
+    /** The (INT, BIGINT, VARCHAR) row type used by all tests in this class. */
+    private static RowType testRowType() {
+        return RowType.of(new IntType(), new BigIntType(), new VarCharType());
+    }
+
+    /** Creates data without any records and with the given filtering flag. */
+    private static DynamicFilteringData emptyData(boolean isFiltering) {
+        final RowType rowType = testRowType();
+        return new DynamicFilteringData(
+                InternalTypeInfo.of(rowType), rowType, Collections.emptyList(), isFiltering);
+    }
+
+    /** Serializes the given rows and creates filtering data from them. */
+    private DynamicFilteringData filteringDataOf(List<RowData> rows) {
+        final RowType rowType = testRowType();
+        final TypeInformation<RowData> rowTypeInfo = InternalTypeInfo.of(rowType);
+        final List<byte[]> serializedRows =
+                rows.stream()
+                        .map(row -> serialize(rowTypeInfo, row))
+                        .collect(Collectors.toList());
+        return new DynamicFilteringData(rowTypeInfo, rowType, serializedRows, true);
+    }
+
+    /** Builds a row from the given values, converting each {@link String} to string data. */
+    private RowData rowData(Object... values) {
+        final GenericRowData row = new GenericRowData(values.length);
+        int position = 0;
+        for (Object value : values) {
+            if (value instanceof String) {
+                row.setField(position, new BinaryStringData((String) value));
+            } else {
+                row.setField(position, value);
+            }
+            position++;
+        }
+        return row;
+    }
+
+    /** Serializes a row with the serializer created from the given type information. */
+    private byte[] serialize(TypeInformation<RowData> typeInfo, RowData row) {
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        final DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(buffer);
+        try {
+            typeInfo.createSerializer(new ExecutionConfig()).serialize(row, target);
+        } catch (IOException e) {
+            // Rethrown as a RuntimeException so that this method can be used in a lambda.
+            throw new RuntimeException(e);
+        }
+        return buffer.toByteArray();
+    }
+}