You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/06/09 02:11:15 UTC
[flink] branch master updated: [FLINK-24865][CEP] Support MATCH_RECOGNIZE in Batch mode
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7f7bee70e3a [FLINK-24865][CEP] Support MATCH_RECOGNIZE in Batch mode
7f7bee70e3a is described below
commit 7f7bee70e3ac0d9fb27d7e09b41d6396b748dada
Author: SteNicholas <pr...@163.com>
AuthorDate: Wed Jun 8 14:19:58 2022 +0800
[FLINK-24865][CEP] Support MATCH_RECOGNIZE in Batch mode
This closes #18408
---
.../plan/nodes/exec/batch/BatchExecMatch.java | 57 ++
.../CommonExecMatch.java} | 124 +---
.../plan/nodes/exec/stream/StreamExecMatch.java | 387 +------------
.../nodes/physical/batch/BatchPhysicalMatch.java | 65 +++
.../nodes/physical/common/CommonPhysicalMatch.java | 101 ++++
.../physical/batch/BatchPhysicalMatchRule.java | 63 ++
.../physical/common/CommonPhysicalMatchRule.java | 176 ++++++
.../physical/stream/StreamPhysicalMatch.scala | 50 +-
.../planner/plan/rules/FlinkBatchRuleSets.scala | 3 +
.../physical/stream/StreamPhysicalMatchRule.scala | 132 +----
.../validation/MatchRecognizeValidationTest.java | 354 +++++++++++
.../planner/plan/batch/sql/MatchRecognizeTest.java | 87 +++
.../nodes/exec/operator/BatchOperatorNameTest.java | 20 +
.../runtime/batch/sql/MatchRecognizeITCase.java | 645 +++++++++++++++++++++
.../planner/plan/batch/sql/MatchRecognizeTest.xml | 74 +++
.../nodes/exec/operator/BatchOperatorNameTest.xml | 108 ++++
.../validation/MatchRecognizeValidationTest.scala | 347 -----------
.../planner/match/PatternTranslatorTestBase.scala | 6 +-
18 files changed, 1802 insertions(+), 997 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
new file mode 100644
index 00000000000..4f02be5f91c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecMatch;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+
+/** Batch {@link ExecNode} for MATCH_RECOGNIZE, built on {@link CommonExecMatch}. */
+public class BatchExecMatch extends CommonExecMatch
+ implements BatchExecNode<RowData>, MultipleTransformationTranslator<RowData> {
+
+ public BatchExecMatch(
+ ReadableConfig tableConfig,
+ MatchSpec matchSpec,
+ InputProperty inputProperty,
+ RowType outputType,
+ String description) {
+ super(
+ ExecNodeContext.newNodeId(),
+ ExecNodeContext.newContext(BatchExecMatch.class),
+ ExecNodeContext.newPersistedConfig(BatchExecMatch.class, tableConfig),
+ matchSpec,
+ Collections.singletonList(inputProperty), // wrapped as a one-element input list
+ outputType,
+ description);
+ }
+
+ @Override
+ public boolean isProcTime(RowType inputRowType) {
+ return true; // unconditional: inputRowType is not inspected in batch mode
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
similarity index 78%
copy from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
copy to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
index 6d1dbad480f..d961b5a4c96 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
@@ -16,9 +16,8 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
-import org.apache.flink.FlinkVersion;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
@@ -46,7 +45,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
@@ -59,16 +57,11 @@ import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner;
import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
-import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.MathUtils;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
@@ -79,61 +72,29 @@ import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.tools.RelBuilder;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
-import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
-/** Stream {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
-@ExecNodeMetadata(
- name = "stream-exec-match",
- version = 1,
- producedTransformations = {
- StreamExecMatch.TIMESTAMP_INSERTER_TRANSFORMATION,
- StreamExecMatch.MATCH_TRANSFORMATION
- },
- minPlanVersion = FlinkVersion.v1_15,
- minStateVersion = FlinkVersion.v1_15)
-public class StreamExecMatch extends ExecNodeBase<RowData>
- implements StreamExecNode<RowData>, MultipleTransformationTranslator<RowData> {
-
- public static final String TIMESTAMP_INSERTER_TRANSFORMATION = "timestamp-inserter";
- public static final String MATCH_TRANSFORMATION = "match";
+/** Common {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
+public abstract class CommonExecMatch extends ExecNodeBase<RowData>
+ implements ExecNode<RowData>, MultipleTransformationTranslator<RowData> {
- public static final String FIELD_NAME_MATCH_SPEC = "matchSpec";
+ public static final String MATCH_TRANSFORMATION = "match";
- @JsonProperty(FIELD_NAME_MATCH_SPEC)
private final MatchSpec matchSpec;
- public StreamExecMatch(
- ReadableConfig tableConfig,
+ public CommonExecMatch(
+ int id,
+ ExecNodeContext context,
+ ReadableConfig persistedConfig,
MatchSpec matchSpec,
- InputProperty inputProperty,
- RowType outputType,
+ List<InputProperty> inputProperties,
+ LogicalType outputType,
String description) {
- this(
- ExecNodeContext.newNodeId(),
- ExecNodeContext.newContext(StreamExecMatch.class),
- ExecNodeContext.newPersistedConfig(StreamExecMatch.class, tableConfig),
- matchSpec,
- Collections.singletonList(inputProperty),
- outputType,
- description);
- }
-
- @JsonCreator
- public StreamExecMatch(
- @JsonProperty(FIELD_NAME_ID) int id,
- @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
- @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
- @JsonProperty(FIELD_NAME_MATCH_SPEC) MatchSpec matchSpec,
- @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
- @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
- @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
super(id, context, persistedConfig, inputProperties, outputType, description);
checkArgument(inputProperties.size() == 1);
this.matchSpec = checkNotNull(matchSpec);
@@ -182,11 +143,6 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
}
final int[] partitionKeys = matchSpec.getPartition().getFieldIndices();
- final SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
- final LogicalType timeOrderFieldType =
- inputRowType.getTypeAt(timeOrderField.getFieldIndex());
-
- final boolean isProctime = TypeCheckUtils.isProcTime(timeOrderFieldType);
final InternalTypeInfo<RowData> inputTypeInfo =
(InternalTypeInfo<RowData>) inputTransform.getOutputType();
final TypeSerializer<RowData> inputSerializer =
@@ -212,7 +168,7 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
final CepOperator<RowData, RowData, RowData> operator =
new CepOperator<>(
inputSerializer,
- isProctime,
+ isProcTime(inputRowType),
nfaFactory,
eventComparator,
cepPattern.getAfterMatchSkipStrategy(),
@@ -238,28 +194,7 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
return transform;
}
- private void checkOrderKeys(RowType inputRowType) {
- SortSpec orderKeys = matchSpec.getOrderKeys();
- if (orderKeys.getFieldSize() == 0) {
- throw new TableException("You must specify either rowtime or proctime for order by.");
- }
-
- SortSpec.SortFieldSpec timeOrderField = orderKeys.getFieldSpec(0);
- int timeOrderFieldIdx = timeOrderField.getFieldIndex();
- LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx);
- // need to identify time between others order fields. Time needs to be first sort element
- if (!TypeCheckUtils.isRowTime(timeOrderFieldType)
- && !TypeCheckUtils.isProcTime(timeOrderFieldType)) {
- throw new TableException(
- "You must specify either rowtime or proctime for order by as the first one.");
- }
-
- // time ordering needs to be ascending
- if (!orderKeys.getAscendingOrders()[0]) {
- throw new TableException(
- "Primary sort order of a streaming table must be ascending on time.");
- }
- }
+ protected void checkOrderKeys(RowType inputRowType) {}
private EventComparator<RowData> createEventComparator(
ExecNodeConfig config, ClassLoader classLoader, RowType inputRowType) {
@@ -274,36 +209,9 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
}
}
- private Transformation<RowData> translateOrder(
+ protected Transformation<RowData> translateOrder(
Transformation<RowData> inputTransform, RowType inputRowType, ReadableConfig config) {
- SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
- int timeOrderFieldIdx = timeOrderField.getFieldIndex();
- LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx);
-
- if (TypeCheckUtils.isRowTime(timeOrderFieldType)) {
- // copy the rowtime field into the StreamRecord timestamp field
- int precision = getPrecision(timeOrderFieldType);
- Transformation<RowData> transform =
- ExecNodeUtil.createOneInputTransformation(
- inputTransform,
- createTransformationMeta(
- TIMESTAMP_INSERTER_TRANSFORMATION,
- String.format(
- "StreamRecordTimestampInserter(rowtime field: %s)",
- timeOrderFieldIdx),
- "StreamRecordTimestampInserter",
- config),
- new StreamRecordTimestampInserter(timeOrderFieldIdx, precision),
- inputTransform.getOutputType(),
- inputTransform.getParallelism());
- if (inputsContainSingleton()) {
- transform.setParallelism(1);
- transform.setMaxParallelism(1);
- }
- return transform;
- } else {
- return inputTransform;
- }
+ return inputTransform;
}
@VisibleForTesting
@@ -337,6 +245,8 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
"Only constant intervals with millisecond resolution are supported as time constraints of patterns.");
}
+ public abstract boolean isProcTime(RowType inputRowType);
+
/** The visitor to traverse the pattern RexNode. */
private static class PatternVisitor extends RexDefaultVisitor<Pattern<RowData, RowData>> {
private final ReadableConfig config;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
index 6d1dbad480f..6bc018528d8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
@@ -19,74 +19,31 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
import org.apache.flink.FlinkVersion;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.cep.operator.CepOperator;
-import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.cep.pattern.Quantifier;
-import org.apache.flink.cep.pattern.conditions.BooleanConditions;
-import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.planner.codegen.CodeGenUtils;
-import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
-import org.apache.flink.table.planner.codegen.MatchCodeGenerator;
-import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
-import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecMatch;
import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
-import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
-import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
-import org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner;
-import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.MathUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlMatchRecognize;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.type.SqlTypeFamily;
-import org.apache.calcite.tools.RelBuilder;
-
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedHashSet;
import java.util.List;
-import java.util.Optional;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
/** Stream {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
@ExecNodeMetadata(
@@ -98,11 +55,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
},
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
-public class StreamExecMatch extends ExecNodeBase<RowData>
+public class StreamExecMatch extends CommonExecMatch
implements StreamExecNode<RowData>, MultipleTransformationTranslator<RowData> {
public static final String TIMESTAMP_INSERTER_TRANSFORMATION = "timestamp-inserter";
- public static final String MATCH_TRANSFORMATION = "match";
public static final String FIELD_NAME_MATCH_SPEC = "matchSpec";
@@ -134,111 +90,12 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
- super(id, context, persistedConfig, inputProperties, outputType, description);
- checkArgument(inputProperties.size() == 1);
- this.matchSpec = checkNotNull(matchSpec);
+ super(id, context, persistedConfig, matchSpec, inputProperties, outputType, description);
+ this.matchSpec = matchSpec;
}
- @SuppressWarnings("unchecked")
@Override
- protected Transformation<RowData> translateToPlanInternal(
- PlannerBase planner, ExecNodeConfig config) {
- final ExecEdge inputEdge = getInputEdges().get(0);
- final Transformation<RowData> inputTransform =
- (Transformation<RowData>) inputEdge.translateToPlan(planner);
- final RowType inputRowType = (RowType) inputEdge.getOutputType();
-
- checkOrderKeys(inputRowType);
- final EventComparator<RowData> eventComparator =
- createEventComparator(
- config, planner.getFlinkContext().getClassLoader(), inputRowType);
- final Transformation<RowData> timestampedInputTransform =
- translateOrder(inputTransform, inputRowType, config);
-
- final Tuple2<Pattern<RowData, RowData>, List<String>> cepPatternAndNames =
- translatePattern(
- matchSpec,
- config,
- planner.getFlinkContext().getClassLoader(),
- planner.createRelBuilder(),
- inputRowType);
- final Pattern<RowData, RowData> cepPattern = cepPatternAndNames.f0;
-
- // TODO remove this once it is supported in CEP library
- if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
- throw new TableException(
- "Patterns that can produce empty matches are not supported. There must be at least one non-optional state.");
- }
-
- // TODO remove this once it is supported in CEP library
- if (cepPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
- throw new TableException(
- "Greedy quantifiers are not allowed as the last element of a Pattern yet. "
- + "Finish your pattern with either a simple variable or reluctant quantifier.");
- }
-
- if (matchSpec.isAllRows()) {
- throw new TableException("All rows per match mode is not supported yet.");
- }
-
- final int[] partitionKeys = matchSpec.getPartition().getFieldIndices();
- final SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
- final LogicalType timeOrderFieldType =
- inputRowType.getTypeAt(timeOrderField.getFieldIndex());
-
- final boolean isProctime = TypeCheckUtils.isProcTime(timeOrderFieldType);
- final InternalTypeInfo<RowData> inputTypeInfo =
- (InternalTypeInfo<RowData>) inputTransform.getOutputType();
- final TypeSerializer<RowData> inputSerializer =
- inputTypeInfo.createSerializer(planner.getExecEnv().getConfig());
- final NFACompiler.NFAFactory<RowData> nfaFactory =
- NFACompiler.compileFactory(cepPattern, false);
- final MatchCodeGenerator generator =
- new MatchCodeGenerator(
- new CodeGeneratorContext(
- config, planner.getFlinkContext().getClassLoader()),
- planner.createRelBuilder(),
- false, // nullableInput
- JavaScalaConversionUtil.toScala(cepPatternAndNames.f1),
- JavaScalaConversionUtil.toScala(Optional.empty()),
- CodeGenUtils.DEFAULT_COLLECTOR_TERM());
- generator.bindInput(
- inputRowType,
- CodeGenUtils.DEFAULT_INPUT1_TERM(),
- JavaScalaConversionUtil.toScala(Optional.empty()));
- final PatternProcessFunctionRunner patternProcessFunction =
- generator.generateOneRowPerMatchExpression(
- (RowType) getOutputType(), partitionKeys, matchSpec.getMeasures());
- final CepOperator<RowData, RowData, RowData> operator =
- new CepOperator<>(
- inputSerializer,
- isProctime,
- nfaFactory,
- eventComparator,
- cepPattern.getAfterMatchSkipStrategy(),
- patternProcessFunction,
- null);
- final OneInputTransformation<RowData, RowData> transform =
- ExecNodeUtil.createOneInputTransformation(
- timestampedInputTransform,
- createTransformationMeta(MATCH_TRANSFORMATION, config),
- operator,
- InternalTypeInfo.of(getOutputType()),
- timestampedInputTransform.getParallelism());
- final RowDataKeySelector selector =
- KeySelectorUtil.getRowDataSelector(
- planner.getFlinkContext().getClassLoader(), partitionKeys, inputTypeInfo);
- transform.setStateKeySelector(selector);
- transform.setStateKeyType(selector.getProducedType());
-
- if (inputsContainSingleton()) {
- transform.setParallelism(1);
- transform.setMaxParallelism(1);
- }
- return transform;
- }
-
- private void checkOrderKeys(RowType inputRowType) {
+ public void checkOrderKeys(RowType inputRowType) {
SortSpec orderKeys = matchSpec.getOrderKeys();
if (orderKeys.getFieldSize() == 0) {
throw new TableException("You must specify either rowtime or proctime for order by.");
@@ -261,20 +118,8 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
}
}
- private EventComparator<RowData> createEventComparator(
- ExecNodeConfig config, ClassLoader classLoader, RowType inputRowType) {
- SortSpec orderKeys = matchSpec.getOrderKeys();
- if (orderKeys.getFieldIndices().length > 1) {
- GeneratedRecordComparator rowComparator =
- ComparatorCodeGenerator.gen(
- config, classLoader, "RowDataComparator", inputRowType, orderKeys);
- return new RowDataEventComparator(rowComparator);
- } else {
- return null;
- }
- }
-
- private Transformation<RowData> translateOrder(
+ @Override
+ public Transformation<RowData> translateOrder(
Transformation<RowData> inputTransform, RowType inputRowType, ReadableConfig config) {
SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
int timeOrderFieldIdx = timeOrderField.getFieldIndex();
@@ -306,217 +151,11 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
}
}
- @VisibleForTesting
- public static Tuple2<Pattern<RowData, RowData>, List<String>> translatePattern(
- MatchSpec matchSpec,
- ReadableConfig config,
- ClassLoader classLoader,
- RelBuilder relBuilder,
- RowType inputRowType) {
- final PatternVisitor patternVisitor =
- new PatternVisitor(config, classLoader, relBuilder, inputRowType, matchSpec);
-
- final Pattern<RowData, RowData> cepPattern;
- if (matchSpec.getInterval().isPresent()) {
- Time interval = translateTimeBound(matchSpec.getInterval().get());
- cepPattern = matchSpec.getPattern().accept(patternVisitor).within(interval);
- } else {
- cepPattern = matchSpec.getPattern().accept(patternVisitor);
- }
- return new Tuple2<>(cepPattern, new ArrayList<>(patternVisitor.names));
- }
-
- private static Time translateTimeBound(RexNode interval) {
- if (interval instanceof RexLiteral) {
- final RexLiteral l = (RexLiteral) interval;
- if (l.getTypeName().getFamily() == SqlTypeFamily.INTERVAL_DAY_TIME) {
- return Time.milliseconds(l.getValueAs(Long.class));
- }
- }
- throw new TableException(
- "Only constant intervals with millisecond resolution are supported as time constraints of patterns.");
- }
-
- /** The visitor to traverse the pattern RexNode. */
- private static class PatternVisitor extends RexDefaultVisitor<Pattern<RowData, RowData>> {
- private final ReadableConfig config;
- private final ClassLoader classLoader;
- private final RelBuilder relBuilder;
- private final RowType inputRowType;
- private final MatchSpec matchSpec;
- private final LinkedHashSet<String> names;
- private Pattern<RowData, RowData> pattern;
-
- public PatternVisitor(
- ReadableConfig config,
- ClassLoader classLoader,
- RelBuilder relBuilder,
- RowType inputRowType,
- MatchSpec matchSpec) {
- this.config = config;
- this.classLoader = classLoader;
- this.relBuilder = relBuilder;
- this.inputRowType = inputRowType;
- this.matchSpec = matchSpec;
- this.names = new LinkedHashSet<>();
- }
-
- @Override
- public Pattern<RowData, RowData> visitLiteral(RexLiteral literal) {
- String patternName = literal.getValueAs(String.class);
- pattern = translateSingleVariable(pattern, patternName);
-
- RexNode patternDefinition = matchSpec.getPatternDefinitions().get(patternName);
- if (patternDefinition != null) {
- MatchCodeGenerator generator =
- new MatchCodeGenerator(
- new CodeGeneratorContext(config, classLoader),
- relBuilder,
- false, // nullableInput
- JavaScalaConversionUtil.toScala(new ArrayList<>(names)),
- JavaScalaConversionUtil.toScala(Optional.of(patternName)),
- CodeGenUtils.DEFAULT_COLLECTOR_TERM());
- generator.bindInput(
- inputRowType,
- CodeGenUtils.DEFAULT_INPUT1_TERM(),
- JavaScalaConversionUtil.toScala(Optional.empty()));
- IterativeCondition<RowData> condition =
- generator.generateIterativeCondition(patternDefinition);
- return pattern.where(condition);
- } else {
- return pattern.where(BooleanConditions.trueFunction());
- }
- }
-
- @Override
- public Pattern<RowData, RowData> visitCall(RexCall call) {
- SqlOperator operator = call.getOperator();
- if (operator == SqlStdOperatorTable.PATTERN_CONCAT) {
- pattern = call.operands.get(0).accept(this);
- pattern = call.operands.get(1).accept(this);
- return pattern;
- } else if (operator == SqlStdOperatorTable.PATTERN_QUANTIFIER) {
- final RexLiteral name;
- if (call.operands.get(0) instanceof RexLiteral) {
- name = (RexLiteral) call.operands.get(0);
- } else {
- throw new TableException(
- String.format(
- "Expression not supported: %s Group patterns are not supported yet.",
- call.operands.get(0)));
- }
-
- pattern = name.accept(this);
- int startNum =
- MathUtils.checkedDownCast(
- ((RexLiteral) call.operands.get(1)).getValueAs(Long.class));
- int endNum =
- MathUtils.checkedDownCast(
- ((RexLiteral) call.operands.get(2)).getValueAs(Long.class));
- boolean isGreedy = !((RexLiteral) call.operands.get(3)).getValueAs(Boolean.class);
-
- return applyQuantifier(pattern, startNum, endNum, isGreedy);
- } else if (operator == SqlStdOperatorTable.PATTERN_ALTER) {
- throw new TableException(
- String.format(
- "Expression not supported: %s. Currently, CEP doesn't support branching patterns.",
- call));
- } else if (operator == SqlStdOperatorTable.PATTERN_PERMUTE) {
- throw new TableException(
- String.format(
- "Expression not supported: %s. Currently, CEP doesn't support PERMUTE patterns.",
- call));
- } else if (operator == SqlStdOperatorTable.PATTERN_EXCLUDE) {
- throw new TableException(
- String.format(
- "Expression not supported: %s. Currently, CEP doesn't support '{-' '-}' patterns.",
- call));
- } else {
- throw new TableException("This should not happen.");
- }
- }
-
- @Override
- public Pattern<RowData, RowData> visitNode(RexNode rexNode) {
- throw new TableException(
- String.format("Unsupported expression within Pattern: [%s]", rexNode));
- }
-
- private Pattern<RowData, RowData> translateSingleVariable(
- Pattern<RowData, RowData> previousPattern, String patternName) {
- if (names.contains(patternName)) {
- throw new TableException(
- "Pattern variables must be unique. That might change in the future.");
- } else {
- names.add(patternName);
- }
-
- if (previousPattern != null) {
- return previousPattern.next(patternName);
- } else {
- return Pattern.begin(patternName, translateSkipStrategy());
- }
- }
-
- private AfterMatchSkipStrategy translateSkipStrategy() {
- switch (matchSpec.getAfter().getKind()) {
- case LITERAL:
- SqlMatchRecognize.AfterOption afterOption =
- ((RexLiteral) matchSpec.getAfter())
- .getValueAs(SqlMatchRecognize.AfterOption.class);
- switch (afterOption) {
- case SKIP_PAST_LAST_ROW:
- return AfterMatchSkipStrategy.skipPastLastEvent();
- case SKIP_TO_NEXT_ROW:
- return AfterMatchSkipStrategy.skipToNext();
- default:
- throw new TableException("This should not happen.");
- }
- case SKIP_TO_FIRST:
- return AfterMatchSkipStrategy.skipToFirst(getPatternTarget())
- .throwExceptionOnMiss();
- case SKIP_TO_LAST:
- return AfterMatchSkipStrategy.skipToLast(getPatternTarget())
- .throwExceptionOnMiss();
- default:
- throw new TableException(
- String.format(
- "Corrupted query tree. Unexpected %s for after match strategy.",
- matchSpec.getAfter()));
- }
- }
-
- private String getPatternTarget() {
- return ((RexLiteral) ((RexCall) matchSpec.getAfter()).getOperands().get(0))
- .getValueAs(String.class);
- }
-
- private Pattern<RowData, RowData> applyQuantifier(
- Pattern<RowData, RowData> pattern, int startNum, int endNum, boolean greedy) {
- boolean isOptional = startNum == 0 && endNum == 1;
-
- final Pattern<RowData, RowData> newPattern;
- if (startNum == 0 && endNum == -1) { // zero or more
- newPattern = pattern.oneOrMore().optional().consecutive();
- } else if (startNum == 1 && endNum == -1) { // one or more
- newPattern = pattern.oneOrMore().consecutive();
- } else if (isOptional) { // optional
- newPattern = pattern.optional();
- } else if (endNum != -1) { // times
- newPattern = pattern.times(startNum, endNum).consecutive();
- } else { // times or more
- newPattern = pattern.timesOrMore(startNum).consecutive();
- }
-
- if (greedy && (isOptional || startNum == endNum)) {
- return newPattern;
- } else if (greedy) {
- return newPattern.greedy();
- } else if (isOptional) {
- throw new TableException("Reluctant optional variables are not supported yet.");
- } else {
- return newPattern;
- }
- }
+ @Override
+ public boolean isProcTime(RowType inputRowType) {
+ final SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
+ final LogicalType timeOrderFieldType =
+ inputRowType.getTypeAt(timeOrderField.getFieldIndex());
+ return TypeCheckUtils.isProcTime(timeOrderFieldType);
}
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.java
new file mode 100644
index 00000000000..2f57564ef8a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.batch;
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.logical.MatchRecognize;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecMatch;
+import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalMatch;
+import org.apache.flink.table.planner.plan.utils.MatchUtil;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+
+import java.util.List;
+
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
+
+/** Batch physical RelNode for the MATCH_RECOGNIZE clause. */
+public class BatchPhysicalMatch extends CommonPhysicalMatch implements BatchPhysicalRel {
+
+ public BatchPhysicalMatch(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode inputNode,
+ MatchRecognize logicalMatch,
+ RelDataType outputRowType) {
+ super(cluster, traitSet, inputNode, logicalMatch, outputRowType);
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new BatchPhysicalMatch(
+ getCluster(), traitSet, inputs.get(0), getLogicalMatch(), deriveRowType());
+ }
+
+ @Override
+ public ExecNode<?> translateToExecNode() {
+ return new BatchExecMatch(
+ unwrapTableConfig(this),
+ MatchUtil.createMatchSpec(getLogicalMatch()),
+ InputProperty.DEFAULT,
+ FlinkTypeFactory.toLogicalRowType(getRowType()),
+ getRelDetailedDescription());
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalMatch.java
new file mode 100644
index 00000000000..fc95986df40
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalMatch.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.common;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.plan.logical.MatchRecognize;
+import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.type.RelDataType;
+
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+/** Base physical RelNode for the MATCH_RECOGNIZE clause, shared by batch and stream mode. */
+public abstract class CommonPhysicalMatch extends SingleRel implements FlinkPhysicalRel {
+
+ private final MatchRecognize logicalMatch;
+ private final RelDataType outputRowType;
+
+ public CommonPhysicalMatch(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode inputNode,
+ MatchRecognize logicalMatch,
+ RelDataType outputRowType) {
+ super(cluster, traitSet, inputNode);
+ if (logicalMatch.measures().values().stream()
+ .anyMatch(m -> PythonUtil.containsPythonCall(m, null))
+ || logicalMatch.patternDefinitions().values().stream()
+ .anyMatch(p -> PythonUtil.containsPythonCall(p, null))) {
+ throw new TableException("Python Function can not be used in MATCH_RECOGNIZE for now.");
+ }
+ this.logicalMatch = logicalMatch;
+ this.outputRowType = outputRowType;
+ }
+
+ @Override
+ protected RelDataType deriveRowType() {
+ return outputRowType;
+ }
+
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ RelDataType inputRowType = getInput().getRowType();
+ Seq<String> fieldNames =
+ JavaConverters.asScalaBufferConverter(inputRowType.getFieldNames()).asScala();
+ return super.explainTerms(pw)
+ .itemIf(
+ "partitionBy",
+ RelExplainUtil.fieldToString(
+ logicalMatch.partitionKeys().toArray(), inputRowType),
+ !logicalMatch.partitionKeys().isEmpty())
+ .itemIf(
+ "orderBy",
+ RelExplainUtil.collationToString(logicalMatch.orderKeys(), inputRowType),
+ !logicalMatch.orderKeys().getFieldCollations().isEmpty())
+ .itemIf(
+ "measures",
+ RelExplainUtil.measuresDefineToString(
+ logicalMatch.measures(),
+ fieldNames.toList(),
+ this::getExpressionString,
+ convertToExpressionDetail(pw.getDetailLevel())),
+ !logicalMatch.measures().isEmpty())
+ .item("rowsPerMatch", RelExplainUtil.rowsPerMatchToString(logicalMatch.allRows()))
+ .item("after", RelExplainUtil.afterMatchToString(logicalMatch.after(), fieldNames))
+ .item("pattern", logicalMatch.pattern().toString())
+ .itemIf(
+ "subset",
+ RelExplainUtil.subsetToString(logicalMatch.subsets()),
+ !logicalMatch.subsets().isEmpty())
+ .item("define", logicalMatch.patternDefinitions());
+ }
+
+ public MatchRecognize getLogicalMatch() {
+ return logicalMatch;
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalMatchRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalMatchRule.java
new file mode 100644
index 00000000000..84a74407e44
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalMatchRule.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.planner.plan.logical.MatchRecognize;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMatch;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalMatch;
+import org.apache.flink.table.planner.plan.rules.physical.common.CommonPhysicalMatchRule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+
+/**
+ * The physical rule is responsible for converting {@link FlinkLogicalMatch} to {@link
+ * BatchPhysicalMatch}.
+ */
+public class BatchPhysicalMatchRule extends CommonPhysicalMatchRule {
+
+ public static final RelOptRule INSTANCE = new BatchPhysicalMatchRule();
+
+ private BatchPhysicalMatchRule() {
+ super(
+ FlinkLogicalMatch.class,
+ FlinkConventions.LOGICAL(),
+ FlinkConventions.BATCH_PHYSICAL(),
+ "BatchPhysicalMatchRule");
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ return super.convert(rel, FlinkConventions.BATCH_PHYSICAL());
+ }
+
+ @Override
+ protected RelNode convertToPhysicalMatch(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode convertInput,
+ MatchRecognize matchRecognize,
+ RelDataType rowType) {
+ return new BatchPhysicalMatch(cluster, traitSet, convertInput, matchRecognize, rowType);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
new file mode 100644
index 00000000000..397476b0ebb
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.common;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.plan.logical.MatchRecognize;
+import org.apache.flink.table.planner.plan.nodes.FlinkConvention;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMatch;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.MatchUtil.AggregationPatternVariableFinder;
+import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * The physical rule is responsible for converting {@link FlinkLogicalMatch} to physical Match rel.
+ */
+public abstract class CommonPhysicalMatchRule extends ConverterRule {
+
+ public CommonPhysicalMatchRule(
+ Class<? extends RelNode> clazz, RelTrait in, RelTrait out, String descriptionPrefix) {
+ super(clazz, in, out, descriptionPrefix);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ FlinkLogicalMatch logicalMatch = call.rel(0);
+
+ validateAggregations(logicalMatch.getMeasures().values());
+ validateAggregations(logicalMatch.getPatternDefinitions().values());
+ // This check might be obsolete once CALCITE-2747 is resolved
+ validateAmbiguousColumns(logicalMatch);
+ return true;
+ }
+
+ public RelNode convert(RelNode rel, FlinkConvention convention) {
+ FlinkLogicalMatch logicalMatch = (FlinkLogicalMatch) rel;
+ RelTraitSet traitSet = rel.getTraitSet().replace(convention);
+ ImmutableBitSet partitionKeys = logicalMatch.getPartitionKeys();
+
+ FlinkRelDistribution requiredDistribution =
+ partitionKeys.isEmpty()
+ ? FlinkRelDistribution.SINGLETON()
+ : FlinkRelDistribution.hash(logicalMatch.getPartitionKeys().asList(), true);
+ RelTraitSet requiredTraitSet =
+ rel.getCluster()
+ .getPlanner()
+ .emptyTraitSet()
+ .replace(requiredDistribution)
+ .replace(convention);
+
+ RelNode convertInput = RelOptRule.convert(logicalMatch.getInput(), requiredTraitSet);
+
+ try {
+ Class.forName(
+ "org.apache.flink.cep.pattern.Pattern",
+ false,
+ Thread.currentThread().getContextClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new TableException(
+ "MATCH RECOGNIZE clause requires flink-cep dependency to be present on the classpath.",
+ e);
+ }
+ return convertToPhysicalMatch(
+ rel.getCluster(),
+ traitSet,
+ convertInput,
+ new MatchRecognize(
+ logicalMatch.getPattern(),
+ logicalMatch.getPatternDefinitions(),
+ logicalMatch.getMeasures(),
+ logicalMatch.getAfter(),
+ logicalMatch.getSubsets(),
+ logicalMatch.isAllRows(),
+ logicalMatch.getPartitionKeys(),
+ logicalMatch.getOrderKeys(),
+ logicalMatch.getInterval()),
+ logicalMatch.getRowType());
+ }
+
+ protected abstract RelNode convertToPhysicalMatch(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode convertInput,
+ MatchRecognize matchRecognize,
+ RelDataType rowType);
+
+ private void validateAggregations(Iterable<RexNode> expr) {
+ AggregationsValidator validator = new AggregationsValidator();
+ expr.forEach(e -> e.accept(validator));
+ }
+
+ private void validateAmbiguousColumns(FlinkLogicalMatch logicalMatch) {
+ if (logicalMatch.isAllRows()) {
+ throw new TableException("All rows per match mode is not supported yet.");
+ } else {
+ validateAmbiguousColumnsOnRowPerMatch(
+ logicalMatch.getPartitionKeys(),
+ logicalMatch.getMeasures().keySet(),
+ logicalMatch.getInput().getRowType(),
+ logicalMatch.getRowType());
+ }
+ }
+
+ private void validateAmbiguousColumnsOnRowPerMatch(
+ ImmutableBitSet partitionKeys,
+ Set<String> measuresNames,
+ RelDataType inputSchema,
+ RelDataType expectedSchema) {
+ int actualSize = partitionKeys.toArray().length + measuresNames.size();
+ int expectedSize = expectedSchema.getFieldCount();
+ if (actualSize != expectedSize) {
+ // try to find ambiguous column
+
+ String ambiguousColumns =
+ Arrays.stream(partitionKeys.toArray())
+ .mapToObj(k -> inputSchema.getFieldList().get(k).getName())
+ .filter(measuresNames::contains)
+ .collect(Collectors.joining(", ", "{", "}"));
+
+ throw new ValidationException(
+ String.format("Columns ambiguously defined: %s", ambiguousColumns));
+ }
+ }
+
+ private static class AggregationsValidator extends RexDefaultVisitor<Object> {
+
+ @Override
+ public Object visitCall(RexCall call) {
+ SqlOperator operator = call.getOperator();
+ if (operator instanceof SqlAggFunction) {
+ call.accept(new AggregationPatternVariableFinder());
+ } else {
+ call.getOperands().forEach(o -> o.accept(this));
+ }
+ return null;
+ }
+
+ @Override
+ public Object visitNode(RexNode rexNode) {
+ return null;
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMatch.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMatch.scala
index b97d0c17b61..5a873fb1af6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMatch.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMatch.scala
@@ -17,14 +17,12 @@
*/
package org.apache.flink.table.planner.plan.nodes.physical.stream
-import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.logical.MatchRecognize
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMatch
+import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalMatch
import org.apache.flink.table.planner.plan.utils.MatchUtil
-import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
-import org.apache.flink.table.planner.plan.utils.RelExplainUtil._
import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import _root_.java.util
@@ -38,59 +36,21 @@ class StreamPhysicalMatch(
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputNode: RelNode,
- val logicalMatch: MatchRecognize,
+ logicalMatch: MatchRecognize,
outputRowType: RelDataType)
- extends SingleRel(cluster, traitSet, inputNode)
+ extends CommonPhysicalMatch(cluster, traitSet, inputNode, logicalMatch, outputRowType)
with StreamPhysicalRel {
- if (
- logicalMatch.measures.values().exists(containsPythonCall(_)) ||
- logicalMatch.patternDefinitions.values().exists(containsPythonCall(_))
- ) {
- throw new TableException("Python Function can not be used in MATCH_RECOGNIZE for now.")
- }
-
override def requireWatermark: Boolean = {
- val rowtimeFields = getInput.getRowType.getFieldList
+ val rowTimeFields = getInput.getRowType.getFieldList
.filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
- rowtimeFields.nonEmpty
+ rowTimeFields.nonEmpty
}
- override def deriveRowType(): RelDataType = outputRowType
-
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new StreamPhysicalMatch(cluster, traitSet, inputs.get(0), logicalMatch, outputRowType)
}
- override def explainTerms(pw: RelWriter): RelWriter = {
- val inputRowType = getInput.getRowType
- val fieldNames = inputRowType.getFieldNames.toList
- super
- .explainTerms(pw)
- .itemIf(
- "partitionBy",
- fieldToString(logicalMatch.partitionKeys.toArray, inputRowType),
- !logicalMatch.partitionKeys.isEmpty)
- .itemIf(
- "orderBy",
- collationToString(logicalMatch.orderKeys, inputRowType),
- !logicalMatch.orderKeys.getFieldCollations.isEmpty)
- .itemIf(
- "measures",
- measuresDefineToString(
- logicalMatch.measures,
- fieldNames,
- getExpressionString,
- convertToExpressionDetail(pw.getDetailLevel)),
- !logicalMatch.measures.isEmpty
- )
- .item("rowsPerMatch", rowsPerMatchToString(logicalMatch.allRows))
- .item("after", afterMatchToString(logicalMatch.after, fieldNames))
- .item("pattern", logicalMatch.pattern.toString)
- .itemIf("subset", subsetToString(logicalMatch.subsets), !logicalMatch.subsets.isEmpty)
- .item("define", logicalMatch.patternDefinitions)
- }
-
override def translateToExecNode(): ExecNode[_] = {
new StreamExecMatch(
unwrapTableConfig(this),
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 22b30f63e4c..30bc6696154 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -321,6 +321,7 @@ object FlinkBatchRuleSets {
FlinkLogicalRank.CONVERTER,
FlinkLogicalWindowAggregate.CONVERTER,
FlinkLogicalSnapshot.CONVERTER,
+ FlinkLogicalMatch.CONVERTER,
FlinkLogicalSink.CONVERTER,
FlinkLogicalLegacySink.CONVERTER,
FlinkLogicalDistribution.BATCH_CONVERTER
@@ -411,6 +412,8 @@ object FlinkBatchRuleSets {
BatchPhysicalSingleRowJoinRule.INSTANCE,
BatchPhysicalLookupJoinRule.SNAPSHOT_ON_TABLESCAN,
BatchPhysicalLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN,
+ // CEP
+ BatchPhysicalMatchRule.INSTANCE,
// correlate
BatchPhysicalConstantTableFunctionScanRule.INSTANCE,
BatchPhysicalCorrelateRule.INSTANCE,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalMatchRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalMatchRule.scala
index 3142fb22bef..baecd116bd9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalMatchRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalMatchRule.scala
@@ -17,145 +17,35 @@
*/
package org.apache.flink.table.planner.plan.rules.physical.stream
-import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.logical.MatchRecognize
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMatch
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMatch
-import org.apache.flink.table.planner.plan.utils.{MatchUtil, RexDefaultVisitor}
+import org.apache.flink.table.planner.plan.rules.physical.common.CommonPhysicalMatchRule
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.calcite.sql.SqlAggFunction
-import org.apache.calcite.util.ImmutableBitSet
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
class StreamPhysicalMatchRule
- extends ConverterRule(
+ extends CommonPhysicalMatchRule(
classOf[FlinkLogicalMatch],
FlinkConventions.LOGICAL,
FlinkConventions.STREAM_PHYSICAL,
"StreamPhysicalMatchRule") {
- override def matches(call: RelOptRuleCall): Boolean = {
- val logicalMatch: FlinkLogicalMatch = call.rel(0)
-
- validateAggregations(logicalMatch.getMeasures.values().asScala)
- validateAggregations(logicalMatch.getPatternDefinitions.values().asScala)
- // This check might be obsolete once CALCITE-2747 is resolved
- validateAmbiguousColumns(logicalMatch)
- true
- }
-
override def convert(rel: RelNode): RelNode = {
- val logicalMatch: FlinkLogicalMatch = rel.asInstanceOf[FlinkLogicalMatch]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
- val partitionKeys = logicalMatch.getPartitionKeys
-
- val requiredDistribution = if (!partitionKeys.isEmpty) {
- FlinkRelDistribution.hash(logicalMatch.getPartitionKeys.asList())
- } else {
- FlinkRelDistribution.SINGLETON
- }
- val requiredTraitSet = rel.getCluster.getPlanner
- .emptyTraitSet()
- .replace(requiredDistribution)
- .replace(FlinkConventions.STREAM_PHYSICAL)
-
- val convertInput: RelNode =
- RelOptRule.convert(logicalMatch.getInput, requiredTraitSet)
-
- try {
- Class
- .forName(
- "org.apache.flink.cep.pattern.Pattern",
- false,
- Thread.currentThread().getContextClassLoader)
- } catch {
- case ex: ClassNotFoundException =>
- throw new TableException(
- "MATCH RECOGNIZE clause requires flink-cep dependency to be present on the classpath.",
- ex)
- }
-
- new StreamPhysicalMatch(
- rel.getCluster,
- traitSet,
- convertInput,
- MatchRecognize(
- logicalMatch.getPattern,
- logicalMatch.getPatternDefinitions,
- logicalMatch.getMeasures,
- logicalMatch.getAfter,
- logicalMatch.getSubsets,
- logicalMatch.isAllRows,
- logicalMatch.getPartitionKeys,
- logicalMatch.getOrderKeys,
- logicalMatch.getInterval
- ),
- logicalMatch.getRowType)
- }
-
- private def validateAggregations(expr: Iterable[RexNode]): Unit = {
- val validator = new AggregationsValidator
- expr.foreach(_.accept(validator))
+ super.convert(rel, FlinkConventions.STREAM_PHYSICAL)
}
- private def validateAmbiguousColumns(logicalMatch: FlinkLogicalMatch): Unit = {
- if (logicalMatch.isAllRows) {
- throw new TableException("All rows per match mode is not supported yet.")
- } else {
- validateAmbiguousColumnsOnRowPerMatch(
- logicalMatch.getPartitionKeys,
- logicalMatch.getMeasures.keySet().asScala,
- logicalMatch.getInput.getRowType,
- logicalMatch.getRowType)
- }
+ override def convertToPhysicalMatch(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ convertInput: RelNode,
+ matchRecognize: MatchRecognize,
+ rowType: RelDataType): RelNode = {
+ new StreamPhysicalMatch(cluster, traitSet, convertInput, matchRecognize, rowType)
}
-
- private def validateAmbiguousColumnsOnRowPerMatch(
- partitionKeys: ImmutableBitSet,
- measuresNames: mutable.Set[String],
- inputSchema: RelDataType,
- expectedSchema: RelDataType): Unit = {
- val actualSize = partitionKeys.toArray.length + measuresNames.size
- val expectedSize = expectedSchema.getFieldCount
- if (actualSize != expectedSize) {
- // try to find ambiguous column
-
- val ambiguousColumns = partitionKeys.toArray
- .map(inputSchema.getFieldList.get(_).getName)
- .filter(measuresNames.contains)
- .mkString("{", ", ", "}")
-
- throw new ValidationException(s"Columns ambiguously defined: $ambiguousColumns")
- }
- }
-
- private class AggregationsValidator extends RexDefaultVisitor[Object] {
-
- override def visitCall(call: RexCall): AnyRef = {
- call.getOperator match {
- case _: SqlAggFunction =>
- call.accept(new MatchUtil.AggregationPatternVariableFinder)
- case _ =>
- call.getOperands.asScala.foreach(_.accept(this))
- }
-
- null
- }
-
- override def visitNode(rexNode: RexNode): AnyRef = {
- null
- }
- }
-
}
object StreamPhysicalMatchRule {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java
new file mode 100644
index 00000000000..cb40481db8d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/** Validation test for {@link SqlMatchRecognize}. */
+@RunWith(Parameterized.class)
+public class MatchRecognizeValidationTest extends TableTestBase {
+
+ private static final String STREAM = "stream";
+ private static final String BATCH = "batch";
+
+ @Parameterized.Parameter public String mode;
+
+ @Parameterized.Parameters(name = "mode = {0}")
+ public static Collection<String> parameters() {
+ return Arrays.asList(STREAM, BATCH);
+ }
+
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+
+ private TableTestUtil util;
+ private TableEnvironment tEnv;
+
+ @Before
+ public void setup() {
+ util =
+ STREAM.equals(mode)
+ ? streamTestUtil(TableConfig.getDefault())
+ : batchTestUtil(TableConfig.getDefault());
+ tEnv = util.getTableEnv();
+ tEnv.executeSql(
+ "CREATE TABLE Ticker (\n"
+ + " `symbol` VARCHAR,\n"
+ + " `price` INT,\n"
+ + " `tax` INT,\n"
+ + " `proctime` as PROCTIME()\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'true'\n"
+ + ")");
+ tEnv.executeSql(
+ "CREATE TABLE MyTable (\n"
+ + " a BIGINT,\n"
+ + " b INT,\n"
+ + " proctime as PROCTIME()\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'true'\n"
+ + ")");
+ }
+
+ @After
+ public void after() {
+ util.getTableEnv().executeSql("DROP TABLE Ticker");
+ util.getTableEnv().executeSql("DROP TABLE MyTable");
+ }
+
+ /** Function 'MATCH_ROWTIME()' can only be used in MATCH_RECOGNIZE. */
+ @Test(expected = ValidationException.class)
+ public void testMatchRowTimeInSelect() {
+ String sql = "SELECT MATCH_ROWTIME() FROM MyTable";
+ util.verifyExplain(sql);
+ }
+
+ /** Function 'MATCH_PROCTIME()' can only be used in MATCH_RECOGNIZE. */
+ @Test(expected = ValidationException.class)
+ public void testMatchProcTimeInSelect() {
+ String sql = "SELECT MATCH_PROCTIME() FROM MyTable";
+ util.verifyExplain(sql);
+ }
+
+ @Test
+ public void testSortProcessingTimeDesc() {
+ if (STREAM.equals(mode)) {
+ expectedException.expect(TableException.class);
+ expectedException.expectMessage(
+ "Primary sort order of a streaming table must be ascending on time.");
+ String sqlQuery =
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime DESC\n"
+ + " MEASURES\n"
+ + " A.symbol AS aSymbol\n"
+ + " PATTERN (A B)\n"
+ + " DEFINE\n"
+ + " A AS A.symbol = 'a'\n"
+ + ") AS T";
+ tEnv.executeSql(sqlQuery);
+ }
+ }
+
+ @Test
+ public void testSortProcessingTimeSecondaryField() {
+ if (STREAM.equals(mode)) {
+ expectedException.expect(TableException.class);
+ expectedException.expectMessage(
+ "You must specify either rowtime or proctime for order by as the first one.");
+ String sqlQuery =
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY price, proctime\n"
+ + " MEASURES\n"
+ + " A.symbol AS aSymbol\n"
+ + " PATTERN (A B)\n"
+ + " DEFINE\n"
+ + " A AS A.symbol = 'a'\n"
+ + ") AS T";
+ tEnv.executeSql(sqlQuery);
+ }
+ }
+
+ @Test
+ public void testSortNoOrder() {
+ if (STREAM.equals(mode)) {
+ expectedException.expect(TableException.class);
+ expectedException.expectMessage(
+ "You must specify either rowtime or proctime for order by.");
+ String sqlQuery =
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " MEASURES\n"
+ + " A.symbol AS aSymbol\n"
+ + " PATTERN (A B)\n"
+ + " DEFINE\n"
+ + " A AS A.symbol = 'a'\n"
+ + ") AS T";
+ tEnv.executeSql(sqlQuery);
+ }
+ }
+
+ @Test
+ public void testUpdatesInUpstreamOperatorNotSupported() {
+ if (STREAM.equals(mode)) {
+ expectedException.expect(TableException.class);
+ expectedException.expectMessage(
+ "Match Recognize doesn't support consuming update changes which is produced by node GroupAggregate(");
+ String sqlQuery =
+ "SELECT *\n"
+ + "FROM (SELECT DISTINCT * FROM Ticker)\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " A.symbol AS aSymbol\n"
+ + " ONE ROW PER MATCH"
+ + " PATTERN (A B)\n"
+ + " DEFINE\n"
+ + " A AS A.symbol = 'a'\n"
+ + ") AS T";
+ tEnv.executeSql(sqlQuery);
+ }
+ }
+
+ @Test
+ public void testAggregatesOnMultiplePatternVariablesNotSupported() {
+ expectedException.expect(ValidationException.class);
+ expectedException.expectMessage("SQL validation failed.");
+ String sqlQuery =
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " SUM(A.price + B.tax) AS taxedPrice\n"
+ + " PATTERN (A B)\n"
+ + " DEFINE\n"
+ + " A AS A.symbol = 'a'\n"
+ + ") AS T";
+ tEnv.executeSql(sqlQuery);
+ }
+
+ @Test
+ public void testAggregatesOnMultiplePatternVariablesNotSupportedInUDAGs() {
+ expectedException.expect(ValidationException.class);
+ expectedException.expectMessage("Aggregation must be applied to a single pattern variable");
+ util.addTemporarySystemFunction("weightedAvg", new WeightedAvg());
+ String sqlQuery =
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " weightedAvg(A.price, B.tax) AS weightedAvg\n"
+ + " PATTERN (A B)\n"
+ + " DEFINE\n"
+ + " A AS A.symbol = 'a'\n"
+ + ") AS T";
+ tEnv.executeSql(sqlQuery);
+ }
+
+ @Test
+ public void testValidatingAmbiguousColumns() {
+ expectedException.expect(ValidationException.class);
+ expectedException.expectMessage("Columns ambiguously defined: {symbol, price}");
+ String sqlQuery =
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " PARTITION BY symbol, price\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " A.symbol AS symbol,\n"
+ + " A.price AS price\n"
+ + " PATTERN (A)\n"
+ + " DEFINE\n"
+ + " A AS A.symbol = 'a'\n"
+ + ") AS T";
+ tEnv.executeSql(sqlQuery);
+ }
+
+ // ***************************************************************************************
+ // * These validations are temporary. We should remove these tests once we support those *
+ // * features. *
+ // ***************************************************************************************
+
+ /** Python Function can not be used in MATCH_RECOGNIZE for now. */
+ @Test
+ public void testMatchPythonFunction() {
+ expectedException.expect(TableException.class);
+ expectedException.expectMessage(
+ "Python Function can not be used in MATCH_RECOGNIZE for now.");
+ util.addTemporarySystemFunction("pyFunc", new PythonScalarFunction("pyFunc"));
+ String sql =
+ "SELECT T.aa as ta\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " A.a as aa,\n"
+ + " pyFunc(1,2) as bb\n"
+ + " PATTERN (A B)\n"
+ + " DEFINE\n"
+ + " A AS a = 1,\n"
+ + " B AS b = 'b'\n"
+ + ") AS T";
+ util.verifyExplain(sql);
+ }
+
+ @Test
+ public void testAllRowsPerMatch() {
+ expectedException.expect(TableException.class);
+ expectedException.expectMessage("All rows per match mode is not supported yet.");
+ String sqlQuery =
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " A.symbol AS aSymbol\n"
+ + " ALL ROWS PER MATCH\n"
+ + " PATTERN (A B)\n"
+ + " DEFINE\n"
+ + " A AS A.symbol = 'a'\n"
+ + ") AS T";
+ tEnv.executeSql(sqlQuery);
+ }
+
+ @Test
+ public void testGreedyQuantifierAtTheEndIsNotSupported() {
+ expectedException.expect(TableException.class);
+ expectedException.expectMessage(
+ "Greedy quantifiers are not allowed as the last element of a "
+ + "Pattern yet. Finish your pattern with either a simple variable or reluctant quantifier.");
+ String sqlQuery =
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " A.symbol AS aSymbol\n"
+ + " PATTERN (A B+)\n"
+ + " DEFINE\n"
+ + " A AS A.symbol = 'a'\n"
+ + ") AS T";
+ tEnv.executeSql(sqlQuery);
+ }
+
+ @Test
+ public void testPatternsProducingEmptyMatchesAreNotSupported() {
+ expectedException.expect(TableException.class);
+ expectedException.expectMessage(
+ "Patterns that can produce empty matches are not supported. "
+ + "There must be at least one non-optional state.");
+ String sqlQuery =
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " A.symbol AS aSymbol\n"
+ + " PATTERN (A*)\n"
+ + " DEFINE\n"
+ + " A AS A.symbol = 'a'\n"
+ + ") AS T";
+ tEnv.executeSql(sqlQuery);
+ }
+
+ @Test
+ public void testDistinctAggregationsNotSupported() {
+ expectedException.expect(ValidationException.class);
+ expectedException.expectMessage("SQL validation failed.");
+ String sqlQuery =
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " COUNT(DISTINCT A.price) AS price\n"
+ + " PATTERN (A B)\n"
+ + " DEFINE\n"
+ + " A AS A.symbol = 'a'\n"
+ + ") AS T";
+ tEnv.executeSql(sqlQuery);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.java
new file mode 100644
index 00000000000..eca0bb0cfcf
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Tests for {@link SqlMatchRecognize}. */
+public class MatchRecognizeTest extends TableTestBase {
+
+ private BatchTableTestUtil util;
+
+ @Before
+ public void before() {
+ util = batchTestUtil(TableConfig.getDefault());
+ util.getTableEnv()
+ .executeSql(
+ "CREATE TABLE Ticker (\n"
+ + " `symbol` VARCHAR,\n"
+ + " `price` INT,\n"
+ + " `tax` INT,\n"
+ + " `ts_ltz` as PROCTIME()\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'true'\n"
+ + ")");
+ }
+
+ @After
+ public void after() {
+ util.getTableEnv().executeSql("DROP TABLE Ticker");
+ }
+
+ @Test
+ public void testCascadeMatch() {
+ String sqlQuery =
+ "SELECT *\n"
+ + "FROM (\n"
+ + " SELECT\n"
+ + " symbol,\n"
+ + " price\n"
+ + " FROM Ticker\n"
+ + " MATCH_RECOGNIZE (\n"
+ + " PARTITION BY symbol\n"
+ + " ORDER BY ts_ltz"
+ + " MEASURES\n"
+ + " A.price as price,\n"
+ + " A.tax as tax\n"
+ + " ONE ROW PER MATCH\n"
+ + " PATTERN (A)\n"
+ + " DEFINE\n"
+ + " A AS A.price > 0\n"
+ + " ) AS T\n"
+ + " GROUP BY symbol, price\n"
+ + ")\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " PARTITION BY symbol\n"
+ + " MEASURES\n"
+ + " A.price as dPrice\n"
+ + " PATTERN (A)\n"
+ + " DEFINE\n"
+ + " A AS A.symbol = 'a'\n"
+ + ")";
+ util.verifyExecPlan(sqlQuery);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java
index a336a42a2f6..80c8baa66e0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java
@@ -152,4 +152,24 @@ public class BatchOperatorNameTest extends OperatorNameTestBase {
util.testingTableEnv().registerTableSinkInternal("MySink", sink);
verifyInsert("insert into MySink select * from MySource");
}
+
+ @Test
+ public void testMatch() {
+ createSourceWithTimeAttribute();
+ String sql =
+ "SELECT T.aid, T.bid, T.cid\n"
+ + " FROM MyTable MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " `A\"`.a AS aid,\n"
+ + " \u006C.a AS bid,\n"
+ + " C.a AS cid\n"
+ + " PATTERN (`A\"` \u006C C)\n"
+ + " DEFINE\n"
+ + " `A\"` AS a = 1,\n"
+ + " \u006C AS b = 2,\n"
+ + " C AS c = 'c'\n"
+ + " ) AS T";
+ verifyQuery(sql);
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
new file mode 100644
index 00000000000..31860779478
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
@@ -0,0 +1,645 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.apache.flink.api.common.typeinfo.Types.DOUBLE;
+import static org.apache.flink.api.common.typeinfo.Types.INT;
+import static org.apache.flink.api.common.typeinfo.Types.LONG;
+import static org.apache.flink.api.common.typeinfo.Types.ROW_NAMED;
+import static org.apache.flink.api.common.typeinfo.Types.STRING;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** IT Case for testing {@link SqlMatchRecognize}. */
+public class MatchRecognizeITCase {
+
+ private StreamExecutionEnvironment env;
+ private StreamTableEnvironment tEnv;
+
+ @Before
+ public void setup() {
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());
+ }
+
+ @Test
+ public void testSimplePattern() {
+ tEnv.createTemporaryView(
+ "MyTable",
+ tEnv.fromDataStream(
+ env.fromElements(
+ Row.of(1, "a"),
+ Row.of(2, "z"),
+ Row.of(3, "b"),
+ Row.of(4, "c"),
+ Row.of(5, "d"),
+ Row.of(6, "a"),
+ Row.of(7, "b"),
+ Row.of(8, "c"),
+ Row.of(9, "h"))
+ .returns(ROW_NAMED(new String[] {"id", "name"}, INT, STRING)),
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .columnByExpression("proctime", "PROCTIME()")
+ .build()));
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT T.aid, T.bid, T.cid\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " `A\"`.id AS aid,\n"
+ + " \u006C.id AS bid,\n"
+ + " C.id AS cid\n"
+ + " PATTERN (`A\"` \u006C C)\n"
+ + " DEFINE\n"
+ + " `A\"` AS name = 'a',\n"
+ + " \u006C AS name = 'b',\n"
+ + " C AS name = 'c'\n"
+ + ") AS T");
+ assertEquals(
+ Collections.singletonList(Row.of(6, 7, 8)),
+ CollectionUtil.iteratorToList(tableResult.collect()));
+ }
+
+ @Test
+ public void testSimplePatternWithNulls() {
+ tEnv.createTemporaryView(
+ "MyTable",
+ tEnv.fromDataStream(
+ env.fromElements(
+ Row.of(1, "a", null),
+ Row.of(2, "b", null),
+ Row.of(3, "c", null),
+ Row.of(4, "d", null),
+ Row.of(5, null, null),
+ Row.of(6, "a", null),
+ Row.of(7, "b", null),
+ Row.of(8, "c", null),
+ Row.of(9, null, null))
+ .returns(
+ ROW_NAMED(
+ new String[] {"id", "name", "nullField"},
+ INT,
+ STRING,
+ STRING)),
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("nullField", DataTypes.STRING())
+ .columnByExpression("proctime", "PROCTIME()")
+ .build()));
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT T.aid, T.bNull, T.cid, T.aNull\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " A.id AS aid,\n"
+ + " A.nullField AS aNull,\n"
+ + " LAST(B.nullField) AS bNull,\n"
+ + " C.id AS cid\n"
+ + " PATTERN (A B C)\n"
+ + " DEFINE\n"
+ + " A AS name = 'a' AND nullField IS NULL,\n"
+ + " B AS name = 'b' AND LAST(A.nullField) IS NULL,\n"
+ + " C AS name = 'c'\n"
+ + ") AS T");
+ assertEquals(
+ Arrays.asList(Row.of(1, null, 3, null), Row.of(6, null, 8, null)),
+ CollectionUtil.iteratorToList(tableResult.collect()));
+ }
+
+ @Test
+ public void testCodeSplitsAreProperlyGenerated() {
+ tEnv.getConfig().setMaxGeneratedCodeLength(1);
+ tEnv.createTemporaryView(
+ "MyTable",
+ tEnv.fromDataStream(
+ env.fromElements(
+ Row.of(1, "a", "key1", "second_key3"),
+ Row.of(2, "b", "key1", "second_key3"),
+ Row.of(3, "c", "key1", "second_key3"),
+ Row.of(4, "d", "key", "second_key"),
+ Row.of(5, "e", "key", "second_key"),
+ Row.of(6, "a", "key2", "second_key4"),
+ Row.of(7, "b", "key2", "second_key4"),
+ Row.of(8, "c", "key2", "second_key4"),
+ Row.of(9, "f", "key", "second_key"))
+ .returns(
+ ROW_NAMED(
+ new String[] {"id", "name", "key1", "key2"},
+ INT,
+ STRING,
+ STRING,
+ STRING)),
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("key1", DataTypes.STRING())
+ .column("key2", DataTypes.STRING())
+ .columnByExpression("proctime", "PROCTIME()")
+ .build()));
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT *\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " PARTITION BY key1, key2\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " A.id AS aid,\n"
+ + " A.key1 AS akey1,\n"
+ + " LAST(B.id) AS bid,\n"
+ + " C.id AS cid,\n"
+ + " C.key2 AS ckey2\n"
+ + " PATTERN (A B C)\n"
+ + " DEFINE\n"
+ + " A AS name = 'a' AND key1 LIKE '%key%' AND id > 0,\n"
+ + " B AS name = 'b' AND LAST(A.name, 2) IS NULL,\n"
+ + " C AS name = 'c' AND LAST(A.name) = 'a'\n"
+ + ") AS T");
+ List<Row> actual = CollectionUtil.iteratorToList(tableResult.collect());
+ actual.sort(Comparator.comparing(o -> String.valueOf(o.getField(0))));
+ assertEquals(
+ Arrays.asList(
+ Row.of("key1", "second_key3", 1, "key1", 2, 3, "second_key3"),
+ Row.of("key2", "second_key4", 6, "key2", 7, 8, "second_key4")),
+ actual);
+ }
+
+ @Test
+ public void testLogicalOffsets() {
+ tEnv.createTemporaryView(
+ "Ticker",
+ tEnv.fromDataStream(
+ env.fromElements(
+ Row.of("ACME", 1L, 19, 1),
+ Row.of("ACME", 2L, 17, 2),
+ Row.of("ACME", 3L, 13, 3),
+ Row.of("ACME", 4L, 20, 4),
+ Row.of("ACME", 5L, 20, 5),
+ Row.of("ACME", 6L, 26, 6),
+ Row.of("ACME", 7L, 20, 7),
+ Row.of("ACME", 8L, 25, 8))
+ .returns(
+ ROW_NAMED(
+ new String[] {"symbol", "tstamp", "price", "tax"},
+ STRING,
+ LONG,
+ INT,
+ INT)),
+ Schema.newBuilder()
+ .column("symbol", DataTypes.STRING())
+ .column("tstamp", DataTypes.BIGINT())
+ .column("price", DataTypes.INT())
+ .column("tax", DataTypes.INT())
+ .columnByExpression("proctime", "PROCTIME()")
+ .build()));
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " FIRST(DOWN.tstamp) AS start_tstamp,\n"
+ + " LAST(DOWN.tstamp) AS bottom_tstamp,\n"
+ + " UP.tstamp AS end_tstamp,\n"
+ + " FIRST(DOWN.price + DOWN.tax + 1) AS bottom_total,\n"
+ + " UP.price + UP.tax AS end_total\n"
+ + " ONE ROW PER MATCH\n"
+ + " AFTER MATCH SKIP PAST LAST ROW\n"
+ + " PATTERN (DOWN{2,} UP)\n"
+ + " DEFINE\n"
+ + " DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n"
+ + " UP AS price < FIRST(DOWN.price)\n"
+ + ") AS T");
+ assertEquals(
+ Collections.singletonList(Row.of(6L, 7L, 8L, 33, 33)),
+ CollectionUtil.iteratorToList(tableResult.collect()));
+ }
+
+ @Test
+ public void testLogicalOffsetsWithStarVariable() {
+ tEnv.createTemporaryView(
+ "Ticker",
+ tEnv.fromDataStream(
+ env.fromElements(
+ Row.of(1, "ACME", 1L, 20),
+ Row.of(2, "ACME", 2L, 19),
+ Row.of(3, "ACME", 3L, 18),
+ Row.of(4, "ACME", 4L, 17),
+ Row.of(5, "ACME", 5L, 16),
+ Row.of(6, "ACME", 6L, 15),
+ Row.of(7, "ACME", 7L, 14),
+ Row.of(8, "ACME", 8L, 20))
+ .returns(
+ ROW_NAMED(
+ new String[] {"id", "symbol", "tstamp", "price"},
+ INT,
+ STRING,
+ LONG,
+ INT)),
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("symbol", DataTypes.STRING())
+ .column("tstamp", DataTypes.BIGINT())
+ .column("price", DataTypes.INT())
+ .columnByExpression("proctime", "PROCTIME()")
+ .build()));
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " FIRST(id, 0) as id0,\n"
+ + " FIRST(id, 1) as id1,\n"
+ + " FIRST(id, 2) as id2,\n"
+ + " FIRST(id, 3) as id3,\n"
+ + " FIRST(id, 4) as id4,\n"
+ + " FIRST(id, 5) as id5,\n"
+ + " FIRST(id, 6) as id6,\n"
+ + " FIRST(id, 7) as id7,\n"
+ + " LAST(id, 0) as id8,\n"
+ + " LAST(id, 1) as id9,\n"
+ + " LAST(id, 2) as id10,\n"
+ + " LAST(id, 3) as id11,\n"
+ + " LAST(id, 4) as id12,\n"
+ + " LAST(id, 5) as id13,\n"
+ + " LAST(id, 6) as id14,\n"
+ + " LAST(id, 7) as id15\n"
+ + " ONE ROW PER MATCH\n"
+ + " AFTER MATCH SKIP PAST LAST ROW\n"
+ + " PATTERN (`DOWN\"`{2,} UP)\n"
+ + " DEFINE\n"
+ + " `DOWN\"` AS price < LAST(price, 1) OR LAST(price, 1) IS NULL,\n"
+ + " UP AS price = FIRST(price) AND price > FIRST(price, 3) AND price = LAST(price, 7)\n"
+ + ") AS T");
+ assertEquals(
+ Collections.singletonList(Row.of(1, 2, 3, 4, 5, 6, 7, 8, 8, 7, 6, 5, 4, 3, 2, 1)),
+ CollectionUtil.iteratorToList(tableResult.collect()));
+ }
+
+ @Test
+ public void testLogicalOffsetOutsideOfRangeInMeasures() {
+ tEnv.createTemporaryView(
+ "Ticker",
+ tEnv.fromDataStream(
+ env.fromElements(
+ Row.of("ACME", 1L, 19, 1),
+ Row.of("ACME", 2L, 17, 2),
+ Row.of("ACME", 3L, 13, 3),
+ Row.of("ACME", 4L, 20, 4))
+ .returns(
+ ROW_NAMED(
+ new String[] {"symbol", "tstamp", "price", "tax"},
+ STRING,
+ LONG,
+ INT,
+ INT)),
+ Schema.newBuilder()
+ .column("symbol", DataTypes.STRING())
+ .column("tstamp", DataTypes.BIGINT())
+ .column("price", DataTypes.INT())
+ .column("tax", DataTypes.INT())
+ .columnByExpression("proctime", "PROCTIME()")
+ .build()));
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " FIRST(DOWN.price) as first,\n"
+ + " LAST(DOWN.price) as last,\n"
+ + " FIRST(DOWN.price, 5) as nullPrice\n"
+ + " ONE ROW PER MATCH\n"
+ + " AFTER MATCH SKIP PAST LAST ROW\n"
+ + " PATTERN (DOWN{2,} UP)\n"
+ + " DEFINE\n"
+ + " DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n"
+ + " UP AS price > LAST(DOWN.price)\n"
+ + ") AS T");
+ assertEquals(
+ Collections.singletonList(Row.of(19, 13, null)),
+ CollectionUtil.iteratorToList(tableResult.collect()));
+ }
+
+ /**
+ * This query checks:
+ *
+ * <p>1. count(D.price) produces 0, because no rows matched to D; 2. sum(D.price) produces null,
+ * because no rows matched to D; 3. aggregates that take multiple parameters work; 4. aggregates
+ * with expressions work.
+ */
+ @Test
+ public void testAggregates() {
+ tEnv.getConfig().setMaxGeneratedCodeLength(1);
+ tEnv.createTemporaryView(
+ "MyTable",
+ tEnv.fromDataStream(
+ env.fromElements(
+ Row.of(1, "a", 1, 0.8, 1),
+ Row.of(2, "z", 2, 0.8, 3),
+ Row.of(3, "b", 1, 0.8, 2),
+ Row.of(4, "c", 1, 0.8, 5),
+ Row.of(5, "d", 4, 0.1, 5),
+ Row.of(6, "a", 2, 1.5, 2),
+ Row.of(7, "b", 2, 0.8, 3),
+ Row.of(8, "c", 1, 0.8, 2),
+ Row.of(9, "h", 4, 0.8, 3),
+ Row.of(10, "h", 4, 0.8, 3),
+ Row.of(11, "h", 2, 0.8, 3),
+ Row.of(12, "h", 2, 0.8, 3))
+ .returns(
+ ROW_NAMED(
+ new String[] {
+ "id", "name", "price", "rate", "weight"
+ },
+ INT,
+ STRING,
+ INT,
+ DOUBLE,
+ INT)),
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("price", DataTypes.INT())
+ .column("rate", DataTypes.DOUBLE())
+ .column("weight", DataTypes.INT())
+ .columnByExpression("proctime", "PROCTIME()")
+ .build()));
+ tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg());
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT *\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " FIRST(id) as startId,\n"
+ + " SUM(A.price) AS sumA,\n"
+ + " COUNT(D.price) AS countD,\n"
+ + " SUM(D.price) as sumD,\n"
+ + " weightedAvg(price, weight) as wAvg,\n"
+ + " AVG(B.price) AS avgB,\n"
+ + " SUM(B.price * B.rate) as sumExprB,\n"
+ + " LAST(id) as endId\n"
+ + " AFTER MATCH SKIP PAST LAST ROW\n"
+ + " PATTERN (A+ B+ C D? E)\n"
+ + " DEFINE\n"
+ + " A AS SUM(A.price) < 6,\n"
+ + " B AS SUM(B.price * B.rate) < SUM(A.price) AND\n"
+ + " SUM(B.price * B.rate) > 0.2 AND\n"
+ + " SUM(B.price) >= 1 AND\n"
+ + " AVG(B.price) >= 1 AND\n"
+ + " weightedAvg(price, weight) > 1\n"
+ + ") AS T");
+ assertEquals(
+ Arrays.asList(
+ Row.of(1, 5, 0L, null, 2L, 3, 3.4D, 8),
+ Row.of(9, 4, 0L, null, 3L, 4, 3.2D, 12)),
+ CollectionUtil.iteratorToList(tableResult.collect()));
+ }
+
+ @Test
+ public void testAggregatesWithNullInputs() {
+ tEnv.getConfig().setMaxGeneratedCodeLength(1);
+ tEnv.createTemporaryView(
+ "MyTable",
+ tEnv.fromDataStream(
+ env.fromElements(
+ Row.of(1, "a", 10),
+ Row.of(2, "z", 10),
+ Row.of(3, "b", null),
+ Row.of(4, "c", null),
+ Row.of(5, "d", 3),
+ Row.of(6, "c", 3),
+ Row.of(7, "c", 3),
+ Row.of(8, "c", 3),
+ Row.of(9, "c", 2))
+ .returns(
+ ROW_NAMED(
+ new String[] {"id", "name", "price"},
+ INT,
+ STRING,
+ INT)),
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("price", DataTypes.INT())
+ .columnByExpression("proctime", "PROCTIME()")
+ .build()));
+ tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg());
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT *\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " SUM(A.price) as sumA,\n"
+ + " COUNT(A.id) as countAId,\n"
+ + " COUNT(A.price) as countAPrice,\n"
+ + " COUNT(*) as countAll,\n"
+ + " COUNT(price) as countAllPrice,\n"
+ + " LAST(id) as endId\n"
+ + " AFTER MATCH SKIP PAST LAST ROW\n"
+ + " PATTERN (A+ C)\n"
+ + " DEFINE\n"
+ + " A AS SUM(A.price) < 30,\n"
+ + " C AS C.name = 'c'\n"
+ + ") AS T");
+ assertEquals(
+ Collections.singletonList(Row.of(29, 7L, 5L, 8L, 6L, 8)),
+ CollectionUtil.iteratorToList(tableResult.collect()));
+ }
+
+ @Test
+ public void testAccessingCurrentTime() {
+ tEnv.createTemporaryView(
+ "MyTable",
+ tEnv.fromDataStream(
+ env.fromElements(Row.of(1, "a"))
+ .returns(ROW_NAMED(new String[] {"id", "name"}, INT, STRING)),
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .columnByExpression("proctime", "PROCTIME()")
+ .build()));
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT T.aid\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " A.id AS aid,\n"
+ + " A.proctime AS aProctime,\n"
+ + " LAST(A.proctime + INTERVAL '1' second) as calculatedField\n"
+ + " PATTERN (A)\n"
+ + " DEFINE\n"
+ + " A AS proctime >= (CURRENT_TIMESTAMP - INTERVAL '1' day)\n"
+ + ") AS T");
+ assertEquals(
+ Collections.singletonList(Row.of(1)),
+ CollectionUtil.iteratorToList(tableResult.collect()));
+ }
+
+ @Test
+ public void testUserDefinedFunctions() {
+ tEnv.getConfig().setMaxGeneratedCodeLength(1);
+ tEnv.createTemporaryView(
+ "MyTable",
+ tEnv.fromDataStream(
+ env.fromElements(
+ Row.of(1, "a", 1),
+ Row.of(2, "a", 1),
+ Row.of(3, "a", 1),
+ Row.of(4, "a", 1),
+ Row.of(5, "a", 1),
+ Row.of(6, "b", 1),
+ Row.of(7, "a", 1),
+ Row.of(8, "a", 1),
+ Row.of(9, "f", 1))
+ .returns(
+ ROW_NAMED(
+ new String[] {"id", "name", "price"},
+ INT,
+ STRING,
+ INT)),
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("price", DataTypes.INT())
+ .columnByExpression("proctime", "PROCTIME()")
+ .build()));
+ tEnv.createTemporarySystemFunction("prefix", new PrefixingScalarFunc());
+ tEnv.createTemporarySystemFunction("countFrom", new RichAggFunc());
+ String prefix = "PREF";
+ int startFrom = 4;
+ Configuration jobParameters = new Configuration();
+ jobParameters.setString("prefix", prefix);
+ jobParameters.setString("start", Integer.toString(startFrom));
+ env.getConfig().setGlobalJobParameters(jobParameters);
+ TableResult tableResult =
+ tEnv.executeSql(
+ String.format(
+ "SELECT *\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " FIRST(id) as firstId,\n"
+ + " prefix(A.name) as prefixedNameA,\n"
+ + " countFrom(A.price) as countFromA,\n"
+ + " LAST(id) as lastId\n"
+ + " AFTER MATCH SKIP PAST LAST ROW\n"
+ + " PATTERN (A+ C)\n"
+ + " DEFINE\n"
+ + " A AS prefix(A.name) = '%s:a' AND countFrom(A.price) <= %d\n"
+ + ") AS T",
+ prefix, 4 + 4));
+ assertEquals(
+ Arrays.asList(Row.of(1, "PREF:a", 8, 5), Row.of(7, "PREF:a", 6, 9)),
+ CollectionUtil.iteratorToList(tableResult.collect()));
+ }
+
+ /** Test prefixing function. */
+ public static class PrefixingScalarFunc extends ScalarFunction {
+
+ private String prefix = "ERROR_VALUE";
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ prefix = context.getJobParameter("prefix", "");
+ }
+
+ public String eval(String value) {
+ return String.format("%s:%s", prefix, value);
+ }
+ }
+
+ /** Test count accumulator. */
+ public static class CountAcc {
+ public Integer count;
+
+ public CountAcc(Integer count) {
+ this.count = count;
+ }
+ }
+
+ /** Test rich aggregate function. */
+ public static class RichAggFunc extends AggregateFunction<Integer, CountAcc> {
+
+ private Integer start = 0;
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ start = Integer.valueOf(context.getJobParameter("start", "0"));
+ }
+
+ @Override
+ public void close() throws Exception {
+ start = 0;
+ }
+
+ @Override
+ public CountAcc createAccumulator() {
+ return new CountAcc(start);
+ }
+
+ @Override
+ public Integer getValue(CountAcc accumulator) {
+ return accumulator.count;
+ }
+
+ public void accumulate(CountAcc countAcc, Integer value) {
+ countAcc.count += value;
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.xml
new file mode 100644
index 00000000000..ba1da9ec987
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testCascadeMatch">
+ <Resource name="sql">
+ <![CDATA[SELECT *
+FROM (
+ SELECT
+ symbol,
+ price
+ FROM Ticker
+ MATCH_RECOGNIZE (
+ PARTITION BY symbol
+ ORDER BY ts_ltz MEASURES
+ A.price as price,
+ A.tax as tax
+ ONE ROW PER MATCH
+ PATTERN (A)
+ DEFINE
+ A AS A.price > 0
+ ) AS T
+ GROUP BY symbol, price
+)
+MATCH_RECOGNIZE (
+ PARTITION BY symbol
+ MEASURES
+ A.price as dPrice
+ PATTERN (A)
+ DEFINE
+ A AS A.symbol = 'a'
+)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(symbol=[$0], dPrice=[$1])
++- LogicalMatch(partition=[[0]], order=[[]], outputFields=[[symbol, dPrice]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[_UTF-16LE'A'], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[=(PREV(A.$0, 0), _UTF-16LE'a')]], inputFields=[[symbol, price]])
+ +- LogicalAggregate(group=[{0, 1}])
+ +- LogicalProject(symbol=[$0], price=[$1])
+ +- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, price, tax]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[_UTF-16LE'A'], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 0)]], inputFields=[[symbol, price, tax, ts_ltz]])
+ +- LogicalProject(symbol=[$0], price=[$1], tax=[$2], ts_ltz=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, Ticker]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Match(partitionBy=[symbol], measures=[FINAL(A.price) AS dPrice], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[_UTF-16LE'A'], define=[{A==(PREV(A.$0, 0), _UTF-16LE'a')}])
++- Exchange(distribution=[hash[symbol]])
+ +- HashAggregate(isMerge=[true], groupBy=[symbol, price], select=[symbol, price])
+ +- Exchange(distribution=[hash[symbol, price]])
+ +- LocalHashAggregate(groupBy=[symbol, price], select=[symbol, price])
+ +- Calc(select=[symbol, price])
+ +- Match(partitionBy=[symbol], orderBy=[ts_ltz ASC], measures=[FINAL(A.price) AS price, FINAL(A.tax) AS tax], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[_UTF-16LE'A'], define=[{A=>(PREV(A.$1, 0), 0)}])
+ +- Calc(select=[symbol, price, tax, PROCTIME() AS ts_ltz])
+ +- Exchange(distribution=[hash[symbol]])
+ +- TableSourceScan(table=[[default_catalog, default_database, Ticker]], fields=[symbol, price, tax])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
index 9e8ac59965d..5552fb5ba70 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
@@ -2093,6 +2093,114 @@ Calc(select=[b, w$end AS window_end, EXPR$2])
"side" : "second"
} ]
} ]
+}]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMatch[isNameSimplifyEnabled=false]">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(aid=[$0], bid=[$1], cid=[$2])
++- LogicalMatch(partition=[[]], order=[[5 ASC-nulls-first]], outputFields=[[aid, bid, cid]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[=(LAST(*.$0, 0), 1), =(LAST(*.$1, 0), 2), =(LAST(*.$2, 0), _UTF-16LE'c')]], inputFields=[[a, b, c, d, rowtime, proctime]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- Exchange(distribution=[single])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Optimized Execution Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- Exchange(distribution=[single])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
+ "pact" : "Data Source",
+ "contents" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
+ "pact" : "Operator",
+ "contents" : "Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "GLOBAL",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
+ "pact" : "Operator",
+ "contents" : "Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ } ]
+}]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMatch[isNameSimplifyEnabled=true]">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(aid=[$0], bid=[$1], cid=[$2])
++- LogicalMatch(partition=[[]], order=[[5 ASC-nulls-first]], outputFields=[[aid, bid, cid]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[=(LAST(*.$0, 0), 1), =(LAST(*.$1, 0), 2), =(LAST(*.$2, 0), _UTF-16LE'c')]], inputFields=[[a, b, c, d, rowtime, proctime]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- Exchange(distribution=[single])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Optimized Execution Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+ +- Exchange(distribution=[single])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: MyTable[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "GLOBAL",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Match[]",
+ "pact" : "Operator",
+ "contents" : "[]:Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ } ]
}]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/validation/MatchRecognizeValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/validation/MatchRecognizeValidationTest.scala
deleted file mode 100644
index 6897fb8735c..00000000000
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/validation/MatchRecognizeValidationTest.scala
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.stream.sql.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
-import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg
-import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction
-import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.ToMillis
-import org.apache.flink.table.planner.utils.TableTestBase
-import org.apache.flink.types.Row
-
-import org.junit.Test
-
-import java.sql.Timestamp
-
-class MatchRecognizeValidationTest extends TableTestBase {
-
- private val streamUtil = scalaStreamTestUtil()
- streamUtil.addDataStream[(Int, String, Timestamp)](
- "MyTable",
- 'a,
- 'b,
- 'rowtime.rowtime,
- 'proctime.proctime)
- streamUtil.addDataStream[(String, Long, Int, Int)](
- "Ticker",
- 'symbol,
- 'tstamp,
- 'price,
- 'tax,
- 'proctime.proctime)
- streamUtil.addFunction("ToMillis", new ToMillis)
-
- /** Function 'MATCH_ROWTIME()' can only be used in MATCH_RECOGNIZE * */
- @Test(expected = classOf[ValidationException])
- def testMatchRowtimeInSelect(): Unit = {
- val sql = "SELECT MATCH_ROWTIME() FROM MyTable"
- streamUtil.verifyExplain(sql)
- }
-
- /** Function 'MATCH_PROCTIME()' can only be used in MATCH_RECOGNIZE * */
- @Test(expected = classOf[ValidationException])
- def testMatchProctimeInSelect(): Unit = {
- val sql = "SELECT MATCH_PROCTIME() FROM MyTable"
- streamUtil.verifyExplain(sql)
- }
-
- @Test
- def testSortProcessingTimeDesc(): Unit = {
- thrown.expectMessage("Primary sort order of a streaming table must be ascending on time.")
- thrown.expect(classOf[TableException])
-
- val sqlQuery =
- s"""
- |SELECT *
- |FROM Ticker
- |MATCH_RECOGNIZE (
- | ORDER BY proctime DESC
- | MEASURES
- | A.symbol AS aSymbol
- | PATTERN (A B)
- | DEFINE
- | A AS symbol = 'a'
- |) AS T
- |""".stripMargin
-
- streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
- }
-
- @Test
- def testSortProcessingTimeSecondaryField(): Unit = {
- thrown.expectMessage(
- "You must specify either rowtime or proctime for order by as " +
- "the first one.")
- thrown.expect(classOf[TableException])
-
- val sqlQuery =
- s"""
- |SELECT *
- |FROM Ticker
- |MATCH_RECOGNIZE (
- | ORDER BY price, proctime
- | MEASURES
- | A.symbol AS aSymbol
- | PATTERN (A B)
- | DEFINE
- | A AS symbol = 'a'
- |) AS T
- |""".stripMargin
-
- streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
- }
-
- @Test
- def testSortNoOrder(): Unit = {
- thrown.expectMessage("You must specify either rowtime or proctime for order by.")
- thrown.expect(classOf[TableException])
-
- val sqlQuery =
- s"""
- |SELECT *
- |FROM Ticker
- |MATCH_RECOGNIZE (
- | MEASURES
- | A.symbol AS aSymbol
- | PATTERN (A B)
- | DEFINE
- | A AS symbol = 'a'
- |) AS T
- |""".stripMargin
-
- streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
- }
-
- @Test
- def testUpdatesInUpstreamOperatorNotSupported(): Unit = {
- thrown.expectMessage(
- "Match Recognize doesn't support consuming update changes " +
- "which is produced by node GroupAggregate(")
- thrown.expect(classOf[TableException])
-
- val sqlQuery =
- s"""
- |SELECT *
- |FROM (SELECT DISTINCT * FROM Ticker)
- |MATCH_RECOGNIZE (
- | ORDER BY proctime
- | MEASURES
- | A.symbol AS aSymbol
- | ONE ROW PER MATCH
- | PATTERN (A B)
- | DEFINE
- | A AS symbol = 'a'
- |) AS T
- |""".stripMargin
-
- streamUtil.tableEnv.sqlQuery(sqlQuery).toRetractStream[Row]
- }
-
- @Test
- def testAggregatesOnMultiplePatternVariablesNotSupported(): Unit = {
- thrown.expect(classOf[ValidationException])
- thrown.expectMessage("SQL validation failed.")
-
- val sqlQuery =
- s"""
- |SELECT *
- |FROM Ticker
- |MATCH_RECOGNIZE (
- | ORDER BY proctime
- | MEASURES
- | SUM(A.price + B.tax) AS taxedPrice
- | PATTERN (A B)
- | DEFINE
- | A AS A.symbol = 'a'
- |) AS T
- |""".stripMargin
-
- streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
- }
-
- @Test
- def testAggregatesOnMultiplePatternVariablesNotSupportedInUDAGs(): Unit = {
- thrown.expect(classOf[ValidationException])
- thrown.expectMessage("Aggregation must be applied to a single pattern variable")
-
- streamUtil.addFunction("weightedAvg", new WeightedAvg)
-
- val sqlQuery =
- s"""
- |SELECT *
- |FROM Ticker
- |MATCH_RECOGNIZE (
- | ORDER BY proctime
- | MEASURES
- | weightedAvg(A.price, B.tax) AS weightedAvg
- | PATTERN (A B)
- | DEFINE
- | A AS A.symbol = 'a'
- |) AS T
- |""".stripMargin
-
- streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
- }
-
- @Test
- def testValidatingAmbiguousColumns(): Unit = {
- thrown.expectMessage("Columns ambiguously defined: {symbol, price}")
- thrown.expect(classOf[ValidationException])
-
- val sqlQuery =
- s"""
- |SELECT *
- |FROM Ticker
- |MATCH_RECOGNIZE (
- | PARTITION BY symbol, price
- | ORDER BY proctime
- | MEASURES
- | A.symbol AS symbol,
- | A.price AS price
- | PATTERN (A)
- | DEFINE
- | A AS symbol = 'a'
- |) AS T
- |""".stripMargin
-
- streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
- }
-
- // ***************************************************************************************
- // * Those validations are temporary. We should remove those tests once we support those *
- // * features. *
- // ***************************************************************************************
-
- /** Python Function can not be used in MATCH_RECOGNIZE for now * */
- @Test
- def testMatchPythonFunction() = {
- thrown.expectMessage("Python Function can not be used in MATCH_RECOGNIZE for now.")
- thrown.expect(classOf[TableException])
-
- streamUtil.addFunction("pyFunc", new PythonScalarFunction("pyFunc"))
- val sql =
- """SELECT T.aa as ta
- |FROM MyTable
- |MATCH_RECOGNIZE (
- | ORDER BY proctime
- | MEASURES
- | A.a as aa,
- | pyFunc(1,2) as bb
- | PATTERN (A B)
- | DEFINE
- | A AS a = 1,
- | B AS b = 'b'
- |) AS T""".stripMargin
- streamUtil.verifyExplain(sql)
- }
-
- @Test
- def testAllRowsPerMatch(): Unit = {
- thrown.expectMessage("All rows per match mode is not supported yet.")
- thrown.expect(classOf[TableException])
-
- val sqlQuery =
- s"""
- |SELECT *
- |FROM Ticker
- |MATCH_RECOGNIZE (
- | ORDER BY proctime
- | MEASURES
- | A.symbol AS aSymbol
- | ALL ROWS PER MATCH
- | PATTERN (A B)
- | DEFINE
- | A AS symbol = 'a'
- |) AS T
- |""".stripMargin
-
- streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
- }
-
- @Test
- def testGreedyQuantifierAtTheEndIsNotSupported(): Unit = {
- thrown.expectMessage(
- "Greedy quantifiers are not allowed as the last element of a " +
- "Pattern yet. Finish your pattern with either a simple variable or reluctant quantifier.")
- thrown.expect(classOf[TableException])
-
- val sqlQuery =
- s"""
- |SELECT *
- |FROM Ticker
- |MATCH_RECOGNIZE (
- | ORDER BY proctime
- | MEASURES
- | A.symbol AS aSymbol
- | PATTERN (A B+)
- | DEFINE
- | A AS symbol = 'a'
- |) AS T
- |""".stripMargin
-
- streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
- }
-
- @Test
- def testPatternsProducingEmptyMatchesAreNotSupported(): Unit = {
- thrown.expectMessage(
- "Patterns that can produce empty matches are not supported. " +
- "There must be at least one non-optional state.")
- thrown.expect(classOf[TableException])
-
- val sqlQuery =
- s"""
- |SELECT *
- |FROM Ticker
- |MATCH_RECOGNIZE (
- | ORDER BY proctime
- | MEASURES
- | A.symbol AS aSymbol
- | PATTERN (A*)
- | DEFINE
- | A AS symbol = 'a'
- |) AS T
- |""".stripMargin
-
- streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
- }
-
- @Test
- def testDistinctAggregationsNotSupported(): Unit = {
- thrown.expect(classOf[ValidationException])
-
- val sqlQuery =
- s"""
- |SELECT *
- |FROM Ticker
- |MATCH_RECOGNIZE (
- | ORDER BY proctime
- | MEASURES
- | COUNT(DISTINCT A.price) AS price
- | PATTERN (A B)
- | DEFINE
- | A AS A.symbol = 'a'
- |) AS T
- |""".stripMargin
-
- streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
- }
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
index e62f9fb6b5b..ac4db52febc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.data.RowData
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl
import org.apache.flink.table.planner.delegation.PlannerBase
-import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMatch
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecMatch
import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDataStreamScan, StreamPhysicalMatch}
import org.apache.flink.table.planner.plan.utils.MatchUtil
import org.apache.flink.table.planner.utils.TableTestUtil
@@ -105,9 +105,9 @@ abstract class PatternTranslatorTestBase extends TestLogger {
}
val dataMatch = optimized.asInstanceOf[StreamPhysicalMatch]
- val p = StreamExecMatch
+ val p = CommonExecMatch
.translatePattern(
- MatchUtil.createMatchSpec(dataMatch.logicalMatch),
+ MatchUtil.createMatchSpec(dataMatch.getLogicalMatch),
new Configuration,
Thread.currentThread().getContextClassLoader,
context._1,