You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2023/01/20 02:58:33 UTC
[flink] 01/02: [FLINK-30662][table] Planner supports delete push down.
This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3fa5ecbc8534662fe7e20bf75e23f12890318d02
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Sun Jan 15 17:48:13 2023 +0800
[FLINK-30662][table] Planner supports delete push down.
This closes #21676
---
.../table/api/internal/TableEnvironmentImpl.java | 47 +++
.../operations/DeleteFromFilterOperation.java | 78 +++++
.../table/operations/SinkModifyOperation.java | 51 +++-
.../planner/operations/DeletePushDownUtils.java | 283 ++++++++++++++++++
.../operations/SqlToOperationConverter.java | 44 +++
.../PushFilterInCalcIntoTableSourceScanRule.java | 2 +-
.../logical/PushFilterIntoSourceScanRuleBase.java | 24 --
.../logical/PushFilterIntoTableSourceScanRule.java | 2 +-
.../table/planner/plan/utils/FlinkRexUtil.scala | 29 +-
.../factories/TestUpdateDeleteTableFactory.java | 326 +++++++++++++++++++++
.../operations/DeletePushDownUtilsTest.java | 184 ++++++++++++
.../operations/SqlToOperationConverterTest.java | 52 ++++
.../runtime/batch/sql/DeleteTableITCase.java | 93 ++++++
.../runtime/stream/sql/DeleteTableITCase.java | 38 +++
.../org.apache.flink.table.factories.Factory | 1 +
15 files changed, 1226 insertions(+), 28 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index c54c3d5fa2d..3578e1845cb 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -89,6 +89,7 @@ import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.CollectModifyOperation;
import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
import org.apache.flink.table.operations.CreateTableASOperation;
+import org.apache.flink.table.operations.DeleteFromFilterOperation;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.LoadModuleOperation;
@@ -718,6 +719,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
if (operations.isEmpty()) {
return "";
} else {
+ if (operations.size() > 1
+ && operations.stream().anyMatch(this::isRowLevelModification)) {
+ throw new TableException("Only single UPDATE/DELETE statement is supported.");
+ }
return planner.explain(operations, format, extraDetails);
}
}
@@ -847,6 +852,25 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
executeInternal(ctasOperation.getCreateTableOperation());
mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
} else {
+ boolean isRowLevelModification = isRowLevelModification(modify);
+ if (isRowLevelModification) {
+ String modifyType =
+ ((SinkModifyOperation) modify).isDelete() ? "DELETE" : "UPDATE";
+ if (operations.size() > 1) {
+ throw new TableException(
+ String.format(
+ "Only single %s statement is supported.", modifyType));
+ }
+ if (isStreamingMode) {
+ throw new TableException(
+ String.format(
+ "%s statement is not supported for streaming mode now.",
+ modifyType));
+ }
+ if (modify instanceof DeleteFromFilterOperation) {
+ return executeInternal((DeleteFromFilterOperation) modify);
+ }
+ }
mapOperations.add(modify);
}
}
@@ -865,6 +889,21 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
return result;
}
+ private TableResultInternal executeInternal(
+ DeleteFromFilterOperation deleteFromFilterOperation) {
+ Optional<Long> rows =
+ deleteFromFilterOperation.getSupportsDeletePushDownSink().executeDeletion();
+ if (rows.isPresent()) {
+ return TableResultImpl.builder()
+ .resultKind(ResultKind.SUCCESS)
+ .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())))
+ .data(Arrays.asList(Row.of(String.valueOf(rows.get())), Row.of("OK")))
+ .build();
+ } else {
+ return TableResultImpl.TABLE_RESULT_OK;
+ }
+ }
+
private TableResultInternal executeInternal(
List<Transformation<?>> transformations, List<String> sinkIdentifierNames) {
final String defaultJobName = "insert-into_" + String.join(",", sinkIdentifierNames);
@@ -1973,4 +2012,12 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
return planner.explainPlan(compiledPlan, extraDetails);
}
+
+ private boolean isRowLevelModification(Operation operation) {
+ if (operation instanceof SinkModifyOperation) {
+ SinkModifyOperation sinkModifyOperation = (SinkModifyOperation) operation;
+ return sinkModifyOperation.isDelete() || sinkModifyOperation.isUpdate();
+ }
+ return true;
+ }
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DeleteFromFilterOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DeleteFromFilterOperation.java
new file mode 100644
index 00000000000..f902663d732
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DeleteFromFilterOperation.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** The operation for deleting data in a table according to filters directly. */
+@Internal
+public class DeleteFromFilterOperation extends SinkModifyOperation {
+
+ @Nonnull private final SupportsDeletePushDown supportsDeletePushDownSink;
+ @Nonnull private final List<ResolvedExpression> filters;
+
+ public DeleteFromFilterOperation(
+ ContextResolvedTable contextResolvedTable,
+ @Nonnull SupportsDeletePushDown supportsDeletePushDownSink,
+ @Nonnull List<ResolvedExpression> filters) {
+ super(contextResolvedTable, null, ModifyType.DELETE);
+ this.supportsDeletePushDownSink = Preconditions.checkNotNull(supportsDeletePushDownSink);
+ this.filters = Preconditions.checkNotNull(filters);
+ }
+
+ @Nonnull
+ public SupportsDeletePushDown getSupportsDeletePushDownSink() {
+ return supportsDeletePushDownSink;
+ }
+
+ @Nonnull
+ public List<ResolvedExpression> getFilters() {
+ return filters;
+ }
+
+ @Override
+ public String asSummaryString() {
+ Map<String, Object> params = new LinkedHashMap<>();
+ params.put("identifier", getContextResolvedTable().getIdentifier().asSummaryString());
+ params.put("filters", filters);
+ return OperationUtils.formatWithChildren(
+ "DeleteFromFilter", params, Collections.emptyList(), Operation::asSummaryString);
+ }
+
+ @Override
+ public QueryOperation getChild() {
+ throw new UnsupportedOperationException("This shouldn't be called");
+ }
+
+ @Override
+ public <T> T accept(ModifyOperationVisitor<T> visitor) {
+ throw new UnsupportedOperationException("This shouldn't be called");
+ }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java
index e9cdb12b3b5..db360c0f5f2 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java
@@ -38,27 +38,58 @@ import java.util.Map;
@Internal
public class SinkModifyOperation implements ModifyOperation {
- private final ContextResolvedTable contextResolvedTable;
+ protected final ContextResolvedTable contextResolvedTable;
private final Map<String, String> staticPartitions;
private final QueryOperation child;
private final boolean overwrite;
private final Map<String, String> dynamicOptions;
+ private final ModifyType modifyType;
public SinkModifyOperation(ContextResolvedTable contextResolvedTable, QueryOperation child) {
this(contextResolvedTable, child, Collections.emptyMap(), false, Collections.emptyMap());
}
+ public SinkModifyOperation(
+ ContextResolvedTable contextResolvedTable,
+ QueryOperation child,
+ ModifyType modifyType) {
+ this(
+ contextResolvedTable,
+ child,
+ Collections.emptyMap(),
+ false,
+ Collections.emptyMap(),
+ modifyType);
+ }
+
public SinkModifyOperation(
ContextResolvedTable contextResolvedTable,
QueryOperation child,
Map<String, String> staticPartitions,
boolean overwrite,
Map<String, String> dynamicOptions) {
+ this(
+ contextResolvedTable,
+ child,
+ staticPartitions,
+ overwrite,
+ dynamicOptions,
+ ModifyType.INSERT);
+ }
+
+ public SinkModifyOperation(
+ ContextResolvedTable contextResolvedTable,
+ QueryOperation child,
+ Map<String, String> staticPartitions,
+ boolean overwrite,
+ Map<String, String> dynamicOptions,
+ ModifyType modifyType) {
this.contextResolvedTable = contextResolvedTable;
this.child = child;
this.staticPartitions = staticPartitions;
this.overwrite = overwrite;
this.dynamicOptions = dynamicOptions;
+ this.modifyType = modifyType;
}
public ContextResolvedTable getContextResolvedTable() {
@@ -73,6 +104,14 @@ public class SinkModifyOperation implements ModifyOperation {
return overwrite;
}
+ public boolean isUpdate() {
+ return modifyType == ModifyType.UPDATE;
+ }
+
+ public boolean isDelete() {
+ return modifyType == ModifyType.DELETE;
+ }
+
public Map<String, String> getDynamicOptions() {
return dynamicOptions;
}
@@ -91,6 +130,7 @@ public class SinkModifyOperation implements ModifyOperation {
public String asSummaryString() {
Map<String, Object> params = new LinkedHashMap<>();
params.put("identifier", getContextResolvedTable().getIdentifier().asSummaryString());
+ params.put("modifyType", modifyType);
params.put("staticPartitions", staticPartitions);
params.put("overwrite", overwrite);
params.put("dynamicOptions", dynamicOptions);
@@ -101,4 +141,13 @@ public class SinkModifyOperation implements ModifyOperation {
Collections.singletonList(child),
Operation::asSummaryString);
}
+
+ /** The type of sink modification. */
+ public enum ModifyType {
+ INSERT,
+
+ UPDATE,
+
+ DELETE
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
new file mode 100644
index 00000000000..c9682ed5078
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TableFactoryUtil;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule;
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.RelOptPredicateList;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+
+/** A utility class for delete push down. */
+public class DeletePushDownUtils {
+
+ /**
+ * Get the {@link DynamicTableSink} for the table to be modified. Return Optional.empty() if it
+ * can't get the {@link DynamicTableSink}.
+ */
+ public static Optional<DynamicTableSink> getDynamicTableSink(
+ ContextResolvedTable contextResolvedTable,
+ LogicalTableModify tableModify,
+ CatalogManager catalogManager) {
+ final FlinkContext context = ShortcutUtils.unwrapContext(tableModify.getCluster());
+
+ CatalogBaseTable catalogBaseTable = contextResolvedTable.getTable();
+ // only consider DynamicTableSink
+ if (catalogBaseTable instanceof CatalogTable) {
+ ResolvedCatalogTable resolvedTable = contextResolvedTable.getResolvedTable();
+ Optional<Catalog> optionalCatalog = contextResolvedTable.getCatalog();
+ ObjectIdentifier objectIdentifier = contextResolvedTable.getIdentifier();
+ boolean isTemporary = contextResolvedTable.isTemporary();
+ // only consider the CatalogTable that doesn't use legacy connector sink option
+ if (!contextResolvedTable.isAnonymous()
+ && !TableFactoryUtil.isLegacyConnectorOptions(
+ catalogManager
+ .getCatalog(objectIdentifier.getCatalogName())
+ .orElse(null),
+ context.getTableConfig(),
+ !context.isBatchMode(),
+ objectIdentifier,
+ resolvedTable,
+ isTemporary)) {
+ DynamicTableSinkFactory dynamicTableSinkFactory = null;
+ if (optionalCatalog.isPresent()
+ && optionalCatalog.get().getFactory().isPresent()
+ && optionalCatalog.get().getFactory().get()
+ instanceof DynamicTableSinkFactory) {
+ // try get from catalog
+ dynamicTableSinkFactory =
+ (DynamicTableSinkFactory) optionalCatalog.get().getFactory().get();
+ }
+
+ if (dynamicTableSinkFactory == null) {
+ Optional<DynamicTableSinkFactory> factoryFromModule =
+ context.getModuleManager().getFactory((Module::getTableSinkFactory));
+ // then try get from module
+ dynamicTableSinkFactory = factoryFromModule.orElse(null);
+ }
+ // create table dynamic table sink
+ DynamicTableSink tableSink =
+ FactoryUtil.createDynamicTableSink(
+ dynamicTableSinkFactory,
+ objectIdentifier,
+ resolvedTable,
+ Collections.emptyMap(),
+ context.getTableConfig(),
+ context.getClassLoader(),
+ contextResolvedTable.isTemporary());
+ return Optional.of(tableSink);
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Get the resolved filter expressions from the {@code WHERE} clause in DELETE statement, return
+ * Optional.empty() if {@code WHERE} clause contains sub-query.
+ */
+ public static Optional<List<ResolvedExpression>> getResolvedFilterExpressions(
+ LogicalTableModify tableModify) {
+ FlinkContext context = ShortcutUtils.unwrapContext(tableModify.getCluster());
+ RelNode input = tableModify.getInput().getInput(0);
+ // no WHERE clause, return an empty list
+ if (input instanceof LogicalTableScan) {
+ return Optional.of(Collections.emptyList());
+ }
+ if (!(input instanceof LogicalFilter)) {
+ return Optional.empty();
+ }
+
+ Filter filter = (Filter) input;
+ if (RexUtil.SubQueryFinder.containsSubQuery(filter)) {
+ return Optional.empty();
+ }
+
+ // optimize the filter
+ filter = prepareFilter(filter);
+
+ // resolve the filter to get resolved expression
+ List<ResolvedExpression> resolveExpression = resolveFilter(context, filter);
+ return Optional.ofNullable(resolveExpression);
+ }
+
+ /** Prepare the filter with reducing && simplifying. */
+ private static Filter prepareFilter(Filter filter) {
+ // we try to reduce and simplify the filter
+ ReduceExpressionsRuleProxy reduceExpressionsRuleProxy = ReduceExpressionsRuleProxy.INSTANCE;
+ SimplifyFilterConditionRule simplifyFilterConditionRule =
+ SimplifyFilterConditionRule.INSTANCE();
+ // max iteration num for reducing and simplifying filter,
+ // we use 5 as the max iteration num which is same with the iteration num in Flink's plan
+ // optimizing.
+ int maxIteration = 5;
+
+ boolean changed = true;
+ int iteration = 1;
+ // iterate until it reaches max iteration num or there's no changes in one iterate
+ while (changed && iteration <= maxIteration) {
+ changed = false;
+ // first apply the rule to reduce condition in filter
+ RexNode newCondition = filter.getCondition();
+ List<RexNode> expList = new ArrayList<>();
+ expList.add(newCondition);
+ if (reduceExpressionsRuleProxy.reduce(filter, expList)) {
+ // get the new condition
+ newCondition = expList.get(0);
+ changed = true;
+ }
+ // create a new filter
+ filter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition);
+ // then apply the rule to simplify filter
+ Option<Filter> changedFilter =
+ simplifyFilterConditionRule.simplify(filter, new boolean[] {false});
+ if (changedFilter.isDefined()) {
+ filter = changedFilter.get();
+ changed = true;
+ }
+ iteration += 1;
+ }
+ return filter;
+ }
+
+ /**
+ * A proxy for {@link ReduceExpressionsRule}, which enables us to call the method {@link
+ * ReduceExpressionsRule#reduceExpressions(RelNode, List, RelOptPredicateList)}.
+ */
+ private static class ReduceExpressionsRuleProxy
+ extends ReduceExpressionsRule<ReduceExpressionsRule.Config> {
+ private static final ReduceExpressionsRule.Config config =
+ FilterReduceExpressionsRule.FilterReduceExpressionsRuleConfig.DEFAULT;
+ private static final ReduceExpressionsRuleProxy INSTANCE = new ReduceExpressionsRuleProxy();
+
+ public ReduceExpressionsRuleProxy() {
+ super(config);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall relOptRuleCall) {
+ throw new UnsupportedOperationException("This shouldn't be called");
+ }
+
+ private boolean reduce(RelNode rel, List<RexNode> expList) {
+ return reduceExpressions(
+ rel,
+ expList,
+ RelOptPredicateList.EMPTY,
+ true,
+ config.matchNullability(),
+ config.treatDynamicCallsAsConstant());
+ }
+ }
+
+ /** Return the ResolvedExpression according to Filter. */
+ private static List<ResolvedExpression> resolveFilter(FlinkContext context, Filter filter) {
+ Tuple2<RexNode[], RexNode[]> extractedPredicates =
+ FlinkRexUtil.extractPredicates(
+ filter.getInput().getRowType().getFieldNames().toArray(new String[0]),
+ filter.getCondition(),
+ filter,
+ filter.getCluster().getRexBuilder());
+ RexNode[] convertiblePredicates = extractedPredicates._1;
+ RexNode[] unconvertedPredicates = extractedPredicates._2;
+ if (unconvertedPredicates.length != 0) {
+ // if contain any unconverted condition, return null
+ return null;
+ }
+ RexNodeToExpressionConverter converter =
+ new RexNodeToExpressionConverter(
+ filter.getCluster().getRexBuilder(),
+ filter.getInput().getRowType().getFieldNames().toArray(new String[0]),
+ context.getFunctionCatalog(),
+ context.getCatalogManager(),
+ TimeZone.getTimeZone(
+ TableConfigUtils.getLocalTimeZone(context.getTableConfig())));
+ List<Expression> filters =
+ Arrays.stream(convertiblePredicates)
+ .map(
+ p -> {
+ Option<ResolvedExpression> expr = p.accept(converter);
+ if (expr.isDefined()) {
+ return expr.get();
+ } else {
+ throw new TableException(
+ String.format(
+ "%s can not be converted to Expression",
+ p));
+ }
+ })
+ .collect(Collectors.toList());
+ ExpressionResolver resolver =
+ ExpressionResolver.resolverFor(
+ context.getTableConfig(),
+ context.getClassLoader(),
+ name -> Optional.empty(),
+ context.getFunctionCatalog()
+ .asLookup(
+ str -> {
+ throw new TableException(
+ "We should not need to lookup any expressions at this point");
+ }),
+ context.getCatalogManager().getDataTypeFactory(),
+ (sqlExpression, inputRowType, outputType) -> {
+ throw new TableException(
+ "SQL expression parsing is not supported at this location.");
+ })
+ .build();
+ return resolver.resolve(filters);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 96d279517fd..1851cce8c11 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -118,15 +118,19 @@ import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.operations.BeginStatementSetOperation;
import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
+import org.apache.flink.table.operations.DeleteFromFilterOperation;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.EndStatementSetOperation;
import org.apache.flink.table.operations.ExplainOperation;
@@ -199,6 +203,8 @@ import org.apache.flink.util.StringUtils;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.sql.SqlDelete;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
@@ -392,6 +398,8 @@ public class SqlToOperationConverter {
return Optional.of(converter.convertAnalyzeTable((SqlAnalyzeTable) validated));
} else if (validated instanceof SqlStopJob) {
return Optional.of(converter.convertStopJob((SqlStopJob) validated));
+ } else if (validated instanceof SqlDelete) {
+ return Optional.of(converter.convertDelete((SqlDelete) validated));
} else {
return Optional.empty();
}
@@ -1492,6 +1500,42 @@ public class SqlToOperationConverter {
sqlStopJob.getId(), sqlStopJob.isWithSavepoint(), sqlStopJob.isWithDrain());
}
+ private Operation convertDelete(SqlDelete sqlDelete) {
+ RelRoot updateRelational = flinkPlanner.rel(sqlDelete);
+ LogicalTableModify tableModify = (LogicalTableModify) updateRelational.rel;
+ UnresolvedIdentifier unresolvedTableIdentifier =
+ UnresolvedIdentifier.of(tableModify.getTable().getQualifiedName());
+ ContextResolvedTable contextResolvedTable =
+ catalogManager.getTableOrError(
+ catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
+ // try push down delete
+ Optional<DynamicTableSink> optionalDynamicTableSink =
+ DeletePushDownUtils.getDynamicTableSink(
+ contextResolvedTable, tableModify, catalogManager);
+ if (optionalDynamicTableSink.isPresent()) {
+ DynamicTableSink dynamicTableSink = optionalDynamicTableSink.get();
+ // if the table sink supports delete push down
+ if (dynamicTableSink instanceof SupportsDeletePushDown) {
+ SupportsDeletePushDown supportsDeletePushDownSink =
+ (SupportsDeletePushDown) dynamicTableSink;
+ // get resolved filter expression
+ Optional<List<ResolvedExpression>> filters =
+ DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
+ if (filters.isPresent()
+ && supportsDeletePushDownSink.applyDeleteFilters(filters.get())) {
+ return new DeleteFromFilterOperation(
+ contextResolvedTable, supportsDeletePushDownSink, filters.get());
+ }
+ }
+ }
+
+ // otherwise, delete push down is not applicable, throw unsupported exception
+ throw new UnsupportedOperationException(
+ String.format(
+ "Only delete push down is supported currently, but the delete statement can't pushed to table sink %s.",
+ unresolvedTableIdentifier.asSummaryString()));
+ }
+
private void validateTableConstraint(SqlTableConstraint constraint) {
if (constraint.isUnique()) {
throw new UnsupportedOperationException("UNIQUE constraint is not supported yet");
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
index 33d24fa460f..4dc970af831 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
@@ -85,7 +85,7 @@ public class PushFilterInCalcIntoTableSourceScanRule extends PushFilterIntoSourc
RelBuilder relBuilder = call.builder();
Tuple2<RexNode[], RexNode[]> extractedPredicates =
- extractPredicates(
+ FlinkRexUtil.extractPredicates(
originProgram.getInputRowType().getFieldNames().toArray(new String[0]),
originProgram.expandLocalRef(originProgram.getCondition()),
scan,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
index 3e023d50afc..eb5e4f6d25d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
@@ -22,29 +22,22 @@ import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.planner.calcite.FlinkContext;
import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
-import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
-import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
-import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
import org.apache.flink.table.planner.utils.ShortcutUtils;
-import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;
import java.util.Arrays;
import java.util.List;
-import java.util.TimeZone;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -120,23 +113,6 @@ public abstract class PushFilterIntoSourceScanRuleBase extends RelOptRule {
return new Tuple2<>(result, newTableSourceTable);
}
- protected Tuple2<RexNode[], RexNode[]> extractPredicates(
- String[] inputNames, RexNode filterExpression, TableScan scan, RexBuilder rexBuilder) {
- FlinkContext context = ShortcutUtils.unwrapContext(scan);
- int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan);
- RexNodeToExpressionConverter converter =
- new RexNodeToExpressionConverter(
- rexBuilder,
- inputNames,
- context.getFunctionCatalog(),
- context.getCatalogManager(),
- TimeZone.getTimeZone(
- TableConfigUtils.getLocalTimeZone(context.getTableConfig())));
-
- return RexNodeExtractor.extractConjunctiveConditions(
- filterExpression, maxCnfNodeCount, rexBuilder, converter);
- }
-
/**
* Determines wether we can pushdown the filter into the source. we can not push filter twice,
* make sure FilterPushDownSpec has not been assigned as a capability.
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
index 516f00c15cd..eaa6999cdd7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
@@ -78,7 +78,7 @@ public class PushFilterIntoTableSourceScanRule extends PushFilterIntoSourceScanR
RelBuilder relBuilder = call.builder();
Tuple2<RexNode[], RexNode[]> extractedPredicates =
- extractPredicates(
+ FlinkRexUtil.extractPredicates(
filter.getInput().getRowType().getFieldNames().toArray(new String[0]),
filter.getCondition(),
scan,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
index 81819208b9d..dba27b0eb2e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
@@ -23,12 +23,14 @@ import org.apache.flink.configuration.ConfigOptions.key
import org.apache.flink.table.planner.functions.sql.SqlTryCastFunction
import org.apache.flink.table.planner.plan.utils.ExpressionDetail.ExpressionDetail
import org.apache.flink.table.planner.plan.utils.ExpressionFormat.ExpressionFormat
+import org.apache.flink.table.planner.utils.{ShortcutUtils, TableConfigUtils}
import com.google.common.base.Function
import com.google.common.collect.{ImmutableList, Lists}
import org.apache.calcite.avatica.util.ByteString
import org.apache.calcite.plan.{RelOptPredicateList, RelOptUtil}
import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.RelNode
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.{SqlAsOperator, SqlKind, SqlOperator}
@@ -39,7 +41,7 @@ import org.apache.calcite.util._
import java.lang.{Iterable => JIterable}
import java.math.BigDecimal
import java.util
-import java.util.Optional
+import java.util.{Optional, TimeZone}
import java.util.function.Predicate
import scala.collection.JavaConversions._
@@ -634,6 +636,31 @@ object FlinkRexUtil {
val projects = rexProgram.getProjectList.map(rexProgram.expandLocalRef)
projects.forall(isDeterministicInStreaming)
}
+
+ /**
+ * Return convertible rex nodes and unconverted rex nodes extracted from the filter expression.
+ */
+ def extractPredicates(
+ inputNames: Array[String],
+ filterExpression: RexNode,
+ rel: RelNode,
+ rexBuilder: RexBuilder): (Array[RexNode], Array[RexNode]) = {
+ val context = ShortcutUtils.unwrapContext(rel)
+ val maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(rel);
+ val converter =
+ new RexNodeToExpressionConverter(
+ rexBuilder,
+ inputNames,
+ context.getFunctionCatalog,
+ context.getCatalogManager,
+ TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(context.getTableConfig)));
+
+ RexNodeExtractor.extractConjunctiveConditions(
+ filterExpression,
+ maxCnfNodeCount,
+ rexBuilder,
+ converter);
+ }
}
/**
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
new file mode 100644
index 00000000000..df114031ecf
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/** A factory to create table to support update/delete for test purpose. */
+public class TestUpdateDeleteTableFactory
+ implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+ public static final String IDENTIFIER = "test-update-delete";
+
+ private static final ConfigOption<String> DATA_ID =
+ ConfigOptions.key("data-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The data id used to read the rows.");
+
+ private static final ConfigOption<Boolean> ONLY_ACCEPT_EQUAL_PREDICATE =
+ ConfigOptions.key("only-accept-equal-predicate")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether only accept when the all predicates in filter is equal expression for delete statement.");
+
+ private static final ConfigOption<Boolean> SUPPORT_DELETE_PUSH_DOWN =
+ ConfigOptions.key("support-delete-push-down")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether the table supports delete push down.");
+
+ private static final AtomicInteger idCounter = new AtomicInteger(0);
+ private static final Map<String, Collection<RowData>> registeredRowData = new HashMap<>();
+
+ public static String registerRowData(Collection<RowData> data) {
+ String id = String.valueOf(idCounter.incrementAndGet());
+ registeredRowData.put(id, data);
+ return id;
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validate();
+ String dataId =
+ helper.getOptions().getOptional(DATA_ID).orElse(String.valueOf(idCounter.get()));
+ if (helper.getOptions().get(SUPPORT_DELETE_PUSH_DOWN)) {
+ return new SupportsDeletePushDownSink(
+ dataId,
+ helper.getOptions().get(ONLY_ACCEPT_EQUAL_PREDICATE),
+ context.getCatalogTable());
+ } else {
+ return new TestSink();
+ }
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ helper.validate();
+
+ String dataId =
+ helper.getOptions().getOptional(DATA_ID).orElse(String.valueOf(idCounter.get()));
+ return new TestTableSource(dataId);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return new HashSet<>(
+ Arrays.asList(DATA_ID, ONLY_ACCEPT_EQUAL_PREDICATE, SUPPORT_DELETE_PUSH_DOWN));
+ }
+
+ /** A test table source which supports reading metadata. */
+ private static class TestTableSource implements ScanTableSource {
+ private final String dataId;
+
+ public TestTableSource(String dataId) {
+ this.dataId = dataId;
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ return new TestTableSource(dataId);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "test table source";
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+ return new SourceFunctionProvider() {
+ @Override
+ public SourceFunction<RowData> createSourceFunction() {
+ Collection<RowData> rows = registeredRowData.get(dataId);
+ if (rows != null) {
+ return new FromElementsFunction<>(rows);
+ } else {
+ return new FromElementsFunction<>();
+ }
+ }
+
+ @Override
+ public boolean isBounded() {
+ return true;
+ }
+ };
+ }
+ }
+
+ /** A common test sink. */
+ private static class TestSink implements DynamicTableSink {
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return ChangelogMode.insertOnly();
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ return null;
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return null;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return null;
+ }
+ }
+
+ /** A sink that supports delete push down. */
+ public static class SupportsDeletePushDownSink extends TestSink
+ implements SupportsDeletePushDown {
+
+ private final String dataId;
+ private final boolean onlyAcceptEqualPredicate;
+ private final ResolvedCatalogTable resolvedCatalogTable;
+ private final RowData.FieldGetter[] fieldGetters;
+ private final List<String> columns;
+
+ private List<Tuple2<String, Object>> equalPredicates;
+
+ public SupportsDeletePushDownSink(
+ String dataId,
+ boolean onlyAcceptEqualPredicate,
+ ResolvedCatalogTable resolvedCatalogTable) {
+ this.dataId = dataId;
+ this.onlyAcceptEqualPredicate = onlyAcceptEqualPredicate;
+ this.resolvedCatalogTable = resolvedCatalogTable;
+ this.fieldGetters = getAllFieldGetter(resolvedCatalogTable.getResolvedSchema());
+ this.columns = resolvedCatalogTable.getResolvedSchema().getColumnNames();
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new SupportsDeletePushDownSink(
+ dataId, onlyAcceptEqualPredicate, resolvedCatalogTable);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "SupportDeletePushDownSink";
+ }
+
+ @Override
+ public boolean applyDeleteFilters(List<ResolvedExpression> filters) {
+ if (onlyAcceptEqualPredicate) {
+ Optional<List<Tuple2<String, Object>>> optionalEqualPredicates =
+ getEqualPredicates(filters);
+ if (optionalEqualPredicates.isPresent()) {
+ equalPredicates = optionalEqualPredicates.get();
+ return true;
+ }
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public Optional<Long> executeDeletion() {
+ if (onlyAcceptEqualPredicate) {
+ Collection<RowData> existingRows = registeredRowData.get(dataId);
+ long rowsBefore = existingRows.size();
+ existingRows.removeIf(
+ rowData ->
+ satisfyEqualPredicate(
+ equalPredicates, rowData, fieldGetters, columns));
+ return Optional.of(rowsBefore - existingRows.size());
+ }
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Get a list of equal predicate from a list of filter, each contains [column, value]. Return
+ * Optional.empty() if it contains any non-equal predicate.
+ */
+ private static Optional<List<Tuple2<String, Object>>> getEqualPredicates(
+ List<ResolvedExpression> filters) {
+ List<Tuple2<String, Object>> equalPredicates = new ArrayList<>();
+ for (ResolvedExpression expression : filters) {
+ if (!(expression instanceof CallExpression)) {
+ return Optional.empty();
+ }
+ CallExpression callExpression = (CallExpression) expression;
+ if (callExpression.getFunctionDefinition() != BuiltInFunctionDefinitions.EQUALS) {
+ return Optional.empty();
+ }
+ String colName = getColumnName(callExpression);
+ Object value = getColumnValue(callExpression);
+ equalPredicates.add(Tuple2.of(colName, value));
+ }
+ return Optional.of(equalPredicates);
+ }
+
+ private static String getColumnName(CallExpression comp) {
+ return ((FieldReferenceExpression) comp.getChildren().get(0)).getName();
+ }
+
+ private static Object getColumnValue(CallExpression comp) {
+ ValueLiteralExpression valueLiteralExpression =
+ (ValueLiteralExpression) comp.getChildren().get(1);
+ return valueLiteralExpression
+ .getValueAs(valueLiteralExpression.getOutputDataType().getConversionClass())
+ .get();
+ }
+
+ /** Check the rowData satisfies the equal predicate. */
+ private static boolean satisfyEqualPredicate(
+ List<Tuple2<String, Object>> equalPredicates,
+ RowData rowData,
+ RowData.FieldGetter[] fieldGetters,
+ List<String> columns) {
+ for (Tuple2<String, Object> equalPredicate : equalPredicates) {
+ String colName = equalPredicate.f0;
+ Object value = equalPredicate.f1;
+ int colIndex = columns.indexOf(colName);
+ if (!(Objects.equals(value, fieldGetters[colIndex].getFieldOrNull(rowData)))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static RowData.FieldGetter[] getAllFieldGetter(ResolvedSchema resolvedSchema) {
+ List<DataType> dataTypes = resolvedSchema.getColumnDataTypes();
+ RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[dataTypes.size()];
+ for (int i = 0; i < dataTypes.size(); i++) {
+ fieldGetters[i] = createFieldGetter(dataTypes.get(i).getLogicalType(), i);
+ }
+ return fieldGetters;
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java
new file mode 100644
index 00000000000..f134223d243
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
+import org.apache.flink.table.planner.delegation.PlannerContext;
+import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory;
+import org.apache.flink.table.planner.parse.CalciteParser;
+import org.apache.flink.table.planner.utils.PlannerMocks;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.sql.SqlNode;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DeletePushDownUtils}. */
+public class DeletePushDownUtilsTest {
+ private final TableConfig tableConfig = TableConfig.getDefault();
+ private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default");
+ private final CatalogManager catalogManager =
+ CatalogManagerMocks.preparedCatalogManager()
+ .defaultCatalog("builtin", catalog)
+ .config(
+ Configuration.fromMap(
+ Collections.singletonMap(
+ ExecutionOptions.RUNTIME_MODE.key(),
+ RuntimeExecutionMode.BATCH.name())))
+ .build();
+
+ private final PlannerMocks plannerMocks =
+ PlannerMocks.newBuilder()
+ .withBatchMode(true)
+ .withTableConfig(tableConfig)
+ .withCatalogManager(catalogManager)
+ .withRootSchema(
+ asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)))
+ .build();
+ private final PlannerContext plannerContext = plannerMocks.getPlannerContext();
+ private final CalciteParser parser = plannerContext.createCalciteParser();
+ private final FlinkPlannerImpl flinkPlanner = plannerContext.createFlinkPlanner();
+
+ @Test
+ public void testGetDynamicTableSink() {
+ // create a table with connector = test-update-delete
+ Map<String, String> options = new HashMap<>();
+ options.put("connector", "test-update-delete");
+ CatalogTable catalogTable = createTestCatalogTable(options);
+ ObjectIdentifier tableId = ObjectIdentifier.of("builtin", "default", "t");
+ catalogManager.createTable(catalogTable, tableId, false);
+ ContextResolvedTable resolvedTable =
+ ContextResolvedTable.permanent(
+ tableId, catalog, catalogManager.resolveCatalogTable(catalogTable));
+ LogicalTableModify tableModify = getTableModifyFromSql("DELETE FROM t");
+ Optional<DynamicTableSink> optionalDynamicTableSink =
+ DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify, catalogManager);
+ // verify we can get the dynamic table sink
+ assertThat(optionalDynamicTableSink).isPresent();
+ assertThat(optionalDynamicTableSink.get())
+ .isInstanceOf(TestUpdateDeleteTableFactory.SupportsDeletePushDownSink.class);
+
+ // create table with connector = COLLECTION, it's legacy table sink
+ options.put("connector", "COLLECTION");
+ catalogTable = createTestCatalogTable(options);
+ tableId = ObjectIdentifier.of("builtin", "default", "t1");
+ catalogManager.createTable(catalogTable, tableId, false);
+ resolvedTable =
+ ContextResolvedTable.permanent(
+ tableId, catalog, catalogManager.resolveCatalogTable(catalogTable));
+ tableModify = getTableModifyFromSql("DELETE FROM t1");
+ optionalDynamicTableSink =
+ DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify, catalogManager);
+ // verify it should be empty since it's not an instance of DynamicTableSink but is legacy
+ // TableSink
+ assertThat(optionalDynamicTableSink).isEmpty();
+ }
+
+ @Test
+ public void testGetResolveFilterExpressions() {
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ Schema.newBuilder()
+ .column("f0", DataTypes.INT().notNull())
+ .column("f1", DataTypes.STRING().nullable())
+ .column("f2", DataTypes.BIGINT().nullable())
+ .build(),
+ null,
+ Collections.emptyList(),
+ Collections.emptyMap());
+ catalogManager.createTable(
+ catalogTable, ObjectIdentifier.of("builtin", "default", "t"), false);
+
+ // verify there's no where clause
+ LogicalTableModify tableModify = getTableModifyFromSql("DELETE FROM t");
+ Optional<List<ResolvedExpression>> optionalResolvedExpressions =
+ DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
+ verifyExpression(optionalResolvedExpressions, "[]");
+
+ tableModify = getTableModifyFromSql("DELETE FROM t where f0 = 1 and f1 = '123'");
+ optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
+ verifyExpression(optionalResolvedExpressions, "[equals(f0, 1), equals(f1, '123')]");
+
+ tableModify = getTableModifyFromSql("DELETE FROM t where f0 = 1 + 6 and f0 < 6");
+ optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
+ assertThat(optionalResolvedExpressions).isPresent();
+ verifyExpression(optionalResolvedExpressions, "[false]");
+
+ tableModify = getTableModifyFromSql("DELETE FROM t where f0 = f2 + 1");
+ optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
+ verifyExpression(
+ optionalResolvedExpressions, "[equals(cast(f0, BIGINT NOT NULL), plus(f2, 1))]");
+
+ // resolve filters is not available as it contains sub-query
+ tableModify = getTableModifyFromSql("DELETE FROM t where f0 > (select count(1) from t)");
+ optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
+ assertThat(optionalResolvedExpressions).isEmpty();
+ }
+
+ private CatalogTable createTestCatalogTable(Map<String, String> options) {
+ return CatalogTable.of(
+ Schema.newBuilder()
+ .column("f0", DataTypes.INT().notNull())
+ .column("f1", DataTypes.STRING().nullable())
+ .column("f2", DataTypes.BIGINT().nullable())
+ .build(),
+ null,
+ Collections.emptyList(),
+ options);
+ }
+
+ private LogicalTableModify getTableModifyFromSql(String sql) {
+ SqlNode sqlNode = parser.parse(sql);
+ final SqlNode validated = flinkPlanner.validate(sqlNode);
+ RelRoot deleteRelational = flinkPlanner.rel(validated);
+ return (LogicalTableModify) deleteRelational.rel;
+ }
+
+ @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+ private void verifyExpression(
+ Optional<List<ResolvedExpression>> optionalResolvedExpressions, String expected) {
+ assertThat(optionalResolvedExpressions).isPresent();
+ assertThat(optionalResolvedExpressions.get().toString()).isEqualTo(expected);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 0197ce8d432..129b969b9a0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -52,9 +52,11 @@ import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.SqlCallExpression;
import org.apache.flink.table.factories.TestManagedTableFactory;
import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.DeleteFromFilterOperation;
import org.apache.flink.table.operations.EndStatementSetOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.LoadModuleOperation;
@@ -96,6 +98,7 @@ import org.apache.flink.table.planner.delegation.PlannerContext;
import org.apache.flink.table.planner.expressions.utils.Func0$;
import org.apache.flink.table.planner.expressions.utils.Func1$;
import org.apache.flink.table.planner.expressions.utils.Func8$;
+import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory;
import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.parse.ExtendedParser;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
@@ -2706,6 +2709,46 @@ public class SqlToOperationConverterTest {
assertThat(extendedParser.parse(command)).get().isInstanceOf(QuitOperation.class);
}
+ @Test
+ public void testDeletePushDown() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put("connector", TestUpdateDeleteTableFactory.IDENTIFIER);
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ Schema.newBuilder()
+ .column("a", DataTypes.INT().notNull())
+ .column("c", DataTypes.STRING().notNull())
+ .build(),
+ null,
+ Collections.emptyList(),
+ options);
+ ObjectIdentifier tableIdentifier =
+ ObjectIdentifier.of("builtin", "default", "test_push_down");
+ catalogManager.createTable(catalogTable, tableIdentifier, false);
+
+ // no filter in delete statement
+ Operation operation = parse("DELETE FROM test_push_down");
+ checkDeleteFromFilterOperation(operation, "[]");
+
+ // with filters in delete statement
+ operation = parse("DELETE FROM test_push_down where a = 1 and c = '123'");
+ checkDeleteFromFilterOperation(operation, "[equals(a, 1), equals(c, '123')]");
+
+ // with filter = false after reduced in delete statement
+ operation = parse("DELETE FROM test_push_down where a = 1 + 6 and a = 2");
+ checkDeleteFromFilterOperation(operation, "[false]");
+
+ assertThatThrownBy(
+ () ->
+ parse(
+ "DELETE FROM test_push_down where a = (select count(*) from test_push_down)"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage(
+ String.format(
+ "Only delete push down is supported currently, but the delete statement can't pushed to table sink %s.",
+ tableIdentifier.asSerializableString()));
+ }
+
// ~ Tool Methods ----------------------------------------------------------
private static TestItem createTestItem(Object... args) {
@@ -2902,6 +2945,15 @@ public class SqlToOperationConverterTest {
TestManagedTableFactory.ENRICHED_VALUE);
}
+ private static void checkDeleteFromFilterOperation(
+ Operation operation, String expectedFilters) {
+ assertThat(operation).isInstanceOf(DeleteFromFilterOperation.class);
+ DeleteFromFilterOperation deleteFromFiltersOperation =
+ (DeleteFromFilterOperation) operation;
+ List<ResolvedExpression> filters = deleteFromFiltersOperation.getFilters();
+ assertThat(filters.toString()).isEqualTo(expectedFilters);
+ }
+
// ~ Inner Classes ----------------------------------------------------------
private static class TestItem {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
new file mode 100644
index 00000000000..8a60d24dfbe
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** The IT case for DELETE statement in batch mode. */
+public class DeleteTableITCase extends BatchTestBase {
+ private static final int ROW_NUM = 5;
+
+ @Test
+ public void testDeletePushDown() throws Exception {
+ String dataId = registerData();
+ tEnv().executeSql(
+ String.format(
+ "CREATE TABLE t (a int, b string, c double) WITH"
+ + " ('connector' = 'test-update-delete',"
+ + " 'data-id' = '%s',"
+ + " 'only-accept-equal-predicate' = 'true'"
+ + ")",
+ dataId));
+ // it only contains equal expression, should be pushed down
+ List<Row> rows =
+ CollectionUtil.iteratorToList(
+ tEnv().executeSql("DELETE FROM t WHERE a = 1").collect());
+ assertThat(rows.toString()).isEqualTo("[+I[1], +I[OK]]");
+ rows = CollectionUtil.iteratorToList(tEnv().executeSql("SELECT * FROM t").collect());
+ assertThat(rows.toString())
+ .isEqualTo("[+I[0, b_0, 0.0], +I[2, b_2, 4.0], +I[3, b_3, 6.0], +I[4, b_4, 8.0]]");
+
+ // should throw exception for the deletion can not be pushed down as it contains non-equal
+ // predicate and the table sink haven't implemented SupportsRowLevelDeleteInterface
+ assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t where a > 1"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage(
+ "Only delete push down is supported currently, "
+ + "but the delete statement can't pushed to table sink `default_catalog`.`default_database`.`t`.");
+
+ tEnv().executeSql(
+ "CREATE TABLE t1 (a int) WITH"
+ + " ('connector' = 'test-update-delete',"
+ + " 'support-delete-push-down' = 'false')");
+ // should throw exception for sink that doesn't implement SupportsDeletePushDown interface
+ assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t1"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage(
+ "Only delete push down is supported currently, "
+ + "but the delete statement can't pushed to table sink `default_catalog`.`default_database`.`t1`.");
+ }
+
+ private String registerData() {
+ List<RowData> values = createValue();
+ return TestUpdateDeleteTableFactory.registerRowData(values);
+ }
+
+ private List<RowData> createValue() {
+ List<RowData> values = new ArrayList<>();
+ for (int i = 0; i < ROW_NUM; i++) {
+ values.add(GenericRowData.of(i, StringData.fromString("b_" + i), i * 2.0));
+ }
+ return values;
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java
new file mode 100644
index 00000000000..e6ba79ef2b4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** The IT case for DELETE statement in streaming mode. */
+public class DeleteTableITCase extends StreamingTestBase {
+
+ @Test
+ public void testDelete() {
+ tEnv().executeSql("CREATE TABLE t (a int) WITH ('connector' = 'test-update-delete')");
+ assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t"))
+ .isInstanceOf(TableException.class)
+ .hasMessage("DELETE statement is not supported for streaming mode now.");
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 7386413a5d2..bfcf0178f2c 100644
--- a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -18,3 +18,4 @@ org.apache.flink.table.planner.factories.TestValuesTableFactory
org.apache.flink.table.planner.factories.TestFileFactory
org.apache.flink.table.planner.factories.TableFactoryHarness$Factory
org.apache.flink.table.planner.plan.stream.sql.TestTableFactory
+org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory