Posted to commits@flink.apache.org by go...@apache.org on 2022/08/07 13:52:11 UTC
[flink] 02/02: [FLINK-28710][table-planner] Supports dynamic filtering execution
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8011f139dfa86ad9abf764994a572295337fdc5b
Author: godfreyhe <go...@163.com>
AuthorDate: Fri Aug 5 11:53:02 2022 +0800
[FLINK-28710][table-planner] Supports dynamic filtering execution
This closes #20472
---
.../table/planner/plan/nodes/exec/ExecEdge.java | 8 +
.../planner/plan/nodes/exec/ExecNodeBase.java | 4 +
.../BatchExecDynamicFilteringDataCollector.java | 107 ++++
.../nodes/exec/batch/BatchExecTableSourceScan.java | 82 ++-
.../exec/common/CommonExecTableSourceScan.java | 6 +-
.../DynamicFilteringDependencyProcessor.java | 87 +++
.../processor/ResetTransformationProcessor.java | 46 ++
.../utils/InputPriorityConflictResolver.java | 24 +-
.../exec/stream/StreamExecTableSourceScan.java | 11 +-
.../table/planner/delegation/BatchPlanner.scala | 11 +-
.../table/planner/delegation/PlannerBase.scala | 10 +
.../table/planner/delegation/StreamPlanner.scala | 6 +-
...atchPhysicalDynamicFilteringDataCollector.scala | 13 +-
...chPhysicalDynamicFilteringTableSourceScan.scala | 3 +-
.../source/DynamicFilteringValuesSource.java | 129 ++++
.../source/DynamicFilteringValuesSourceReader.java | 185 ++++++
.../DynamicFilteringValuesSourceEnumerator.java | 149 +++++
.../source/split/ValuesSourcePartitionSplit.java | 48 ++
.../ValuesSourcePartitionSplitSerializer.java | 76 +++
.../planner/factories/TestValuesTableFactory.java | 101 +--
.../plan/batch/sql/DynamicFilteringTest.java | 162 +++++
.../utils/InputPriorityConflictResolverTest.java | 54 ++
.../runtime/batch/sql/DynamicFilteringITCase.java | 268 ++++++++
.../plan/batch/sql/DynamicFilteringTest.xml | 713 +++++++++++++++++++++
.../nodes/exec/operator/BatchOperatorNameTest.xml | 232 +++----
.../table/planner/runtime/utils/TestData.scala | 16 +
26 files changed, 2377 insertions(+), 174 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
index ca2559e4559..4a6ee683495 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
@@ -113,6 +113,14 @@ public class ExecEdge {
private Shuffle shuffle = FORWARD_SHUFFLE;
private StreamExchangeMode exchangeMode = StreamExchangeMode.PIPELINED;
+ public Builder from(ExecEdge original) {
+ this.source = original.source;
+ this.target = original.target;
+ this.shuffle = original.shuffle;
+ this.exchangeMode = original.exchangeMode;
+ return this;
+ }
+
public Builder source(ExecNode<?> source) {
this.source = source;
return this;
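The new from(ExecEdge) method lets later rewrites rebuild an edge while overriding a single field. A minimal usage sketch (assuming ExecEdge.builder() is the entry point for this Builder):

    ExecEdge rewired =
            ExecEdge.builder()
                    .from(oldEdge)         // copies source, target, shuffle, exchange mode
                    .source(newSourceNode) // then overrides only the source
                    .build();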
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
index 06b88507b3f..843abc2d992 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
@@ -245,4 +245,8 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> {
}
return detailName;
}
+
+ public void resetTransformation() {
+ this.transformation = null;
+ }
}
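resetTransformation() exists because ExecNodeBase caches the result of its first translation. A paraphrased sketch of that caching (simplified, not the literal Flink code), which the new method undoes so a node can be translated again after the graph has been rewritten:

    public final Transformation<T> translateToPlan(Planner planner) {
        if (transformation == null) {
            transformation = translateToPlanInternal(planner, config); // computed once, then cached
        }
        return transformation; // stale after rewrites, hence resetTransformation()
    }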
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java
new file mode 100644
index 00000000000..f12423074ee
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.operators.dynamicfiltering.DynamicFilteringDataCollectorOperatorFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Batch {@link ExecNode} that collects inputs and builds {@link
+ * org.apache.flink.table.connector.source.DynamicFilteringData}, and then sends the {@link
+ * org.apache.flink.table.connector.source.DynamicFilteringEvent} to the source coordinator.
+ */
+public class BatchExecDynamicFilteringDataCollector extends ExecNodeBase<Object>
+ implements BatchExecNode<Object> {
+
+ @Experimental
+ private static final ConfigOption<MemorySize> TABLE_EXEC_DYNAMIC_FILTERING_THRESHOLD =
+ key("table.exec.dynamic-filtering.threshold")
+ .memoryType()
+ .defaultValue(MemorySize.parse("8 mb"))
+ .withDescription(
+ "If the collector collects more data than the threshold (default is 8M), "
+ + "an empty DynamicFilterEvent with a flag only will be sent to Coordinator, "
+ + "which could avoid exceeding the akka limit and out-of-memory (see "
+ + AkkaOptions.FRAMESIZE.key()
+ + "). Otherwise a DynamicFilterEvent with all deduplicated records will be sent to Coordinator.");
+
+ private final List<Integer> dynamicFilteringFieldIndices;
+
+ public BatchExecDynamicFilteringDataCollector(
+ List<Integer> dynamicFilteringFieldIndices,
+ ReadableConfig tableConfig,
+ InputProperty inputProperty,
+ RowType outputType,
+ String description) {
+ super(
+ ExecNodeContext.newNodeId(),
+ ExecNodeContext.newContext(BatchExecTableSourceScan.class),
+ ExecNodeContext.newPersistedConfig(BatchExecTableSourceScan.class, tableConfig),
+ Collections.singletonList(inputProperty),
+ outputType,
+ description);
+ this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices;
+ checkArgument(outputType.getFieldCount() == dynamicFilteringFieldIndices.size());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Transformation<Object> translateToPlanInternal(
+ PlannerBase planner, ExecNodeConfig config) {
+ final ExecEdge inputEdge = getInputEdges().get(0);
+ final Transformation<RowData> inputTransform =
+ (Transformation<RowData>) inputEdge.translateToPlan(planner);
+ StreamOperatorFactory<Object> factory =
+ new DynamicFilteringDataCollectorOperatorFactory(
+ (RowType) getOutputType(),
+ dynamicFilteringFieldIndices,
+ config.get(TABLE_EXEC_DYNAMIC_FILTERING_THRESHOLD).getBytes());
+
+ return ExecNodeUtil.createOneInputTransformation(
+ inputTransform,
+ createTransformationName(config),
+ createTransformationDescription(config),
+ factory,
+ InternalTypeInfo.of(getOutputType()),
+ 1); // parallelism should always be 1
+ }
+}
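Since the threshold option is @Experimental and declared locally rather than in a public options class, it can only be set by its string key. A hedged sketch (key and default taken from this patch):

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    // raise the collector's limit from the default "8 mb" to "16 mb"
    tEnv.getConfig()
            .getConfiguration()
            .setString("table.exec.dynamic-filtering.threshold", "16 mb");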
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
index 95e98273fb9..f0fd611cb04 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
@@ -23,17 +23,28 @@ import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
+import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.operators.dynamicfiltering.DynamicFilteringDataCollectorOperatorFactory;
+import org.apache.flink.table.runtime.operators.dynamicfiltering.ExecutionOrderEnforcerOperatorFactory;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.UUID;
/**
* Batch {@link ExecNode} to read data from an external source defined by a bounded {@link
@@ -42,9 +53,17 @@ import org.apache.flink.table.types.logical.RowType;
public class BatchExecTableSourceScan extends CommonExecTableSourceScan
implements BatchExecNode<RowData> {
+ // Avoids creating different ids if translated multiple times
+ private final String dynamicFilteringDataListenerID = UUID.randomUUID().toString();
+
+ private boolean needDynamicFilteringDependency;
+
+ // This constructor can only be used when the table source scan has a
+ // BatchExecDynamicFilteringDataCollector input
public BatchExecTableSourceScan(
ReadableConfig tableConfig,
DynamicTableSourceSpec tableSourceSpec,
+ InputProperty inputProperty,
RowType outputType,
String description) {
super(
@@ -52,10 +71,30 @@ public class BatchExecTableSourceScan extends CommonExecTableSourceScan
ExecNodeContext.newContext(BatchExecTableSourceScan.class),
ExecNodeContext.newPersistedConfig(BatchExecTableSourceScan.class, tableConfig),
tableSourceSpec,
+ Collections.singletonList(inputProperty),
outputType,
description);
}
+ public BatchExecTableSourceScan(
+ ReadableConfig tableConfig,
+ DynamicTableSourceSpec tableSourceSpec,
+ RowType outputType,
+ String description) {
+ super(
+ ExecNodeContext.newNodeId(),
+ ExecNodeContext.newContext(BatchExecTableSourceScan.class),
+ ExecNodeContext.newPersistedConfig(BatchExecTableSourceScan.class, tableConfig),
+ tableSourceSpec,
+ Collections.emptyList(),
+ outputType,
+ description);
+ }
+
+ public void setNeedDynamicFilteringDependency(boolean needDynamicFilteringDependency) {
+ this.needDynamicFilteringDependency = needDynamicFilteringDependency;
+ }
+
@Override
protected Transformation<RowData> translateToPlanInternal(
PlannerBase planner, ExecNodeConfig config) {
@@ -64,7 +103,48 @@ public class BatchExecTableSourceScan extends CommonExecTableSourceScan
// the boundedness has been checked via the runtime provider already, so we can safely
// declare all legacy transformations as bounded to make the stream graph generator happy
ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation);
- return transformation;
+
+ // no dynamic filtering applied
+ if (getInputEdges().isEmpty() || !(transformation instanceof SourceTransformation)) {
+ return transformation;
+ }
+
+ // handle dynamic filtering
+ Preconditions.checkState(getInputEdges().size() == 1);
+ BatchExecNode<?> input = (BatchExecNode<?>) getInputEdges().get(0).getSource();
+ if (!(input instanceof BatchExecDynamicFilteringDataCollector)) {
+ throw new TableException(
+ "The source input must be BatchExecDynamicFilteringDataCollector for now");
+ }
+ BatchExecDynamicFilteringDataCollector dynamicFilteringDataCollector =
+ (BatchExecDynamicFilteringDataCollector) input;
+
+ ((SourceTransformation<?, ?, ?>) transformation)
+ .setCoordinatorListeningID(dynamicFilteringDataListenerID);
+
+ // Must use translateToPlan to avoid duplicating the dynamic filtering transformations.
+ Transformation<Object> dynamicFilteringTransform =
+ dynamicFilteringDataCollector.translateToPlan(planner);
+ ((DynamicFilteringDataCollectorOperatorFactory)
+ ((OneInputTransformation<?, ?>) dynamicFilteringTransform)
+ .getOperatorFactory())
+ .registerDynamicFilteringDataListenerID(dynamicFilteringDataListenerID);
+
+ if (!needDynamicFilteringDependency) {
+ planner.addExtraTransformation(dynamicFilteringTransform);
+ return transformation;
+ } else {
+ MultipleInputTransformation<RowData> multipleInputTransformation =
+ new MultipleInputTransformation<>(
+ "Order-Enforcer",
+ new ExecutionOrderEnforcerOperatorFactory<>(),
+ transformation.getOutputType(),
+ transformation.getParallelism());
+ multipleInputTransformation.addInput(dynamicFilteringTransform);
+ multipleInputTransformation.addInput(transformation);
+
+ return multipleInputTransformation;
+ }
}
@Override
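The listener id is the glue between the two ends of dynamic filtering: the scan's SourceTransformation and the collector's operator factory register the same per-node UUID, so the runtime can route the DynamicFilteringEvent from the collector operator to the scan's source coordinator. Restated as an isolated sketch (names from this patch; not runnable on its own, it needs the planner internals above):

    String listenerId = dynamicFilteringDataListenerID; // stable UUID per scan node
    // receiving side: the source coordinator listens under this id
    ((SourceTransformation<?, ?, ?>) scanTransform).setCoordinatorListeningID(listenerId);
    // sending side: the collector operator emits its DynamicFilteringEvent to the same id
    collectorFactory.registerDynamicFilteringDataListenerID(listenerId);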
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
index 0576958b8a2..97d5fd86455 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
@@ -44,6 +44,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
@@ -56,7 +57,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import java.util.Collections;
+import java.util.List;
import java.util.Optional;
/**
@@ -77,9 +78,10 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData>
ExecNodeContext context,
ReadableConfig persistedConfig,
DynamicTableSourceSpec tableSourceSpec,
+ List<InputProperty> inputProperties,
LogicalType outputType,
String description) {
- super(id, context, persistedConfig, Collections.emptyList(), outputType, description);
+ super(id, context, persistedConfig, inputProperties, outputType, description);
this.tableSourceSpec = tableSourceSpec;
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java
new file mode 100644
index 00000000000..dbbb48ea0e4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.processor;
+
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecMultipleInput;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This processor further checks each dynamic filtering source to see whether it can be chained
+ * with a multiple-input operator. If it cannot, the source scan is marked as needing an explicit
+ * dependency on its dynamic filtering data collector.
+ *
+ * <p>NOTE: This processor can only be applied on a {@link BatchExecNode} DAG.
+ */
+public class DynamicFilteringDependencyProcessor implements ExecNodeGraphProcessor {
+
+ @Override
+ public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
+ Map<BatchExecTableSourceScan, List<ExecNode<?>>> dynamicFilteringScanDescendants =
+ new HashMap<>();
+
+ AbstractExecNodeExactlyOnceVisitor dynamicFilteringScanCollector =
+ new AbstractExecNodeExactlyOnceVisitor() {
+ @Override
+ protected void visitNode(ExecNode<?> node) {
+ node.getInputEdges().stream()
+ .map(ExecEdge::getSource)
+ .forEach(
+ input -> {
+ // A dynamic filtering scan is characterized
+ // by having an input edge.
+ if (input instanceof BatchExecTableSourceScan
+ && input.getInputEdges().size() > 0) {
+ dynamicFilteringScanDescendants
+ .computeIfAbsent(
+ (BatchExecTableSourceScan) input,
+ ignored -> new ArrayList<>())
+ .add(node);
+ }
+ });
+
+ visitInputs(node);
+ }
+ };
+ execGraph.getRootNodes().forEach(node -> node.accept(dynamicFilteringScanCollector));
+
+ for (Map.Entry<BatchExecTableSourceScan, List<ExecNode<?>>> entry :
+ dynamicFilteringScanDescendants.entrySet()) {
+ if (entry.getValue().size() == 1) {
+ ExecNode<?> next = entry.getValue().get(0);
+ if (next instanceof BatchExecMultipleInput) {
+ // the source can be chained with BatchExecMultipleInput
+ continue;
+ }
+ }
+ // otherwise we need dependencies
+ entry.getKey().setNeedDynamicFilteringDependency(true);
+ }
+
+ return execGraph;
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/ResetTransformationProcessor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/ResetTransformationProcessor.java
new file mode 100644
index 00000000000..4c090786922
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/ResetTransformationProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.processor;
+
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
+
+/**
+ * An {@link ExecNodeGraphProcessor} that resets the transformations generated by other processors,
+ * such as {@link MultipleInputNodeCreationProcessor}, back to null.
+ */
+public class ResetTransformationProcessor implements ExecNodeGraphProcessor {
+
+ @Override
+ public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
+ AbstractExecNodeExactlyOnceVisitor visitor =
+ new AbstractExecNodeExactlyOnceVisitor() {
+
+ @Override
+ protected void visitNode(ExecNode<?> node) {
+ ((ExecNodeBase<?>) node).resetTransformation();
+ visitInputs(node);
+ }
+ };
+ execGraph.getRootNodes().forEach(r -> r.accept(visitor));
+ return execGraph;
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
index a6588df84d9..144b527c520 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
@@ -25,11 +25,14 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import static org.apache.flink.table.planner.utils.StreamExchangeModeUtils.getBatchStreamExchangeMode;
@@ -59,7 +62,7 @@ public class InputPriorityConflictResolver extends InputPriorityGraphGenerator {
InputProperty.DamBehavior safeDamBehavior,
StreamExchangeMode exchangeMode,
ReadableConfig tableConfig) {
- super(roots, Collections.emptySet(), safeDamBehavior);
+ super(roots, getDynamicFilteringSourceNodes(roots), safeDamBehavior);
this.exchangeMode = exchangeMode;
this.tableConfig = tableConfig;
}
@@ -152,6 +155,25 @@ public class InputPriorityConflictResolver extends InputPriorityGraphGenerator {
return exchange;
}
+ private static Set<ExecNode<?>> getDynamicFilteringSourceNodes(List<ExecNode<?>> roots) {
+ Set<ExecNode<?>> sourceNodes = new HashSet<>();
+ AbstractExecNodeExactlyOnceVisitor visitor =
+ new AbstractExecNodeExactlyOnceVisitor() {
+ @Override
+ protected void visitNode(ExecNode<?> node) {
+ // skip the inputs of a dynamic filtering table source scan
+ if (node instanceof CommonExecTableSourceScan
+ && node.getInputEdges().size() > 0) {
+ sourceNodes.add(node);
+ } else {
+ visitInputs(node);
+ }
+ }
+ };
+ roots.forEach(n -> n.accept(visitor));
+ return sourceNodes;
+ }
+
private static class ConflictCausedByExchangeChecker
extends AbstractExecNodeExactlyOnceVisitor {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
index 9fb292bd960..87df2a677d8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
@@ -36,6 +36,8 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Collections;
+
/**
* Stream {@link ExecNode} to read data from an external source defined by a {@link
* ScanTableSource}.
@@ -71,7 +73,14 @@ public class StreamExecTableSourceScan extends CommonExecTableSourceScan
@JsonProperty(FIELD_NAME_SCAN_TABLE_SOURCE) DynamicTableSourceSpec tableSourceSpec,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
- super(id, context, persistedConfig, tableSourceSpec, outputType, description);
+ super(
+ id,
+ context,
+ persistedConfig,
+ tableSourceSpec,
+ Collections.emptyList(),
+ outputType,
+ description);
}
@Override
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 83004ec0f74..00f901d30e5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.operations.{ModifyOperation, Operation}
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode
-import org.apache.flink.table.planner.plan.nodes.exec.processor.{DeadlockBreakupProcessor, ExecNodeGraphProcessor, ForwardHashExchangeProcessor, MultipleInputNodeCreationProcessor}
+import org.apache.flink.table.planner.plan.nodes.exec.processor.{DeadlockBreakupProcessor, DynamicFilteringDependencyProcessor, ExecNodeGraphProcessor, ForwardHashExchangeProcessor, MultipleInputNodeCreationProcessor, ResetTransformationProcessor}
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodePlanDumper
import org.apache.flink.table.planner.plan.optimize.{BatchCommonSubGraphBasedOptimizer, Optimizer}
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
@@ -42,6 +42,7 @@ import org.apache.calcite.sql.SqlExplainLevel
import java.util
import scala.collection.JavaConversions._
+import scala.collection.mutable
class BatchPlanner(
executor: Executor,
@@ -77,6 +78,10 @@ class BatchPlanner(
processors.add(new MultipleInputNodeCreationProcessor(false))
}
processors.add(new ForwardHashExchangeProcessor)
+ if (getTableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
+ processors.add(new DynamicFilteringDependencyProcessor)
+ processors.add(new ResetTransformationProcessor)
+ }
processors
}
@@ -92,13 +97,13 @@ class BatchPlanner(
"This is a bug and should not happen. Please file an issue.")
}
afterTranslation()
- transformations
+ transformations ++ planner.extraTransformations
}
override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
val (sinkRelNodes, optimizedRelNodes, execGraph, streamGraph) = getExplainGraphs(operations)
- val sb = new StringBuilder
+ val sb = new mutable.StringBuilder
sb.append("== Abstract Syntax Tree ==")
sb.append(System.lineSeparator)
sinkRelNodes.foreach {
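Both new processors are appended only when dynamic filtering is switched on. A minimal sketch of enabling it for a batch job (using the option object referenced above):

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    tEnv.getConfig()
            .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED, true);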
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index e2ef76b26eb..d0ca7bba606 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -104,6 +104,9 @@ abstract class PlannerBase(
private var parser: Parser = _
private var extendedOperationExecutor: ExtendedOperationExecutor = _
private var currentDialect: SqlDialect = getTableConfig.getSqlDialect
+ // transformations generated in the translateToPlan method that are not connected
+ // to the sink transformations but are still needed in the final graph.
+ private[flink] val extraTransformations = new util.ArrayList[Transformation[_]]()
@VisibleForTesting
private[flink] val plannerContext: PlannerContext =
@@ -358,6 +361,12 @@ abstract class PlannerBase(
*/
protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
+ def addExtraTransformation(transformation: Transformation[_]): Unit = {
+ if (!extraTransformations.contains(transformation)) {
+ extraTransformations.add(transformation)
+ }
+ }
+
private def getTableSink(
contextResolvedTable: ContextResolvedTable,
dynamicOptions: JMap[String, String]): Option[(ResolvedCatalogTable, Any)] = {
@@ -480,6 +489,7 @@ abstract class PlannerBase(
// Clean caches that might have filled up during optimization
CompileUtils.cleanUp()
+ extraTransformations.clear()
}
/** Returns all the graphs required to execute EXPLAIN */
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 739d85517b8..2431d9c45e8 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -46,6 +46,8 @@ import org.apache.calcite.sql.SqlExplainLevel
import java.io.{File, IOException}
import java.util
+import scala.collection.mutable
+
class StreamPlanner(
executor: Executor,
tableConfig: TableConfig,
@@ -87,13 +89,13 @@ class StreamPlanner(
"This is a bug and should not happen. Please file an issue.")
}
afterTranslation()
- transformations
+ transformations ++ planner.extraTransformations
}
override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
val (sinkRelNodes, optimizedRelNodes, execGraph, streamGraph) = getExplainGraphs(operations)
- val sb = new StringBuilder
+ val sb = new mutable.StringBuilder
sb.append("== Abstract Syntax Tree ==")
sb.append(System.lineSeparator)
sinkRelNodes.foreach {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringDataCollector.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringDataCollector.scala
index 58892934ba3..1c4cf7147d4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringDataCollector.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringDataCollector.scala
@@ -17,7 +17,11 @@
*/
package org.apache.flink.table.planner.plan.nodes.physical.batch
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecDynamicFilteringDataCollector
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
@@ -54,6 +58,11 @@ class BatchPhysicalDynamicFilteringDataCollector(
}
override def translateToExecNode(): ExecNode[_] = {
- throw new UnsupportedOperationException()
+ new BatchExecDynamicFilteringDataCollector(
+ JavaScalaConversionUtil.toJava(dynamicFilteringFieldIndices.map(i => Integer.valueOf(i))),
+ unwrapTableConfig(this),
+ InputProperty.DEFAULT,
+ FlinkTypeFactory.toLogicalRowType(getRowType),
+ getRelDetailedDescription)
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala
index ca830616677..90fbc030845 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalDynamicFilteringTableSourceScan.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.physical.batch
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec
import org.apache.flink.table.planner.plan.schema.TableSourceTable
@@ -88,6 +88,7 @@ class BatchPhysicalDynamicFilteringTableSourceScan(
new BatchExecTableSourceScan(
unwrapTableConfig(this),
tableSourceSpec,
+ InputProperty.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription)
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSource.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSource.java
new file mode 100644
index 00000000000..0622169b8cc
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSource.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.source.enumerator.DynamicFilteringValuesSourceEnumerator;
+import org.apache.flink.connector.source.enumerator.NoOpEnumState;
+import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer;
+import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit;
+import org.apache.flink.connector.source.split.ValuesSourcePartitionSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link Source} implementation that reads data from a partitioned list.
+ *
+ * <p>This source is useful for dynamic filtering testing.
+ */
+public class DynamicFilteringValuesSource
+ implements Source<RowData, ValuesSourcePartitionSplit, NoOpEnumState> {
+
+ private final TypeSerializer<RowData> serializer;
+ private Map<Map<String, String>, byte[]> serializedElements;
+ private Map<Map<String, String>, Integer> counts;
+ private final List<String> dynamicFilteringFields;
+
+ public DynamicFilteringValuesSource(
+ Map<Map<String, String>, Collection<RowData>> elements,
+ TypeSerializer<RowData> serializer,
+ List<String> dynamicFilteringFields) {
+ this.serializer = serializer;
+ this.dynamicFilteringFields = dynamicFilteringFields;
+ serializeElements(serializer, elements);
+ }
+
+ private void serializeElements(
+ TypeSerializer<RowData> serializer,
+ Map<Map<String, String>, Collection<RowData>> elements) {
+ Preconditions.checkState(serializer != null, "serializer not set");
+
+ serializedElements = new HashMap<>();
+ counts = new HashMap<>();
+ for (Map<String, String> partition : elements.keySet()) {
+ Collection<RowData> collection = elements.get(partition);
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos)) {
+ for (RowData e : collection) {
+ serializer.serialize(e, wrapper);
+ }
+ byte[] value = baos.toByteArray();
+ serializedElements.put(partition, value);
+ } catch (Exception e) {
+ throw new TableException(
+ "Serializing the source elements failed: " + e.getMessage(), e);
+ }
+ counts.put(partition, collection.size());
+ }
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SourceReader<RowData, ValuesSourcePartitionSplit> createReader(
+ SourceReaderContext readerContext) throws Exception {
+ return new DynamicFilteringValuesSourceReader(
+ serializedElements, counts, serializer, readerContext);
+ }
+
+ @Override
+ public SplitEnumerator<ValuesSourcePartitionSplit, NoOpEnumState> createEnumerator(
+ SplitEnumeratorContext<ValuesSourcePartitionSplit> context) throws Exception {
+ List<ValuesSourcePartitionSplit> splits =
+ serializedElements.keySet().stream()
+ .map(ValuesSourcePartitionSplit::new)
+ .collect(Collectors.toList());
+ return new DynamicFilteringValuesSourceEnumerator(context, splits, dynamicFilteringFields);
+ }
+
+ @Override
+ public SplitEnumerator<ValuesSourcePartitionSplit, NoOpEnumState> restoreEnumerator(
+ SplitEnumeratorContext<ValuesSourcePartitionSplit> context, NoOpEnumState checkpoint) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public SimpleVersionedSerializer<ValuesSourcePartitionSplit> getSplitSerializer() {
+ return new ValuesSourcePartitionSplitSerializer();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<NoOpEnumState> getEnumeratorCheckpointSerializer() {
+ return new NoOpEnumStateSerializer();
+ }
+}
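A hedged construction sketch for this test source, mapping each partition spec to its rows and declaring "p" as the dynamic filtering field (InternalSerializers is an assumption for obtaining the RowData serializer):

    Map<Map<String, String>, Collection<RowData>> elements = new HashMap<>();
    elements.put(Collections.singletonMap("p", "1"),
            Arrays.asList(GenericRowData.of(1, 10L), GenericRowData.of(2, 20L)));
    elements.put(Collections.singletonMap("p", "2"),
            Collections.singletonList(GenericRowData.of(3, 30L)));

    RowType rowType = RowType.of(new IntType(), new BigIntType());
    DynamicFilteringValuesSource source =
            new DynamicFilteringValuesSource(
                    elements,
                    InternalSerializers.create(rowType),  // serializer for the rows
                    Collections.singletonList("p"));      // partition keys the filter matches on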
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSourceReader.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSourceReader.java
new file mode 100644
index 00000000000..71e0dd05ee4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSourceReader.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link SourceReader} implementation that reads data from a partitioned list. */
+public class DynamicFilteringValuesSourceReader
+ implements SourceReader<RowData, ValuesSourcePartitionSplit> {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DynamicFilteringValuesSourceReader.class);
+
+ /** The context for this reader, to communicate with the enumerator. */
+ private final SourceReaderContext context;
+
+ /** The availability future. This reader is available as soon as a split is assigned. */
+ private CompletableFuture<Void> availability;
+
+ private final TypeSerializer<RowData> serializer;
+ private final Map<Map<String, String>, byte[]> serializedElements;
+ private final Map<Map<String, String>, Integer> counts;
+ private final Queue<ValuesSourcePartitionSplit> remainingSplits;
+
+ private transient ValuesSourcePartitionSplit currentSplit;
+ private transient Iterator<RowData> iterator;
+ private transient boolean noMoreSplits;
+
+ public DynamicFilteringValuesSourceReader(
+ Map<Map<String, String>, byte[]> serializedElements,
+ Map<Map<String, String>, Integer> counts,
+ TypeSerializer<RowData> serializer,
+ SourceReaderContext context) {
+ this.serializedElements = checkNotNull(serializedElements);
+ this.counts = checkNotNull(counts);
+ this.serializer = serializer;
+ this.context = checkNotNull(context);
+ this.availability = new CompletableFuture<>();
+ this.remainingSplits = new ArrayDeque<>();
+ }
+
+ @Override
+ public void start() {
+ // request a split if we don't have one
+ if (remainingSplits.isEmpty()) {
+ context.sendSplitRequest();
+ }
+ }
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<RowData> output) {
+ if (iterator != null) {
+ if (iterator.hasNext()) {
+ output.collect(iterator.next());
+ return InputStatus.MORE_AVAILABLE;
+ } else {
+ finishSplit();
+ }
+ }
+
+ return tryMoveToNextSplit();
+ }
+
+ private void finishSplit() {
+ iterator = null;
+ currentSplit = null;
+
+ // request another split if no other is left
+ // we do this only here in the finishSplit part to avoid requesting a split
+ // whenever the reader is polled and doesn't currently have a split
+ if (remainingSplits.isEmpty() && !noMoreSplits) {
+ context.sendSplitRequest();
+ }
+ }
+
+ private InputStatus tryMoveToNextSplit() {
+ currentSplit = remainingSplits.poll();
+ if (currentSplit != null) {
+ Map<String, String> partition = currentSplit.getPartition();
+ List<RowData> list =
+ deserialize(serializedElements.get(partition), counts.get(partition));
+ iterator = list.iterator();
+ return InputStatus.MORE_AVAILABLE;
+ } else if (noMoreSplits) {
+ return InputStatus.END_OF_INPUT;
+ } else {
+ // ensure we are not called in a loop by resetting the availability future
+ if (availability.isDone()) {
+ availability = new CompletableFuture<>();
+ }
+
+ return InputStatus.NOTHING_AVAILABLE;
+ }
+ }
+
+ private List<RowData> deserialize(byte[] data, int count) {
+ List<RowData> list = new ArrayList<>();
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(data)) {
+ final DataInputView input = new DataInputViewStreamWrapper(bais);
+ for (int i = 0; i < count; ++i) {
+ RowData element = serializer.deserialize(input);
+ list.add(element);
+ }
+ } catch (Exception e) {
+ throw new TableException(
+ "Failed to deserialize an element from the source. "
+ + "If you are using user-defined serialization (Value and Writable types), check the "
+ + "serialization functions.\nSerializer is "
+ + serializer,
+ e);
+ }
+ return list;
+ }
+
+ @Override
+ public List<ValuesSourcePartitionSplit> snapshotState(long checkpointId) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CompletableFuture<Void> isAvailable() {
+ return availability;
+ }
+
+ @Override
+ public void addSplits(List<ValuesSourcePartitionSplit> splits) {
+ remainingSplits.addAll(splits);
+ // set availability so that pollNext is actually called
+ availability.complete(null);
+ }
+
+ @Override
+ public void notifyNoMoreSplits() {
+ noMoreSplits = true;
+ // set availability so that pollNext is actually called
+ availability.complete(null);
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ LOG.info("checkpoint {} finished.", checkpointId);
+ }
+}
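The availability future above follows the standard FLIP-27 reader contract: pollNext may return NOTHING_AVAILABLE, and the caller parks on isAvailable() until addSplits() or notifyNoMoreSplits() completes the future. A hedged driver loop showing how such a reader is consumed:

    InputStatus status;
    do {
        status = reader.pollNext(output);
        if (status == InputStatus.NOTHING_AVAILABLE) {
            reader.isAvailable().get(); // parked until splits arrive or the input ends
        }
    } while (status != InputStatus.END_OF_INPUT);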
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/enumerator/DynamicFilteringValuesSourceEnumerator.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/enumerator/DynamicFilteringValuesSourceEnumerator.java
new file mode 100644
index 00000000000..83b09b0ffcc
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/enumerator/DynamicFilteringValuesSourceEnumerator.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source.enumerator;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** A SplitEnumerator implementation for the dynamic filtering values source. */
+public class DynamicFilteringValuesSourceEnumerator
+ implements SplitEnumerator<ValuesSourcePartitionSplit, NoOpEnumState> {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DynamicFilteringValuesSourceEnumerator.class);
+
+ private final SplitEnumeratorContext<ValuesSourcePartitionSplit> context;
+ private final List<ValuesSourcePartitionSplit> allSplits;
+ private final List<String> dynamicFilteringFields;
+ private transient boolean receivedDynamicFilteringEvent;
+ private transient List<ValuesSourcePartitionSplit> remainingSplits;
+
+ public DynamicFilteringValuesSourceEnumerator(
+ SplitEnumeratorContext<ValuesSourcePartitionSplit> context,
+ List<ValuesSourcePartitionSplit> allSplits,
+ List<String> dynamicFilteringFields) {
+ this.context = context;
+ this.allSplits = allSplits;
+ this.dynamicFilteringFields = dynamicFilteringFields;
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+ if (!receivedDynamicFilteringEvent) {
+ throw new IllegalStateException("DynamicFilteringEvent has not been received");
+ }
+ if (remainingSplits.isEmpty()) {
+ context.signalNoMoreSplits(subtaskId);
+ LOG.info("No more splits available for subtask {}", subtaskId);
+ } else {
+ ValuesSourcePartitionSplit split = remainingSplits.remove(0);
+ LOG.debug("Assigned split to subtask {} : {}", subtaskId, split);
+ context.assignSplit(split, subtaskId);
+ }
+ }
+
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ if (sourceEvent instanceof DynamicFilteringEvent) {
+ LOG.warn("Received DynamicFilteringEvent: {}", subtaskId);
+ receivedDynamicFilteringEvent = true;
+ DynamicFilteringData dynamicFilteringData =
+ ((DynamicFilteringEvent) sourceEvent).getData();
+ assignPartitions(dynamicFilteringData);
+ } else {
+ LOG.error("Received unrecognized event: {}", sourceEvent);
+ }
+ }
+
+ private void assignPartitions(DynamicFilteringData data) {
+ if (data.isFiltering()) {
+ remainingSplits = new ArrayList<>();
+ for (ValuesSourcePartitionSplit split : allSplits) {
+ List<String> values =
+ dynamicFilteringFields.stream()
+ .map(k -> split.getPartition().get(k))
+ .collect(Collectors.toList());
+ LOG.info("values: " + values);
+ if (data.contains(generateRowData(values, data.getRowType()))) {
+ remainingSplits.add(split);
+ }
+ }
+ } else {
+ remainingSplits = new ArrayList<>(allSplits);
+ }
+ LOG.info("remainingSplits: " + remainingSplits);
+ }
+
+ private GenericRowData generateRowData(List<String> partitionValues, RowType rowType) {
+ Preconditions.checkArgument(partitionValues.size() == rowType.getFieldCount());
+ Object[] values = new Object[partitionValues.size()];
+ for (int i = 0; i < rowType.getFieldCount(); ++i) {
+ switch (rowType.getTypeAt(i).getTypeRoot()) {
+ case VARCHAR:
+ values[i] = partitionValues.get(i);
+ break;
+ case INTEGER:
+ values[i] = Integer.valueOf(partitionValues.get(i));
+ break;
+ case BIGINT:
+ values[i] = Long.valueOf(partitionValues.get(i));
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ rowType.getTypeAt(i).getTypeRoot() + " is not supported.");
+ }
+ }
+ return GenericRowData.of(values);
+ }
+
+ @Override
+ public void addSplitsBack(List<ValuesSourcePartitionSplit> splits, int subtaskId) {
+ remainingSplits.addAll(splits);
+ }
+
+ @Override
+ public void addReader(int subtaskId) {}
+
+ @Override
+ public NoOpEnumState snapshotState(long checkpointId) throws Exception {
+ return new NoOpEnumState();
+ }
+
+ @Override
+ public void close() throws IOException {}
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/split/ValuesSourcePartitionSplit.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/split/ValuesSourcePartitionSplit.java
new file mode 100644
index 00000000000..a8eab5d6241
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/split/ValuesSourcePartitionSplit.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source.split;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.source.DynamicFilteringValuesSource;
+
+import java.util.Map;
+
+/** The split of the {@link DynamicFilteringValuesSource}. */
+public class ValuesSourcePartitionSplit implements SourceSplit {
+
+ private final Map<String, String> partition;
+
+ public ValuesSourcePartitionSplit(Map<String, String> partition) {
+ this.partition = partition;
+ }
+
+ public Map<String, String> getPartition() {
+ return partition;
+ }
+
+ @Override
+ public String splitId() {
+ return partition.toString();
+ }
+
+ @Override
+ public String toString() {
+ return "ValuesSourcePartitionSplit{" + "partition='" + partition + '\'' + '}';
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/split/ValuesSourcePartitionSplitSerializer.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/split/ValuesSourcePartitionSplitSerializer.java
new file mode 100644
index 00000000000..416c5268e88
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/split/ValuesSourcePartitionSplitSerializer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source.split;
+
+import org.apache.flink.connector.source.DynamicFilteringValuesSource;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/** The split serializer for the {@link DynamicFilteringValuesSource}. */
+public class ValuesSourcePartitionSplitSerializer
+ implements SimpleVersionedSerializer<ValuesSourcePartitionSplit> {
+
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(ValuesSourcePartitionSplit split) throws IOException {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos)) {
+ Map<String, String> partition = split.getPartition();
+ out.writeUTF(split.splitId());
+ out.writeInt(partition.size());
+ for (Map.Entry<String, String> entry : partition.entrySet()) {
+ out.writeUTF(entry.getKey());
+ out.writeUTF(entry.getValue());
+ }
+ out.flush();
+ return baos.toByteArray();
+ }
+ }
+
+ @Override
+ public ValuesSourcePartitionSplit deserialize(int version, byte[] serialized)
+ throws IOException {
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+ DataInputStream in = new DataInputStream(bais)) {
+ Map<String, String> partition = new HashMap<>();
+ String splitId = in.readUTF();
+ int size = in.readInt();
+ for (int i = 0; i < size; ++i) {
+ String key = in.readUTF();
+ String value = in.readUTF();
+ partition.put(key, value);
+ }
+ ValuesSourcePartitionSplit split = new ValuesSourcePartitionSplit(partition);
+ Preconditions.checkArgument(split.splitId().equals(splitId));
+ return split;
+ }
+ }
+}
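For reference, the serializer above can be exercised with a simple round trip (a minimal sketch grounded in the two classes added by this commit; the wrapper class name is hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit;
    import org.apache.flink.connector.source.split.ValuesSourcePartitionSplitSerializer;

    public class SplitSerializerRoundTrip {
        public static void main(String[] args) throws Exception {
            Map<String, String> partition = new HashMap<>();
            partition.put("p1", "2");
            ValuesSourcePartitionSplit split = new ValuesSourcePartitionSplit(partition);

            ValuesSourcePartitionSplitSerializer serializer =
                    new ValuesSourcePartitionSplitSerializer();
            byte[] bytes = serializer.serialize(split);
            ValuesSourcePartitionSplit copy =
                    serializer.deserialize(serializer.getVersion(), bytes);

            // splitId() is derived from the partition map, so it must survive the
            // round trip; deserialize() already verifies this with checkArgument.
            System.out.println(split.splitId().equals(copy.splitId())); // true
        }
    }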
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 7960f071e50..b3f545fe718 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.source.DynamicFilteringValuesSource;
import org.apache.flink.connector.source.ValuesSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -743,7 +744,8 @@ public final class TestValuesTableFactory
SupportsLimitPushDown,
SupportsPartitionPushDown,
SupportsReadingMetadata,
- SupportsAggregatePushDown {
+ SupportsAggregatePushDown,
+ SupportsDynamicFiltering {
protected DataType producedDataType;
protected final ChangelogMode changelogMode;
@@ -765,6 +767,7 @@ public final class TestValuesTableFactory
private @Nullable int[] groupingSet;
private List<AggregateExpression> aggregateExpressions;
+ private List<String> acceptedPartitionFilterFields;
private TestValuesScanTableSourceWithoutProjectionPushDown(
DataType producedDataType,
@@ -817,11 +820,11 @@ public final class TestValuesTableFactory
runtimeProviderContext.createDataStructureConverter(producedDataType);
converter.open(
RuntimeConverter.Context.create(TestValuesTableFactory.class.getClassLoader()));
- Collection<RowData> values = convertToRowData(converter);
switch (runtimeSource) {
case "SourceFunction":
try {
+ Collection<RowData> values = convertToRowData(converter);
final SourceFunction<RowData> sourceFunction;
if (failingSource) {
sourceFunction =
@@ -838,14 +841,16 @@ public final class TestValuesTableFactory
checkArgument(
!failingSource,
"Values InputFormat Source doesn't support as failing source.");
+ Collection<RowData> values = convertToRowData(converter);
return InputFormatProvider.of(new CollectionInputFormat<>(values, serializer));
case "DataStream":
checkArgument(
!failingSource,
"Values DataStream Source doesn't support as failing source.");
try {
+ Collection<RowData> values2 = convertToRowData(converter);
FromElementsFunction<RowData> function =
- new FromElementsFunction<>(serializer, values);
+ new FromElementsFunction<>(serializer, values2);
return new DataStreamScanProvider() {
@Override
public DataStream<RowData> produceDataStream(
@@ -870,7 +875,18 @@ public final class TestValuesTableFactory
case "NewSource":
checkArgument(
!failingSource, "Values Source doesn't support as failing new source.");
- return SourceProvider.of(new ValuesSource(values, serializer));
+ if (acceptedPartitionFilterFields == null
+ || acceptedPartitionFilterFields.isEmpty()) {
+ Collection<RowData> values2 = convertToRowData(converter);
+ return SourceProvider.of(new ValuesSource(values2, serializer));
+ } else {
+ Map<Map<String, String>, Collection<RowData>> partitionValues =
+ convertToPartitionedRowData(converter);
+ DynamicFilteringValuesSource source =
+ new DynamicFilteringValuesSource(
+ partitionValues, serializer, acceptedPartitionFilterFields);
+ return SourceProvider.of(source);
+ }
default:
throw new IllegalArgumentException(
"Unsupported runtime source class: " + runtimeSource);
@@ -931,8 +947,21 @@ public final class TestValuesTableFactory
protected Collection<RowData> convertToRowData(DataStructureConverter converter) {
List<RowData> result = new ArrayList<>();
+ for (Collection<RowData> rowData : convertToPartitionedRowData(converter).values()) {
+ result.addAll(rowData);
+ }
+ return result;
+ }
+
+ protected Map<Map<String, String>, Collection<RowData>> convertToPartitionedRowData(
+ DataStructureConverter converter) {
+ Map<Map<String, String>, Collection<RowData>> result = new HashMap<>();
+ int size = 0;
int numSkipped = 0;
for (Map<String, String> partition : data.keySet()) {
+ List<RowData> partitionResult = new ArrayList<>();
+ result.put(partition, partitionResult);
+
Collection<Row> rowsInPartition = data.get(partition);
// handle element skipping
@@ -968,11 +997,12 @@ public final class TestValuesTableFactory
final RowData rowData = (RowData) converter.toInternal(row);
if (rowData != null) {
rowData.setRowKind(row.getKind());
- result.add(rowData);
+ partitionResult.add(rowData);
+ size++;
}
// handle limit. No aggregates will be pushed down when there is a limit.
- if (result.size() >= limit) {
+ if (size >= limit) {
return result;
}
}
@@ -1214,12 +1244,31 @@ public final class TestValuesTableFactory
projectedMetadataFields =
remainingMetadataKeys.stream().mapToInt(allMetadataKeys::indexOf).toArray();
}
+
+ @Override
+ public List<String> applyDynamicFiltering(List<String> candidateFilterFields) {
+ if (dynamicFilteringFields != null && !dynamicFilteringFields.isEmpty()) {
+ checkArgument(!candidateFilterFields.isEmpty());
+ acceptedPartitionFilterFields = new ArrayList<>();
+ for (String field : candidateFilterFields) {
+ if (dynamicFilteringFields.contains(field)) {
+ acceptedPartitionFilterFields.add(field);
+ }
+ }
+
+ return new ArrayList<>(acceptedPartitionFilterFields);
+ } else {
+ throw new UnsupportedOperationException(
+ "Dynamic filtering fields should be configured via the table's WITH options,"
+ + " e.g. 'dynamic-filtering-fields' = 'a;b'.");
+ }
+ }
}
/** Values {@link ScanTableSource} for testing that supports projection push down. */
private static class TestValuesScanTableSource
extends TestValuesScanTableSourceWithoutProjectionPushDown
- implements SupportsProjectionPushDown, SupportsDynamicFiltering {
+ implements SupportsProjectionPushDown {
private TestValuesScanTableSource(
DataType producedDataType,
@@ -1290,25 +1339,6 @@ public final class TestValuesTableFactory
// we can't immediately project the data here,
// because ReadingMetadataSpec may bring new fields
}
-
- @Override
- public List<String> applyDynamicFiltering(List<String> candidateFilterFields) {
- if (dynamicFilteringFields != null && dynamicFilteringFields.size() != 0) {
- checkArgument(!candidateFilterFields.isEmpty());
- List<String> acceptedPartitionFields = new ArrayList<>();
- for (String field : candidateFilterFields) {
- if (dynamicFilteringFields.contains(field)) {
- acceptedPartitionFields.add(field);
- }
- }
-
- return acceptedPartitionFields;
- } else {
- throw new UnsupportedOperationException(
- "Should adding dynamic filtering fields by adding factor"
- + " in with like: 'dynamic-filtering-fields' = 'a;b'.");
- }
- }
}
/** Values {@link ScanTableSource} for testing that supports watermark push down. */
@@ -1547,25 +1577,6 @@ public final class TestValuesTableFactory
readableMetadata,
projectedMetadataFields);
}
-
- @Override
- public List<String> applyDynamicFiltering(List<String> candidateFilterFields) {
- if (dynamicFilteringFields != null && dynamicFilteringFields.size() != 0) {
- checkArgument(!candidateFilterFields.isEmpty());
- List<String> acceptedPartitionFields = new ArrayList<>();
- for (String field : candidateFilterFields) {
- if (dynamicFilteringFields.contains(field)) {
- acceptedPartitionFields.add(field);
- }
- }
-
- return acceptedPartitionFields;
- } else {
- throw new UnsupportedOperationException(
- "Should adding dynamic filtering fields by adding factor"
- + " in with like: 'dynamic-filtering-fields' = 'a;b'.");
- }
- }
}
/** A mocked {@link LookupTableSource} for validation test. */
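For context on the SupportsDynamicFiltering contract exercised above: the planner offers candidate filter fields derived from the join keys, and the source answers with the subset it can actually use for partition pruning. A minimal sketch of that handshake (hypothetical connector class; only the pattern mirrors the test source above, this is not the full ability interface):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Hypothetical: a source that supports dynamic filtering on 'p1' and 'b1' only.
    class SketchDynamicFilteringSource {
        private final List<String> supportedFields = Arrays.asList("p1", "b1");
        private List<String> acceptedFilterFields;

        public List<String> applyDynamicFiltering(List<String> candidateFilterFields) {
            // Keep only the candidates this source can prune on; the returned list
            // tells the planner which fields the runtime filter should carry.
            acceptedFilterFields = new ArrayList<>();
            for (String field : candidateFilterFields) {
                if (supportedFields.contains(field)) {
                    acceptedFilterFields.add(field);
                }
            }
            return new ArrayList<>(acceptedFilterFields);
        }
    }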
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/DynamicFilteringTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/DynamicFilteringTest.java
new file mode 100644
index 00000000000..d76f3c3908e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/DynamicFilteringTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+
+/** Plan test for dynamic filtering. */
+public class DynamicFilteringTest extends TableTestBase {
+
+ private BatchTableTestUtil util;
+
+ @Before
+ public void before() {
+ util = batchTestUtil(TableConfig.getDefault());
+ util.tableEnv()
+ .getConfig()
+ .getConfiguration()
+ .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 10);
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE fact1 (\n"
+ + " a1 bigint,\n"
+ + " b1 int,\n"
+ + " c1 varchar,\n"
+ + " p1 varchar\n"
+ + ") PARTITIONED BY(p1) WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'runtime-source' = 'NewSource',\n"
+ + " 'partition-list' = 'p1:1;p1:2;p1:3',\n"
+ + " 'dynamic-filtering-fields' = 'p1;b1',\n"
+ + " 'bounded' = 'true'\n"
+ + ")");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE fact2 (\n"
+ + " a2 bigint,\n"
+ + " b2 int,\n"
+ + " c2 varchar,\n"
+ + " p2 varchar\n"
+ + ") PARTITIONED BY(p2) WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'runtime-source' = 'NewSource',\n"
+ + " 'partition-list' = 'p1:1;p1:2;p1:3',\n"
+ + " 'dynamic-filtering-fields' = 'p2',\n"
+ + " 'bounded' = 'true'\n"
+ + ")");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE dim (\n"
+ + " x BIGINT,\n"
+ + " y BIGINT,\n"
+ + " z VARCHAR,\n"
+ + " p VARCHAR\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'bounded' = 'true'\n"
+ + ")");
+ }
+
+ @Test
+ public void testLegacySource() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE legacy_source (\n"
+ + " a1 BIGINT,\n"
+ + " b1 BIGINT,\n"
+ + " c1 VARCHAR,\n"
+ + " d1 BIGINT,\n"
+ + " p1 VARCHAR\n"
+ + ") PARTITIONED BY (p1)\n"
+ + " WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'runtime-source' = 'SourceFunction',\n"
+ + " 'partition-list' = 'p1:1;p1:2;p1:3',\n"
+ + " 'dynamic-filtering-fields' = 'p1',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'bounded' = 'true'\n"
+ + ")");
+ util.verifyExplain(
+ "SELECT * FROM legacy_source, dim WHERE p1 = p AND x > 10",
+ JavaScalaConversionUtil.toScala(
+ Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
+ }
+
+ @Test
+ public void testSimpleDynamicFiltering() {
+ // the execution plan contains an 'Order-Enforcer' operator that couples the
+ // dynamic filtering collector with the fact-table source
+ util.verifyExplain(
+ "SELECT * FROM fact1, dim WHERE p1 = p AND x > 10",
+ JavaScalaConversionUtil.toScala(
+ Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
+ }
+
+ @Test
+ public void testDynamicFilteringWithMultipleInput() {
+ // the join pipeline is merged into a single 'MultipleInput' operator in the execution plan
+ util.verifyExplain(
+ "SELECT * FROM fact1, dim, fact2 WHERE p1 = p and p1 = p2 AND x > 10",
+ JavaScalaConversionUtil.toScala(
+ Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
+ }
+
+ @Test
+ public void testDuplicateFactTables() {
+ // the fact tables cannot be reused
+ util.verifyExplain(
+ "SELECT * FROM (SELECT * FROM fact1, dim WHERE p1 = p AND x > 10) t1 JOIN fact1 t2 ON t1.y = t2.b1",
+ JavaScalaConversionUtil.toScala(
+ Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
+ }
+
+ @Test
+ public void testReuseDimSide() {
+ // the dynamic filtering data collector will be reused for both fact tables
+ util.verifyExplain(
+ "SELECT * FROM fact1, dim WHERE p1 = p AND x > 10 "
+ + "UNION ALL "
+ + "SELECT * FROM fact2, dim WHERE p2 = p AND x > 10",
+ JavaScalaConversionUtil.toScala(
+ Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
+ }
+
+ @Test
+ public void testDynamicFilteringWithStaticPartitionPruning() {
+ util.verifyExplain(
+ "SELECT * FROM fact1, dim WHERE p1 = p AND x > 10 and p1 > 1",
+ JavaScalaConversionUtil.toScala(
+ Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN)));
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
index e79899d496c..cf76f5ba927 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
@@ -24,7 +24,11 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.TestingBatchExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecDynamicFilteringDataCollector;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
+import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.junit.Test;
@@ -147,4 +151,54 @@ public class InputPriorityConflictResolverTest {
checkExchange.accept(input0);
checkExchange.accept(input1);
}
+
+ @Test
+ public void testWithDynamicFilteringPlan() {
+ // no conflicts for dpp pattern
+ // 2 --------------------------------------(P1)--- 1 --(P0)--> 0
+ // \ /
+ // DynamicFilteringDataCollector /
+ // \ /
+ // DynamicFilteringTableSourceScan --(P0) --/
+ TestingBatchExecNode[] nodes = new TestingBatchExecNode[3];
+ for (int i = 0; i < nodes.length; i++) {
+ nodes[i] = new TestingBatchExecNode("TestingBatchExecNode" + i);
+ }
+
+ BatchExecTableSourceScan scan =
+ new BatchExecTableSourceScan(
+ new Configuration(),
+ new DynamicTableSourceSpec(null, null),
+ InputProperty.DEFAULT,
+ RowType.of(new IntType(), new IntType(), new IntType()),
+ "DynamicFilteringTableSourceScan");
+ BatchExecDynamicFilteringDataCollector collector =
+ new BatchExecDynamicFilteringDataCollector(
+ Collections.singletonList(1),
+ new Configuration(),
+ InputProperty.DEFAULT,
+ RowType.of(new IntType()),
+ "DynamicFilteringDataCollector");
+
+ nodes[0].addInput(nodes[1], InputProperty.builder().priority(0).build());
+ nodes[1].addInput(nodes[2], InputProperty.builder().priority(1).build());
+ nodes[1].addInput(scan, InputProperty.builder().priority(0).build());
+ ExecEdge collect2Scan = ExecEdge.builder().source(collector).target(scan).build();
+ scan.setInputEdges(Collections.singletonList(collect2Scan));
+ ExecEdge toCollector = ExecEdge.builder().source(nodes[2]).target(collector).build();
+ collector.setInputEdges(Collections.singletonList(toCollector));
+
+ InputPriorityConflictResolver resolver =
+ new InputPriorityConflictResolver(
+ Collections.singletonList(nodes[1]),
+ InputProperty.DamBehavior.END_INPUT,
+ StreamExchangeMode.BATCH,
+ new Configuration());
+ resolver.detectAndResolve();
+
+ ExecNode<?> input0 = nodes[1].getInputNodes().get(0);
+ ExecNode<?> input1 = nodes[1].getInputNodes().get(1);
+ assertThat(input0).isSameAs(nodes[2]);
+ assertThat(input1).isSameAs(scan);
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DynamicFilteringITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DynamicFilteringITCase.java
new file mode 100644
index 00000000000..acbc4937f58
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DynamicFilteringITCase.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.api.common.BatchShuffleMode;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/** IT test for dynamic filtering. */
+public class DynamicFilteringITCase extends BatchTestBase {
+
+ private TableEnvironment tEnv;
+ private Catalog catalog;
+
+ @BeforeEach
+ @Override
+ public void before() throws Exception {
+ super.before();
+ tEnv = tEnv();
+ catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED, true);
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_BLOCKING);
+
+ String dataId1 = TestValuesTableFactory.registerData(TestData.data7());
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE dim (\n"
+ + " x INT,\n"
+ + " y INT,\n"
+ + " z BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'data-id' = '%s',\n"
+ + " 'bounded' = 'true'\n"
+ + ")",
+ dataId1));
+
+ String dataId2 = TestValuesTableFactory.registerData(TestData.data5());
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE fact1 (\n"
+ + " `a` INT,\n"
+ + " `b` BIGINT,\n"
+ + " `c` INT,\n"
+ + " `d` VARCHAR,\n"
+ + " `e` BIGINT\n"
+ + ") partitioned by (a)\n"
+ + " WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'runtime-source' = 'NewSource',\n"
+ + " 'partition-list' = 'a:1;a:2;a:3;a:4;a:5;',\n"
+ + " 'dynamic-filtering-fields' = 'a',\n"
+ + " 'data-id' = '%s',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'bounded' = 'true'\n"
+ + ")",
+ dataId2));
+
+ String dataId3 = TestValuesTableFactory.registerData(TestData.data5());
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE fact2 (\n"
+ + " `a` INT,\n"
+ + " `b` BIGINT,\n"
+ + " `c` INT,\n"
+ + " `d` VARCHAR,\n"
+ + " `e` BIGINT\n"
+ + ") partitioned by (e, a)\n"
+ + " WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'runtime-source' = 'NewSource',\n"
+ + " 'partition-list' = 'e:1,a:1;e:1,a:2;e:1,a:4;e:1,a:5;e:2,a:2;e:2,a:3;e:2,a:4;e:2,a:5;e:3,a:3;e:3,a:5;',\n"
+ + " 'dynamic-filtering-fields' = 'a;e',\n"
+ + " 'data-id' = '%s',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'bounded' = 'true'\n"
+ + ")",
+ dataId3));
+ }
+
+ @Test
+ public void testSimpleDynamicFiltering() {
+ checkResult(
+ "SELECT * FROM fact1, dim WHERE x = a AND z = 2",
+ JavaScalaConversionUtil.toScala(
+ Arrays.asList(
+ Row.of(2, 2, 1, "Hallo Welt", 2, 2, 2, 2),
+ Row.of(2, 3, 2, "Hallo Welt wie", 1, 2, 2, 2),
+ Row.of(3, 4, 3, "Hallo Welt wie gehts?", 2, 3, 3, 2),
+ Row.of(3, 5, 4, "ABC", 2, 3, 3, 2),
+ Row.of(3, 6, 5, "BCD", 3, 3, 3, 2),
+ Row.of(4, 10, 9, "FGH", 2, 4, 5, 2),
+ Row.of(4, 10, 9, "FGH", 2, 4, 7, 2),
+ Row.of(4, 7, 6, "CDE", 2, 4, 5, 2),
+ Row.of(4, 7, 6, "CDE", 2, 4, 7, 2),
+ Row.of(4, 8, 7, "DEF", 1, 4, 5, 2),
+ Row.of(4, 8, 7, "DEF", 1, 4, 7, 2),
+ Row.of(4, 9, 8, "EFG", 1, 4, 5, 2),
+ Row.of(4, 9, 8, "EFG", 1, 4, 7, 2),
+ Row.of(5, 11, 10, "GHI", 1, 5, 9, 2),
+ Row.of(5, 12, 11, "HIJ", 3, 5, 9, 2),
+ Row.of(5, 13, 12, "IJK", 3, 5, 9, 2),
+ Row.of(5, 14, 13, "JKL", 2, 5, 9, 2),
+ Row.of(5, 15, 14, "KLM", 2, 5, 9, 2))),
+ false);
+ }
+
+ @Test
+ public void testDynamicFilteringChainWithMultipleInput() throws Exception {
+ String dataId1 = TestValuesTableFactory.registerData(TestData.data7());
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE dim2 (\n"
+ + " x INT,\n"
+ + " y INT,\n"
+ + " z BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'data-id' = '%s',\n"
+ + " 'bounded' = 'true'\n"
+ + ")",
+ dataId1));
+
+ catalog.alterTableStatistics(
+ new ObjectPath(tEnv.getCurrentDatabase(), "dim"),
+ new CatalogTableStatistics(1, 1, 1, 1),
+ false);
+ catalog.alterTableStatistics(
+ new ObjectPath(tEnv.getCurrentDatabase(), "dim2"),
+ new CatalogTableStatistics(100, 1, 1, 1),
+ false);
+ checkResult(
+ "SELECT * FROM fact1, dim, dim2 WHERE dim.x = fact1.a and dim2.y = fact1.a AND dim.z = 1",
+ JavaScalaConversionUtil.toScala(
+ Arrays.asList(
+ Row.of(1, 1, 0, "Hallo", 1, 1, 0, 1, 2, 1, 1),
+ Row.of(2, 2, 1, "Hallo Welt", 2, 2, 1, 1, 2, 2, 2),
+ Row.of(2, 3, 2, "Hallo Welt wie", 1, 2, 1, 1, 2, 2, 2),
+ Row.of(4, 10, 9, "FGH", 2, 4, 6, 1, 3, 4, 3),
+ Row.of(4, 7, 6, "CDE", 2, 4, 6, 1, 3, 4, 3),
+ Row.of(4, 8, 7, "DEF", 1, 4, 6, 1, 3, 4, 3),
+ Row.of(4, 9, 8, "EFG", 1, 4, 6, 1, 3, 4, 3),
+ Row.of(5, 11, 10, "GHI", 1, 5, 8, 1, 4, 5, 2),
+ Row.of(5, 12, 11, "HIJ", 3, 5, 8, 1, 4, 5, 2),
+ Row.of(5, 13, 12, "IJK", 3, 5, 8, 1, 4, 5, 2),
+ Row.of(5, 14, 13, "JKL", 2, 5, 8, 1, 4, 5, 2),
+ Row.of(5, 15, 14, "KLM", 2, 5, 8, 1, 4, 5, 2))),
+ false);
+ }
+
+ @Test
+ public void testDynamicFilteringCannotChainWithMultipleInput() {
+ checkResult(
+ "SELECT * FROM fact1, dim, fact2 WHERE x = fact1.a and fact2.a = fact1.a AND z = 1 and fact1.e = 2 and fact2.e = 1",
+ JavaScalaConversionUtil.toScala(
+ Arrays.asList(
+ Row.of(
+ 2,
+ 2,
+ 1,
+ "Hallo Welt",
+ 2,
+ 2,
+ 1,
+ 1,
+ 2,
+ 3,
+ 2,
+ "Hallo Welt wie",
+ 1),
+ Row.of(4, 10, 9, "FGH", 2, 4, 6, 1, 4, 8, 7, "DEF", 1),
+ Row.of(4, 10, 9, "FGH", 2, 4, 6, 1, 4, 9, 8, "EFG", 1),
+ Row.of(4, 7, 6, "CDE", 2, 4, 6, 1, 4, 8, 7, "DEF", 1),
+ Row.of(4, 7, 6, "CDE", 2, 4, 6, 1, 4, 9, 8, "EFG", 1),
+ Row.of(5, 14, 13, "JKL", 2, 5, 8, 1, 5, 11, 10, "GHI", 1),
+ Row.of(5, 15, 14, "KLM", 2, 5, 8, 1, 5, 11, 10, "GHI", 1))),
+ false);
+ }
+
+ @Test
+ public void testReuseDimSide() {
+ checkResult(
+ "SELECT * FROM fact1, dim WHERE x = a AND z = 1 and b = 3"
+ + "UNION ALL "
+ + "SELECT * FROM fact2, dim WHERE x = a AND z = 1 and b = 2",
+ JavaScalaConversionUtil.toScala(
+ Arrays.asList(
+ Row.of(2, 2, 1, "Hallo Welt", 2, 2, 1, 1),
+ Row.of(2, 3, 2, "Hallo Welt wie", 1, 2, 1, 1))),
+ false);
+ }
+
+ @Test
+ public void testDynamicFilteringWithStaticPartitionPruning() {
+ checkResult(
+ "SELECT * FROM fact2, dim WHERE x = a and e = z AND y < 5 and a = 3",
+ JavaScalaConversionUtil.toScala(
+ Arrays.asList(
+ Row.of(3, 4, 3, "Hallo Welt wie gehts?", 2, 3, 3, 2),
+ Row.of(3, 5, 4, "ABC", 2, 3, 3, 2),
+ Row.of(3, 6, 5, "BCD", 3, 3, 4, 3))),
+ false);
+ }
+
+ @Test
+ public void testMultiplePartitionKeysWithFullKey() {
+ checkResult(
+ "SELECT * FROM fact2, dim WHERE x = a AND z = e and y = 1",
+ JavaScalaConversionUtil.toScala(
+ Collections.singletonList(Row.of(2, 3, 2, "Hallo Welt wie", 1, 2, 1, 1))),
+ false);
+ }
+
+ @Test
+ public void testMultiplePartitionKeysWithPartialKey() {
+ checkResult(
+ "SELECT * FROM fact2, dim WHERE z = e and y = 1",
+ JavaScalaConversionUtil.toScala(
+ Arrays.asList(
+ Row.of(1, 1, 0, "Hallo", 1, 2, 1, 1),
+ Row.of(2, 3, 2, "Hallo Welt wie", 1, 2, 1, 1),
+ Row.of(4, 8, 7, "DEF", 1, 2, 1, 1),
+ Row.of(4, 9, 8, "EFG", 1, 2, 1, 1),
+ Row.of(5, 11, 10, "GHI", 1, 2, 1, 1))),
+ false);
+ }
+}
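The IT case above only takes effect because dynamic filtering is switched on via the optimizer option set in before(). A standalone sketch of the same setup (uses only the option shown above; class name is hypothetical):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.config.OptimizerConfigOptions;

    public class EnableDynamicFilteringSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Same switch as DynamicFilteringITCase#before: enable dynamic filtering
            // so the planner may insert DynamicFilteringDataCollector nodes.
            tEnv.getConfig()
                    .getConfiguration()
                    .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED, true);
        }
    }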
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFilteringTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFilteringTest.xml
new file mode 100644
index 00000000000..75d522ca41a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFilteringTest.xml
@@ -0,0 +1,713 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testDynamicFilteringWithStaticPartitionPruning">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], p1=[$3], x=[$4], y=[$5], z=[$6], p=[$7])
++- LogicalFilter(condition=[AND(=($3, $7), >($4, 10), >(CAST($3):BIGINT, 1))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, fact1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+
+== Optimized Physical Plan ==
+HashJoin(joinType=[InnerJoin], where=[=(p1, p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+:- Exchange(distribution=[hash[p1]])
+: +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1, partitions=[{p1=2}, {p1=3}]]], fields=[a1, b1, c1, p1])
+: +- DynamicFilteringDataCollector(fields=[p])
+: +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+: +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
++- Exchange(distribution=[hash[p]])
+ +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+ +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+
+== Optimized Execution Plan ==
+HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+:- Exchange(distribution=[hash[p1]])
+: +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1, partitions=[{p1=2}, {p1=3}]]], fields=[a1, b1, c1, p1])
+: +- DynamicFilteringDataCollector(fields=[p])
+: +- Calc(select=[x, y, z, p], where=[(x > 10)])(reuse_id=[1])
+: +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
++- Exchange(distribution=[hash[p]])
+ +- Reused(reference_id=[1])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: fact1[]",
+ "pact" : "Data Source",
+ "contents" : "[]:DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1, partitions=[{p1=2}, {p1=3}]]], fields=[a1, b1, c1, p1])",
+ "parallelism" : 10
+ }, {
+ "id" : ,
+ "type" : "Source: dim[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[x, y, z, p], where=[(x > 10)])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "DynamicFilteringDataCollector[]",
+ "pact" : "Operator",
+ "contents" : "[]:DynamicFilteringDataCollector(fields=[p])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Order-Enforcer",
+ "pact" : "Operator",
+ "contents" : "Order-Enforcer",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "REBALANCE",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "HashJoin[]",
+ "pact" : "Operator",
+ "contents" : "[]:HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "HASH[p]",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "HASH[p1]",
+ "side" : "second"
+ } ]
+ } ]
+}]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testDuplicateFactTables">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], p1=[$3], x=[$4], y=[$5], z=[$6], p=[$7], a10=[$8], b10=[$9], c10=[$10], p10=[$11])
++- LogicalJoin(condition=[=($5, $12)], joinType=[inner])
+ :- LogicalProject(a1=[$0], b1=[$1], c1=[$2], p1=[$3], x=[$4], y=[$5], z=[$6], p=[$7])
+ : +- LogicalFilter(condition=[AND(=($3, $7), >($4, 10))])
+ : +- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, fact1]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+ +- LogicalProject(a1=[$0], b1=[$1], c1=[$2], p1=[$3], b10=[CAST($1):BIGINT])
+ +- LogicalTableScan(table=[[default_catalog, default_database, fact1]])
+
+== Optimized Physical Plan ==
+Calc(select=[a1, b1, c1, p1, x, y, z, p, a10, b10, c10, p10])
++- HashJoin(joinType=[InnerJoin], where=[=(y, b100)], select=[a1, b1, c1, p1, x, y, z, p, a10, b10, c10, p10, b100], build=[left])
+ :- Exchange(distribution=[hash[y]])
+ : +- HashJoin(joinType=[InnerJoin], where=[=(p1, p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+ : :- Exchange(distribution=[hash[p1]])
+ : : +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+ : : +- DynamicFilteringDataCollector(fields=[p])
+ : : +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+ : +- Exchange(distribution=[hash[p]])
+ : +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+ : +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+ +- Exchange(distribution=[hash[b10]])
+ +- Calc(select=[a1, b1, c1, p1, CAST(b1 AS BIGINT) AS b10])
+ +- TableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+
+== Optimized Execution Plan ==
+Calc(select=[a1, b1, c1, p1, x, y, z, p, a10, b10, c10, p10])
++- HashJoin(joinType=[InnerJoin], where=[(y = b100)], select=[a1, b1, c1, p1, x, y, z, p, a10, b10, c10, p10, b100], build=[left])
+ :- Exchange(distribution=[hash[y]])
+ : +- HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+ : :- Exchange(distribution=[hash[p1]])
+ : : +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+ : : +- DynamicFilteringDataCollector(fields=[p])
+ : : +- Calc(select=[x, y, z, p], where=[(x > 10)])(reuse_id=[1])
+ : : +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+ : +- Exchange(distribution=[hash[p]])
+ : +- Reused(reference_id=[1])
+ +- Exchange(distribution=[hash[b10]])
+ +- Calc(select=[a1, b1, c1, p1, CAST(b1 AS BIGINT) AS b10])
+ +- TableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: fact1[]",
+ "pact" : "Data Source",
+ "contents" : "[]:DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])",
+ "parallelism" : 10
+ }, {
+ "id" : ,
+ "type" : "Source: dim[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[x, y, z, p], where=[(x > 10)])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "DynamicFilteringDataCollector[]",
+ "pact" : "Operator",
+ "contents" : "[]:DynamicFilteringDataCollector(fields=[p])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Order-Enforcer",
+ "pact" : "Operator",
+ "contents" : "Order-Enforcer",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "REBALANCE",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "HashJoin[]",
+ "pact" : "Operator",
+ "contents" : "[]:HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "HASH[p]",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "HASH[p1]",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Source: fact1[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])",
+ "parallelism" : 10
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[a1, b1, c1, p1, CAST(b1 AS BIGINT) AS b10])",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "HashJoin[]",
+ "pact" : "Operator",
+ "contents" : "[]:HashJoin(joinType=[InnerJoin], where=[(y = b100)], select=[a1, b1, c1, p1, x, y, z, p, a10, b10, c10, p10, b100], build=[left])",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "HASH[y]",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "HASH[b10]",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[a1, b1, c1, p1, x, y, z, p, a10, b10, c10, p10])",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ } ]
+}]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testDynamicFilteringWithMultipleInput">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], p1=[$3], x=[$4], y=[$5], z=[$6], p=[$7], a2=[$8], b2=[$9], c2=[$10], p2=[$11])
++- LogicalFilter(condition=[AND(=($3, $7), =($3, $11), >($4, 10))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalJoin(condition=[true], joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, fact1]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, fact2]])
+
+== Optimized Physical Plan ==
+HashJoin(joinType=[InnerJoin], where=[=(p1, p2)], select=[a1, b1, c1, p1, x, y, z, p, a2, b2, c2, p2], build=[left])
+:- HashJoin(joinType=[InnerJoin], where=[=(p1, p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+: :- Exchange(distribution=[hash[p1]])
+: : +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+: : +- DynamicFilteringDataCollector(fields=[p])
+: : +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+: : +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+: +- Exchange(distribution=[hash[p]])
+: +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+: +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
++- Exchange(distribution=[hash[p2]])
+ +- TableSourceScan(table=[[default_catalog, default_database, fact2]], fields=[a2, b2, c2, p2])
+
+== Optimized Execution Plan ==
+MultipleInput(readOrder=[2,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(p1 = p2)], select=[a1, b1, c1, p1, x, y, z, p, a2, b2, c2, p2], build=[left])\n:- HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])\n: :- [#2] Exchange(distribution=[hash[p1]])\n: +- [#3] Exchange(distribution=[hash[p]])\n+- [#1] Exchange(distribution=[hash[p2]])\n])
+:- Exchange(distribution=[hash[p2]])
+: +- TableSourceScan(table=[[default_catalog, default_database, fact2]], fields=[a2, b2, c2, p2])
+:- Exchange(distribution=[hash[p1]])
+: +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+: +- DynamicFilteringDataCollector(fields=[p])
+: +- Calc(select=[x, y, z, p], where=[(x > 10)])(reuse_id=[1])
+: +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
++- Exchange(distribution=[hash[p]])
+ +- Reused(reference_id=[1])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: fact2[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, fact2]], fields=[a2, b2, c2, p2])",
+ "parallelism" : 10
+ }, {
+ "id" : ,
+ "type" : "Source: fact1[]",
+ "pact" : "Data Source",
+ "contents" : "[]:DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])",
+ "parallelism" : 10
+ }, {
+ "id" : ,
+ "type" : "Source: dim[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[x, y, z, p], where=[(x > 10)])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "DynamicFilteringDataCollector[]",
+ "pact" : "Operator",
+ "contents" : "[]:DynamicFilteringDataCollector(fields=[p])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Order-Enforcer",
+ "pact" : "Operator",
+ "contents" : "Order-Enforcer",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "REBALANCE",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "MultipleInput[]",
+ "pact" : "Operator",
+ "contents" : "[]:MultipleInput(readOrder=[2,1,0], members=[\\nHashJoin(joinType=[InnerJoin], where=[(p1 = p2)], select=[a1, b1, c1, p1, x, y, z, p, a2, b2, c2, p2], build=[left])\\n:- HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])\\n: :- [#2] Exchange(distribution=[hash[p1]])\\n: +- [#3] Exchange(distribution=[hash[p]])\\n+- [#1] Exchange(distribution=[hash[p2]])\\n])",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "HASH[p]",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "HASH[p1]",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "HASH[p2]",
+ "side" : "second"
+ } ]
+ } ]
+}]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLegacySource">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], p1=[$4], x=[$5], y=[$6], z=[$7], p=[$8])
++- LogicalFilter(condition=[AND(=($4, $8), >($5, 10))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, legacy_source]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+
+== Optimized Physical Plan ==
+HashJoin(joinType=[InnerJoin], where=[=(p1, p)], select=[a1, b1, c1, d1, p1, x, y, z, p], build=[right])
+:- Exchange(distribution=[hash[p1]])
+: +- TableSourceScan(table=[[default_catalog, default_database, legacy_source]], fields=[a1, b1, c1, d1, p1])
++- Exchange(distribution=[hash[p]])
+ +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+ +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+
+== Optimized Execution Plan ==
+HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, d1, p1, x, y, z, p], build=[right])
+:- Exchange(distribution=[hash[p1]])
+: +- TableSourceScan(table=[[default_catalog, default_database, legacy_source]], fields=[a1, b1, c1, d1, p1])
++- Exchange(distribution=[hash[p]])
+ +- Calc(select=[x, y, z, p], where=[(x > 10)])
+ +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: legacy_source[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, legacy_source]], fields=[a1, b1, c1, d1, p1])",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Source: dim[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[x, y, z, p], where=[(x > 10)])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "HashJoin[]",
+ "pact" : "Operator",
+ "contents" : "[]:HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, d1, p1, x, y, z, p], build=[right])",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "HASH[p]",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "HASH[p1]",
+ "side" : "second"
+ } ]
+ } ]
+}]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSimpleDynamicFiltering">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], p1=[$3], x=[$4], y=[$5], z=[$6], p=[$7])
++- LogicalFilter(condition=[AND(=($3, $7), >($4, 10))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, fact1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+
+== Optimized Physical Plan ==
+HashJoin(joinType=[InnerJoin], where=[=(p1, p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+:- Exchange(distribution=[hash[p1]])
+: +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+: +- DynamicFilteringDataCollector(fields=[p])
+: +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+: +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
++- Exchange(distribution=[hash[p]])
+ +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+ +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+
+== Optimized Execution Plan ==
+HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+:- Exchange(distribution=[hash[p1]])
+: +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+: +- DynamicFilteringDataCollector(fields=[p])
+: +- Calc(select=[x, y, z, p], where=[(x > 10)])(reuse_id=[1])
+: +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
++- Exchange(distribution=[hash[p]])
+ +- Reused(reference_id=[1])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: fact1[]",
+ "pact" : "Data Source",
+ "contents" : "[]:DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])",
+ "parallelism" : 10
+ }, {
+ "id" : ,
+ "type" : "Source: dim[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[x, y, z, p], where=[(x > 10)])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "DynamicFilteringDataCollector[]",
+ "pact" : "Operator",
+ "contents" : "[]:DynamicFilteringDataCollector(fields=[p])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Order-Enforcer",
+ "pact" : "Operator",
+ "contents" : "Order-Enforcer",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "REBALANCE",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "HashJoin[]",
+ "pact" : "Operator",
+ "contents" : "[]:HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "HASH[p]",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "HASH[p1]",
+ "side" : "second"
+ } ]
+ } ]
+}]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testReuseDimSide">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalUnion(all=[true])
+:- LogicalProject(a1=[$0], b1=[$1], c1=[$2], p1=[$3], x=[$4], y=[$5], z=[$6], p=[$7])
+: +- LogicalFilter(condition=[AND(=($3, $7), >($4, 10))])
+: +- LogicalJoin(condition=[true], joinType=[inner])
+: :- LogicalTableScan(table=[[default_catalog, default_database, fact1]])
+: +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
++- LogicalProject(a2=[$0], b2=[$1], c2=[$2], p2=[$3], x=[$4], y=[$5], z=[$6], p=[$7])
+ +- LogicalFilter(condition=[AND(=($3, $7), >($4, 10))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, fact2]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+
+== Optimized Physical Plan ==
+Union(all=[true], union=[a1, b1, c1, p1, x, y, z, p])
+:- HashJoin(joinType=[InnerJoin], where=[=(p1, p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])
+: :- Exchange(distribution=[hash[p1]])
+: : +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+: : +- DynamicFilteringDataCollector(fields=[p])
+: : +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+: : +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+: +- Exchange(distribution=[hash[p]])
+: +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+: +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
++- HashJoin(joinType=[InnerJoin], where=[=(p2, p)], select=[a2, b2, c2, p2, x, y, z, p], build=[right])
+ :- Exchange(distribution=[hash[p2]])
+ : +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact2]], fields=[a2, b2, c2, p2])
+ : +- DynamicFilteringDataCollector(fields=[p])
+ : +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+ : +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+ +- Exchange(distribution=[hash[p]])
+ +- Calc(select=[x, y, z, p], where=[>(x, 10)])
+ +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+
+== Optimized Execution Plan ==
+MultipleInput(readOrder=[1,0,1,0], members=[\nUnion(all=[true], union=[a1, b1, c1, p1, x, y, z, p])\n:- HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])\n: :- [#1] Exchange(distribution=[hash[p1]])\n: +- [#2] Exchange(distribution=[hash[p]])\n+- HashJoin(joinType=[InnerJoin], where=[(p2 = p)], select=[a2, b2, c2, p2, x, y, z, p], build=[right])\n :- [#3] Exchange(distribution=[hash[p2]])\n +- [#2] Exchange(distribution=[hash[p]])\n])
+:- Exchange(distribution=[hash[p1]])
+: +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])
+: +- DynamicFilteringDataCollector(fields=[p])(reuse_id=[2])
+: +- Calc(select=[x, y, z, p], where=[(x > 10)])(reuse_id=[1])
+: +- TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])
+:- Exchange(distribution=[hash[p]])(reuse_id=[3])
+: +- Reused(reference_id=[1])
+:- Exchange(distribution=[hash[p2]])
+: +- DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact2]], fields=[a2, b2, c2, p2])
+: +- Reused(reference_id=[2])
++- Reused(reference_id=[3])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: fact1[]",
+ "pact" : "Data Source",
+ "contents" : "[]:DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact1]], fields=[a1, b1, c1, p1])",
+ "parallelism" : 10
+ }, {
+ "id" : ,
+ "type" : "Source: dim[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[x, y, z, p])",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[x, y, z, p], where=[(x > 10)])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "DynamicFilteringDataCollector[]",
+ "pact" : "Operator",
+ "contents" : "[]:DynamicFilteringDataCollector(fields=[p])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Order-Enforcer",
+ "pact" : "Operator",
+ "contents" : "Order-Enforcer",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "REBALANCE",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Source: fact2[]",
+ "pact" : "Data Source",
+ "contents" : "[]:DynamicFilteringTableSourceScan(table=[[default_catalog, default_database, fact2]], fields=[a2, b2, c2, p2])",
+ "parallelism" : 10
+ }, {
+ "id" : ,
+ "type" : "Order-Enforcer",
+ "pact" : "Operator",
+ "contents" : "Order-Enforcer",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "REBALANCE",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "MultipleInput[]",
+ "pact" : "Operator",
+ "contents" : "[]:MultipleInput(readOrder=[1,0,1,0], members=[\\nUnion(all=[true], union=[a1, b1, c1, p1, x, y, z, p])\\n:- HashJoin(joinType=[InnerJoin], where=[(p1 = p)], select=[a1, b1, c1, p1, x, y, z, p], build=[right])\\n: :- [#1] Exchange(distribution=[hash[p1]])\\n: +- [#2] Exchange(distribution=[hash[p]])\\n+- HashJoin(joinType=[InnerJoin], where=[(p2 = p)], select=[a2, b2, c2, p2, x, y, z, p], build=[right])\\n :- [#3] Exchange(distribution=[hash[p2]])\\n +- [#2] Excha [...]
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "HASH[p]",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "HASH[p1]",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "HASH[p]",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "HASH[p2]",
+ "side" : "second"
+ } ]
+ } ]
+}]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
index 5552fb5ba70..d2d13adb880 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
@@ -957,6 +957,114 @@ Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, id
"side" : "second"
} ]
} ]
+}]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMatch[isNameSimplifyEnabled=false]">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(aid=[$0], bid=[$1], cid=[$2])
++- LogicalMatch(partition=[[]], order=[[5 ASC-nulls-first]], outputFields=[[aid, bid, cid]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[=(LAST(*.$0, 0), 1), =(LAST(*.$1, 0), 2), =(LAST(*.$2, 0), _UTF-16LE'c')]], inputFields=[[a, b, c, d, rowtime, proctime]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- Exchange(distribution=[single])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Optimized Execution Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- Exchange(distribution=[single])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
+ "pact" : "Data Source",
+ "contents" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
+ "pact" : "Operator",
+ "contents" : "Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "GLOBAL",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
+ "pact" : "Operator",
+ "contents" : "Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ } ]
+}]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMatch[isNameSimplifyEnabled=true]">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(aid=[$0], bid=[$1], cid=[$2])
++- LogicalMatch(partition=[[]], order=[[5 ASC-nulls-first]], outputFields=[[aid, bid, cid]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[=(LAST(*.$0, 0), 1), =(LAST(*.$1, 0), 2), =(LAST(*.$2, 0), _UTF-16LE'c')]], inputFields=[[a, b, c, d, rowtime, proctime]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- Exchange(distribution=[single])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Optimized Execution Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- Exchange(distribution=[single])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: MyTable[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "GLOBAL",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Match[]",
+ "pact" : "Operator",
+ "contents" : "[]:Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ } ]
}]]>
</Resource>
</TestCase>
@@ -1131,15 +1239,15 @@ NestedLoopJoin(joinType=[InnerJoin], where=[(a = d0)], select=[a, b, c, d, a0, b
{
"nodes" : [ {
"id" : ,
- "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c, d])",
+ "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a, b, c, d])",
"pact" : "Data Source",
- "contents" : "TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c, d])",
+ "contents" : "TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a, b, c, d])",
"parallelism" : 1
}, {
"id" : ,
- "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a, b, c, d])",
+ "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c, d])",
"pact" : "Data Source",
- "contents" : "TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a, b, c, d])",
+ "contents" : "TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c, d])",
"parallelism" : 1
}, {
"id" : ,
@@ -1185,15 +1293,15 @@ NestedLoopJoin(joinType=[InnerJoin], where=[(a = d0)], select=[a, b, c, d, a0, b
{
"nodes" : [ {
"id" : ,
- "type" : "Source: B[]",
+ "type" : "Source: A[]",
"pact" : "Data Source",
- "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c, d])",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a, b, c, d])",
"parallelism" : 1
}, {
"id" : ,
- "type" : "Source: A[]",
+ "type" : "Source: B[]",
"pact" : "Data Source",
- "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, A]], fields=[a, b, c, d])",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c, d])",
"parallelism" : 1
}, {
"id" : ,
@@ -2093,114 +2201,6 @@ Calc(select=[b, w$end AS window_end, EXPR$2])
"side" : "second"
} ]
} ]
-}]]>
- </Resource>
- </TestCase>
- <TestCase name="testMatch[isNameSimplifyEnabled=false]">
- <Resource name="explain">
- <![CDATA[== Abstract Syntax Tree ==
-LogicalProject(aid=[$0], bid=[$1], cid=[$2])
-+- LogicalMatch(partition=[[]], order=[[5 ASC-nulls-first]], outputFields=[[aid, bid, cid]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[=(LAST(*.$0, 0), 1), =(LAST(*.$1, 0), 2), =(LAST(*.$2, 0), _UTF-16LE'c')]], inputFields=[[a, b, c, d, rowtime, proctime]])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-
-== Optimized Physical Plan ==
-Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
-+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
- +- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
-
-== Optimized Execution Plan ==
-Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
-+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
- +- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
-
-== Physical Execution Plan ==
-{
- "nodes" : [ {
- "id" : ,
- "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
- "pact" : "Data Source",
- "contents" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
- "parallelism" : 1
- }, {
- "id" : ,
- "type" : "Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
- "pact" : "Operator",
- "contents" : "Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
- "parallelism" : 1,
- "predecessors" : [ {
- "id" : ,
- "ship_strategy" : "GLOBAL",
- "side" : "second"
- } ]
- }, {
- "id" : ,
- "type" : "Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
- "pact" : "Operator",
- "contents" : "Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
- "parallelism" : 1,
- "predecessors" : [ {
- "id" : ,
- "ship_strategy" : "FORWARD",
- "side" : "second"
- } ]
- } ]
-}]]>
- </Resource>
- </TestCase>
- <TestCase name="testMatch[isNameSimplifyEnabled=true]">
- <Resource name="explain">
- <![CDATA[== Abstract Syntax Tree ==
-LogicalProject(aid=[$0], bid=[$1], cid=[$2])
-+- LogicalMatch(partition=[[]], order=[[5 ASC-nulls-first]], outputFields=[[aid, bid, cid]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[=(LAST(*.$0, 0), 1), =(LAST(*.$1, 0), 2), =(LAST(*.$2, 0), _UTF-16LE'c')]], inputFields=[[a, b, c, d, rowtime, proctime]])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-
-== Optimized Physical Plan ==
-Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
-+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
- +- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
-
-== Optimized Execution Plan ==
-Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
-+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
- +- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
-
-== Physical Execution Plan ==
-{
- "nodes" : [ {
- "id" : ,
- "type" : "Source: MyTable[]",
- "pact" : "Data Source",
- "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
- "parallelism" : 1
- }, {
- "id" : ,
- "type" : "Calc[]",
- "pact" : "Operator",
- "contents" : "[]:Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
- "parallelism" : 1,
- "predecessors" : [ {
- "id" : ,
- "ship_strategy" : "GLOBAL",
- "side" : "second"
- } ]
- }, {
- "id" : ,
- "type" : "Match[]",
- "pact" : "Operator",
- "contents" : "[]:Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
- "parallelism" : 1,
- "predecessors" : [ {
- "id" : ,
- "ship_strategy" : "FORWARD",
- "side" : "second"
- } ]
- } ]
}]]>
</Resource>
</TestCase>
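
The relocated testMatch cases above are parameterized over operator-name simplification: with it disabled, the node "type" and "contents" fields carry the full operator description; with it enabled they shrink to forms like "Match[]". A sketch of toggling this, assuming the option key matches the test's isNameSimplifyEnabled switch:

import org.apache.flink.table.api.{EnvironmentSettings, ExplainDetail, TableEnvironment}

object OperatorNameSketch {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inBatchMode().build())

    // Assumed key; false keeps full operator descriptions in the JSON plan,
    // true simplifies them to "Calc[]", "Match[]", etc.
    tEnv.getConfig.set("table.exec.simplify-operator-name-enabled", "false")

    // 'number-of-rows' makes datagen bounded so it is valid in batch mode.
    tEnv.executeSql(
      "CREATE TABLE MyTable (a INT, b BIGINT) WITH (" +
        "'connector' = 'datagen', 'number-of-rows' = '10')")

    println(tEnv.explainSql(
      "SELECT a, b FROM MyTable WHERE a > 5",
      ExplainDetail.JSON_EXECUTION_PLAN))
  }
}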
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
index c95d3c35b27..2ac34b05e05 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala
@@ -521,6 +521,22 @@ object TestData {
val nullablesOfDuplicateData5 = Array(true, true, true, true, true)
+ lazy val data7 = Seq(
+ row(1, 0, 1L),
+ row(2, 1, 1L),
+ row(2, 2, 2L),
+ row(3, 3, 2L),
+ row(3, 4, 3L),
+ row(4, 5, 2L),
+ row(4, 6, 1L),
+ row(4, 7, 2L),
+ row(5, 8, 1L),
+ row(5, 9, 2L),
+ row(5, 10, 3L),
+ row(6, 11, 2L),
+ row(6, 11, 4L)
+ )
+
lazy val numericData: Seq[Row] = Seq(
row(1, 1L, 1.0f, 1.0d, JBigDecimal.valueOf(1)),
row(2, 2L, 2.0f, 2.0d, JBigDecimal.valueOf(2)),
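
The new data7 rows added above are (Int, Int, Long) triples. A sketch of exposing them through the TestValues connector using the existing registerData helper (the table name T7 is hypothetical):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils.TestData

object Data7Sketch {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inBatchMode().build())

    // registerData stores the rows in a static registry and returns an id
    // that the 'values' connector looks up at runtime.
    val dataId = TestValuesTableFactory.registerData(TestData.data7)
    tEnv.executeSql(
      s"""
         |CREATE TABLE T7 (a INT, b INT, c BIGINT) WITH (
         |  'connector' = 'values',
         |  'data-id' = '$dataId',
         |  'bounded' = 'true'
         |)
         |""".stripMargin)

    tEnv.executeSql("SELECT c, COUNT(*) AS cnt FROM T7 GROUP BY c").print()
  }
}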