Posted to commits@flink.apache.org by go...@apache.org on 2022/06/09 02:11:15 UTC

[flink] branch master updated: [FLINK-24865][CEP] Support MATCH_RECOGNIZE in Batch mode

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f7bee70e3a [FLINK-24865][CEP] Support MATCH_RECOGNIZE in Batch mode
7f7bee70e3a is described below

commit 7f7bee70e3ac0d9fb27d7e09b41d6396b748dada
Author: SteNicholas <pr...@163.com>
AuthorDate: Wed Jun 8 14:19:58 2022 +0800

    [FLINK-24865][CEP] Support MATCH_RECOGNIZE in Batch mode
    
    This closes #18408
---
 .../plan/nodes/exec/batch/BatchExecMatch.java      |  57 ++
 .../CommonExecMatch.java}                          | 124 +---
 .../plan/nodes/exec/stream/StreamExecMatch.java    | 387 +------------
 .../nodes/physical/batch/BatchPhysicalMatch.java   |  65 +++
 .../nodes/physical/common/CommonPhysicalMatch.java | 101 ++++
 .../physical/batch/BatchPhysicalMatchRule.java     |  63 ++
 .../physical/common/CommonPhysicalMatchRule.java   | 176 ++++++
 .../physical/stream/StreamPhysicalMatch.scala      |  50 +-
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |   3 +
 .../physical/stream/StreamPhysicalMatchRule.scala  | 132 +----
 .../validation/MatchRecognizeValidationTest.java   | 354 +++++++++++
 .../planner/plan/batch/sql/MatchRecognizeTest.java |  87 +++
 .../nodes/exec/operator/BatchOperatorNameTest.java |  20 +
 .../runtime/batch/sql/MatchRecognizeITCase.java    | 645 +++++++++++++++++++++
 .../planner/plan/batch/sql/MatchRecognizeTest.xml  |  74 +++
 .../nodes/exec/operator/BatchOperatorNameTest.xml  | 108 ++++
 .../validation/MatchRecognizeValidationTest.scala  | 347 -----------
 .../planner/match/PatternTranslatorTestBase.scala  |   6 +-
 18 files changed, 1802 insertions(+), 997 deletions(-)
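
For orientation, here is a minimal end-to-end sketch of what this commit enables:
running MATCH_RECOGNIZE against a bounded table with a TableEnvironment in batch
mode. The connector, schema, and query below are illustrative only (not taken from
this commit), and flink-cep must be on the classpath, as the new rule below enforces.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class BatchMatchRecognizeSketch {
        public static void main(String[] args) {
            // Batch mode is the point of this commit: MATCH_RECOGNIZE over a bounded table.
            TableEnvironment tEnv =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance().inBatchMode().build());
            tEnv.executeSql(
                    "CREATE TABLE Ticker (symbol STRING, price INT, ts TIMESTAMP(3)) "
                            + "WITH ('connector' = 'datagen', 'number-of-rows' = '100')");
            tEnv.executeSql(
                            "SELECT * FROM Ticker MATCH_RECOGNIZE ("
                                    + " PARTITION BY symbol ORDER BY ts"
                                    + " MEASURES A.price AS startPrice, C.price AS endPrice"
                                    + " ONE ROW PER MATCH AFTER MATCH SKIP PAST LAST ROW"
                                    + " PATTERN (A B* C)"
                                    + " DEFINE B AS B.price < A.price, C AS C.price >= A.price)")
                    .print();
        }
    }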

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
new file mode 100644
index 00000000000..4f02be5f91c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecMatch;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+
+/** Batch {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
+public class BatchExecMatch extends CommonExecMatch
+        implements BatchExecNode<RowData>, MultipleTransformationTranslator<RowData> {
+
+    public BatchExecMatch(
+            ReadableConfig tableConfig,
+            MatchSpec matchSpec,
+            InputProperty inputProperty,
+            RowType outputType,
+            String description) {
+        super(
+                ExecNodeContext.newNodeId(),
+                ExecNodeContext.newContext(BatchExecMatch.class),
+                ExecNodeContext.newPersistedConfig(BatchExecMatch.class, tableConfig),
+                matchSpec,
+                Collections.singletonList(inputProperty),
+                outputType,
+                description);
+    }
+
+    @Override
+    public boolean isProcTime(RowType inputRowType) {
+        return true;
+    }
+}
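
BatchExecMatch pins isProcTime to true: a bounded input carries no time attributes,
so the CepOperator built by the shared base class (next hunk) always runs with
processing-time semantics, while the stream node derives the flag from the first
ORDER BY field. A compilable toy of the template-method split, all names hypothetical:

    // Toy of the template-method split introduced here (hypothetical names).
    abstract class CommonMatchSketch {
        final String translate(boolean firstOrderKeyIsRowtime) {
            checkOrderKeys();                         // no-op by default, like CommonExecMatch
            return "CepOperator(isProcessingTime=" + isProcTime(firstOrderKeyIsRowtime) + ")";
        }

        void checkOrderKeys() {}                      // the streaming variant overrides this

        abstract boolean isProcTime(boolean firstOrderKeyIsRowtime);
    }

    class BatchMatchSketch extends CommonMatchSketch {
        @Override
        boolean isProcTime(boolean firstOrderKeyIsRowtime) {
            return true;                              // batch input has no time attributes
        }
    }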
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
similarity index 78%
copy from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
copy to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
index 6d1dbad480f..d961b5a4c96 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
 
-import org.apache.flink.FlinkVersion;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.dag.Transformation;
@@ -46,7 +45,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
@@ -59,16 +57,11 @@ import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner;
 import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
-import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.MathUtils;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
@@ -79,61 +72,29 @@ import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.tools.RelBuilder;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/** Stream {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
-@ExecNodeMetadata(
-        name = "stream-exec-match",
-        version = 1,
-        producedTransformations = {
-            StreamExecMatch.TIMESTAMP_INSERTER_TRANSFORMATION,
-            StreamExecMatch.MATCH_TRANSFORMATION
-        },
-        minPlanVersion = FlinkVersion.v1_15,
-        minStateVersion = FlinkVersion.v1_15)
-public class StreamExecMatch extends ExecNodeBase<RowData>
-        implements StreamExecNode<RowData>, MultipleTransformationTranslator<RowData> {
-
-    public static final String TIMESTAMP_INSERTER_TRANSFORMATION = "timestamp-inserter";
-    public static final String MATCH_TRANSFORMATION = "match";
+/** Common {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
+public abstract class CommonExecMatch extends ExecNodeBase<RowData>
+        implements ExecNode<RowData>, MultipleTransformationTranslator<RowData> {
 
-    public static final String FIELD_NAME_MATCH_SPEC = "matchSpec";
+    public static final String MATCH_TRANSFORMATION = "match";
 
-    @JsonProperty(FIELD_NAME_MATCH_SPEC)
     private final MatchSpec matchSpec;
 
-    public StreamExecMatch(
-            ReadableConfig tableConfig,
+    public CommonExecMatch(
+            int id,
+            ExecNodeContext context,
+            ReadableConfig persistedConfig,
             MatchSpec matchSpec,
-            InputProperty inputProperty,
-            RowType outputType,
+            List<InputProperty> inputProperties,
+            LogicalType outputType,
             String description) {
-        this(
-                ExecNodeContext.newNodeId(),
-                ExecNodeContext.newContext(StreamExecMatch.class),
-                ExecNodeContext.newPersistedConfig(StreamExecMatch.class, tableConfig),
-                matchSpec,
-                Collections.singletonList(inputProperty),
-                outputType,
-                description);
-    }
-
-    @JsonCreator
-    public StreamExecMatch(
-            @JsonProperty(FIELD_NAME_ID) int id,
-            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
-            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
-            @JsonProperty(FIELD_NAME_MATCH_SPEC) MatchSpec matchSpec,
-            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
-            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
-            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
         super(id, context, persistedConfig, inputProperties, outputType, description);
         checkArgument(inputProperties.size() == 1);
         this.matchSpec = checkNotNull(matchSpec);
@@ -182,11 +143,6 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
         }
 
         final int[] partitionKeys = matchSpec.getPartition().getFieldIndices();
-        final SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
-        final LogicalType timeOrderFieldType =
-                inputRowType.getTypeAt(timeOrderField.getFieldIndex());
-
-        final boolean isProctime = TypeCheckUtils.isProcTime(timeOrderFieldType);
         final InternalTypeInfo<RowData> inputTypeInfo =
                 (InternalTypeInfo<RowData>) inputTransform.getOutputType();
         final TypeSerializer<RowData> inputSerializer =
@@ -212,7 +168,7 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
         final CepOperator<RowData, RowData, RowData> operator =
                 new CepOperator<>(
                         inputSerializer,
-                        isProctime,
+                        isProcTime(inputRowType),
                         nfaFactory,
                         eventComparator,
                         cepPattern.getAfterMatchSkipStrategy(),
@@ -238,28 +194,7 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
         return transform;
     }
 
-    private void checkOrderKeys(RowType inputRowType) {
-        SortSpec orderKeys = matchSpec.getOrderKeys();
-        if (orderKeys.getFieldSize() == 0) {
-            throw new TableException("You must specify either rowtime or proctime for order by.");
-        }
-
-        SortSpec.SortFieldSpec timeOrderField = orderKeys.getFieldSpec(0);
-        int timeOrderFieldIdx = timeOrderField.getFieldIndex();
-        LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx);
-        // need to identify time between others order fields. Time needs to be first sort element
-        if (!TypeCheckUtils.isRowTime(timeOrderFieldType)
-                && !TypeCheckUtils.isProcTime(timeOrderFieldType)) {
-            throw new TableException(
-                    "You must specify either rowtime or proctime for order by as the first one.");
-        }
-
-        // time ordering needs to be ascending
-        if (!orderKeys.getAscendingOrders()[0]) {
-            throw new TableException(
-                    "Primary sort order of a streaming table must be ascending on time.");
-        }
-    }
+    protected void checkOrderKeys(RowType inputRowType) {}
 
     private EventComparator<RowData> createEventComparator(
             ExecNodeConfig config, ClassLoader classLoader, RowType inputRowType) {
@@ -274,36 +209,9 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
         }
     }
 
-    private Transformation<RowData> translateOrder(
+    protected Transformation<RowData> translateOrder(
             Transformation<RowData> inputTransform, RowType inputRowType, ReadableConfig config) {
-        SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
-        int timeOrderFieldIdx = timeOrderField.getFieldIndex();
-        LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx);
-
-        if (TypeCheckUtils.isRowTime(timeOrderFieldType)) {
-            // copy the rowtime field into the StreamRecord timestamp field
-            int precision = getPrecision(timeOrderFieldType);
-            Transformation<RowData> transform =
-                    ExecNodeUtil.createOneInputTransformation(
-                            inputTransform,
-                            createTransformationMeta(
-                                    TIMESTAMP_INSERTER_TRANSFORMATION,
-                                    String.format(
-                                            "StreamRecordTimestampInserter(rowtime field: %s)",
-                                            timeOrderFieldIdx),
-                                    "StreamRecordTimestampInserter",
-                                    config),
-                            new StreamRecordTimestampInserter(timeOrderFieldIdx, precision),
-                            inputTransform.getOutputType(),
-                            inputTransform.getParallelism());
-            if (inputsContainSingleton()) {
-                transform.setParallelism(1);
-                transform.setMaxParallelism(1);
-            }
-            return transform;
-        } else {
-            return inputTransform;
-        }
+        return inputTransform;
     }
 
     @VisibleForTesting
@@ -337,6 +245,8 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
                 "Only constant intervals with millisecond resolution are supported as time constraints of patterns.");
     }
 
+    public abstract boolean isProcTime(RowType inputRowType);
+
     /** The visitor to traverse the pattern RexNode. */
     private static class PatternVisitor extends RexDefaultVisitor<Pattern<RowData, RowData>> {
         private final ReadableConfig config;
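
The biggest piece that moved into CommonExecMatch is translatePattern, which walks
the Calcite pattern RexNode and builds a flink-cep Pattern. A standalone sketch of
how the quantifier mapping plays out on the public CEP API, with the DEFINE
conditions stubbed out via trueFunction (the real code generates IterativeConditions):

    import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.BooleanConditions;

    public class QuantifierMappingSketch {
        public static void main(String[] args) {
            // PATTERN (A B* C) with AFTER MATCH SKIP PAST LAST ROW, conditions elided:
            Pattern<Object, Object> pattern =
                    Pattern.<Object>begin("A", AfterMatchSkipStrategy.skipPastLastEvent())
                            .where(BooleanConditions.trueFunction())
                            .next("B")
                            .where(BooleanConditions.trueFunction())
                            .oneOrMore().optional().consecutive() // B*: zero or more, consecutive
                            .next("C")
                            .where(BooleanConditions.trueFunction());
            System.out.println(pattern.getName()); // prints the last variable: C
        }
    }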
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
index 6d1dbad480f..6bc018528d8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
@@ -19,74 +19,31 @@
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
 import org.apache.flink.FlinkVersion;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.cep.operator.CepOperator;
-import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.cep.pattern.Quantifier;
-import org.apache.flink.cep.pattern.conditions.BooleanConditions;
-import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.planner.codegen.CodeGenUtils;
-import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
-import org.apache.flink.table.planner.codegen.MatchCodeGenerator;
-import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
-import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecMatch;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
-import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
-import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
-import org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner;
-import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
 import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.MathUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlMatchRecognize;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.type.SqlTypeFamily;
-import org.apache.calcite.tools.RelBuilder;
-
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Optional;
 
 import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Stream {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
 @ExecNodeMetadata(
@@ -98,11 +55,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
         },
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
-public class StreamExecMatch extends ExecNodeBase<RowData>
+public class StreamExecMatch extends CommonExecMatch
         implements StreamExecNode<RowData>, MultipleTransformationTranslator<RowData> {
 
     public static final String TIMESTAMP_INSERTER_TRANSFORMATION = "timestamp-inserter";
-    public static final String MATCH_TRANSFORMATION = "match";
 
     public static final String FIELD_NAME_MATCH_SPEC = "matchSpec";
 
@@ -134,111 +90,12 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
             @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
             @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
             @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
-        super(id, context, persistedConfig, inputProperties, outputType, description);
-        checkArgument(inputProperties.size() == 1);
-        this.matchSpec = checkNotNull(matchSpec);
+        super(id, context, persistedConfig, matchSpec, inputProperties, outputType, description);
+        this.matchSpec = matchSpec;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
-    protected Transformation<RowData> translateToPlanInternal(
-            PlannerBase planner, ExecNodeConfig config) {
-        final ExecEdge inputEdge = getInputEdges().get(0);
-        final Transformation<RowData> inputTransform =
-                (Transformation<RowData>) inputEdge.translateToPlan(planner);
-        final RowType inputRowType = (RowType) inputEdge.getOutputType();
-
-        checkOrderKeys(inputRowType);
-        final EventComparator<RowData> eventComparator =
-                createEventComparator(
-                        config, planner.getFlinkContext().getClassLoader(), inputRowType);
-        final Transformation<RowData> timestampedInputTransform =
-                translateOrder(inputTransform, inputRowType, config);
-
-        final Tuple2<Pattern<RowData, RowData>, List<String>> cepPatternAndNames =
-                translatePattern(
-                        matchSpec,
-                        config,
-                        planner.getFlinkContext().getClassLoader(),
-                        planner.createRelBuilder(),
-                        inputRowType);
-        final Pattern<RowData, RowData> cepPattern = cepPatternAndNames.f0;
-
-        // TODO remove this once it is supported in CEP library
-        if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
-            throw new TableException(
-                    "Patterns that can produce empty matches are not supported. There must be at least one non-optional state.");
-        }
-
-        // TODO remove this once it is supported in CEP library
-        if (cepPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
-            throw new TableException(
-                    "Greedy quantifiers are not allowed as the last element of a Pattern yet. "
-                            + "Finish your pattern with either a simple variable or reluctant quantifier.");
-        }
-
-        if (matchSpec.isAllRows()) {
-            throw new TableException("All rows per match mode is not supported yet.");
-        }
-
-        final int[] partitionKeys = matchSpec.getPartition().getFieldIndices();
-        final SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
-        final LogicalType timeOrderFieldType =
-                inputRowType.getTypeAt(timeOrderField.getFieldIndex());
-
-        final boolean isProctime = TypeCheckUtils.isProcTime(timeOrderFieldType);
-        final InternalTypeInfo<RowData> inputTypeInfo =
-                (InternalTypeInfo<RowData>) inputTransform.getOutputType();
-        final TypeSerializer<RowData> inputSerializer =
-                inputTypeInfo.createSerializer(planner.getExecEnv().getConfig());
-        final NFACompiler.NFAFactory<RowData> nfaFactory =
-                NFACompiler.compileFactory(cepPattern, false);
-        final MatchCodeGenerator generator =
-                new MatchCodeGenerator(
-                        new CodeGeneratorContext(
-                                config, planner.getFlinkContext().getClassLoader()),
-                        planner.createRelBuilder(),
-                        false, // nullableInput
-                        JavaScalaConversionUtil.toScala(cepPatternAndNames.f1),
-                        JavaScalaConversionUtil.toScala(Optional.empty()),
-                        CodeGenUtils.DEFAULT_COLLECTOR_TERM());
-        generator.bindInput(
-                inputRowType,
-                CodeGenUtils.DEFAULT_INPUT1_TERM(),
-                JavaScalaConversionUtil.toScala(Optional.empty()));
-        final PatternProcessFunctionRunner patternProcessFunction =
-                generator.generateOneRowPerMatchExpression(
-                        (RowType) getOutputType(), partitionKeys, matchSpec.getMeasures());
-        final CepOperator<RowData, RowData, RowData> operator =
-                new CepOperator<>(
-                        inputSerializer,
-                        isProctime,
-                        nfaFactory,
-                        eventComparator,
-                        cepPattern.getAfterMatchSkipStrategy(),
-                        patternProcessFunction,
-                        null);
-        final OneInputTransformation<RowData, RowData> transform =
-                ExecNodeUtil.createOneInputTransformation(
-                        timestampedInputTransform,
-                        createTransformationMeta(MATCH_TRANSFORMATION, config),
-                        operator,
-                        InternalTypeInfo.of(getOutputType()),
-                        timestampedInputTransform.getParallelism());
-        final RowDataKeySelector selector =
-                KeySelectorUtil.getRowDataSelector(
-                        planner.getFlinkContext().getClassLoader(), partitionKeys, inputTypeInfo);
-        transform.setStateKeySelector(selector);
-        transform.setStateKeyType(selector.getProducedType());
-
-        if (inputsContainSingleton()) {
-            transform.setParallelism(1);
-            transform.setMaxParallelism(1);
-        }
-        return transform;
-    }
-
-    private void checkOrderKeys(RowType inputRowType) {
+    public void checkOrderKeys(RowType inputRowType) {
         SortSpec orderKeys = matchSpec.getOrderKeys();
         if (orderKeys.getFieldSize() == 0) {
             throw new TableException("You must specify either rowtime or proctime for order by.");
@@ -261,20 +118,8 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
         }
     }
 
-    private EventComparator<RowData> createEventComparator(
-            ExecNodeConfig config, ClassLoader classLoader, RowType inputRowType) {
-        SortSpec orderKeys = matchSpec.getOrderKeys();
-        if (orderKeys.getFieldIndices().length > 1) {
-            GeneratedRecordComparator rowComparator =
-                    ComparatorCodeGenerator.gen(
-                            config, classLoader, "RowDataComparator", inputRowType, orderKeys);
-            return new RowDataEventComparator(rowComparator);
-        } else {
-            return null;
-        }
-    }
-
-    private Transformation<RowData> translateOrder(
+    @Override
+    public Transformation<RowData> translateOrder(
             Transformation<RowData> inputTransform, RowType inputRowType, ReadableConfig config) {
         SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
         int timeOrderFieldIdx = timeOrderField.getFieldIndex();
@@ -306,217 +151,11 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
         }
     }
 
-    @VisibleForTesting
-    public static Tuple2<Pattern<RowData, RowData>, List<String>> translatePattern(
-            MatchSpec matchSpec,
-            ReadableConfig config,
-            ClassLoader classLoader,
-            RelBuilder relBuilder,
-            RowType inputRowType) {
-        final PatternVisitor patternVisitor =
-                new PatternVisitor(config, classLoader, relBuilder, inputRowType, matchSpec);
-
-        final Pattern<RowData, RowData> cepPattern;
-        if (matchSpec.getInterval().isPresent()) {
-            Time interval = translateTimeBound(matchSpec.getInterval().get());
-            cepPattern = matchSpec.getPattern().accept(patternVisitor).within(interval);
-        } else {
-            cepPattern = matchSpec.getPattern().accept(patternVisitor);
-        }
-        return new Tuple2<>(cepPattern, new ArrayList<>(patternVisitor.names));
-    }
-
-    private static Time translateTimeBound(RexNode interval) {
-        if (interval instanceof RexLiteral) {
-            final RexLiteral l = (RexLiteral) interval;
-            if (l.getTypeName().getFamily() == SqlTypeFamily.INTERVAL_DAY_TIME) {
-                return Time.milliseconds(l.getValueAs(Long.class));
-            }
-        }
-        throw new TableException(
-                "Only constant intervals with millisecond resolution are supported as time constraints of patterns.");
-    }
-
-    /** The visitor to traverse the pattern RexNode. */
-    private static class PatternVisitor extends RexDefaultVisitor<Pattern<RowData, RowData>> {
-        private final ReadableConfig config;
-        private final ClassLoader classLoader;
-        private final RelBuilder relBuilder;
-        private final RowType inputRowType;
-        private final MatchSpec matchSpec;
-        private final LinkedHashSet<String> names;
-        private Pattern<RowData, RowData> pattern;
-
-        public PatternVisitor(
-                ReadableConfig config,
-                ClassLoader classLoader,
-                RelBuilder relBuilder,
-                RowType inputRowType,
-                MatchSpec matchSpec) {
-            this.config = config;
-            this.classLoader = classLoader;
-            this.relBuilder = relBuilder;
-            this.inputRowType = inputRowType;
-            this.matchSpec = matchSpec;
-            this.names = new LinkedHashSet<>();
-        }
-
-        @Override
-        public Pattern<RowData, RowData> visitLiteral(RexLiteral literal) {
-            String patternName = literal.getValueAs(String.class);
-            pattern = translateSingleVariable(pattern, patternName);
-
-            RexNode patternDefinition = matchSpec.getPatternDefinitions().get(patternName);
-            if (patternDefinition != null) {
-                MatchCodeGenerator generator =
-                        new MatchCodeGenerator(
-                                new CodeGeneratorContext(config, classLoader),
-                                relBuilder,
-                                false, // nullableInput
-                                JavaScalaConversionUtil.toScala(new ArrayList<>(names)),
-                                JavaScalaConversionUtil.toScala(Optional.of(patternName)),
-                                CodeGenUtils.DEFAULT_COLLECTOR_TERM());
-                generator.bindInput(
-                        inputRowType,
-                        CodeGenUtils.DEFAULT_INPUT1_TERM(),
-                        JavaScalaConversionUtil.toScala(Optional.empty()));
-                IterativeCondition<RowData> condition =
-                        generator.generateIterativeCondition(patternDefinition);
-                return pattern.where(condition);
-            } else {
-                return pattern.where(BooleanConditions.trueFunction());
-            }
-        }
-
-        @Override
-        public Pattern<RowData, RowData> visitCall(RexCall call) {
-            SqlOperator operator = call.getOperator();
-            if (operator == SqlStdOperatorTable.PATTERN_CONCAT) {
-                pattern = call.operands.get(0).accept(this);
-                pattern = call.operands.get(1).accept(this);
-                return pattern;
-            } else if (operator == SqlStdOperatorTable.PATTERN_QUANTIFIER) {
-                final RexLiteral name;
-                if (call.operands.get(0) instanceof RexLiteral) {
-                    name = (RexLiteral) call.operands.get(0);
-                } else {
-                    throw new TableException(
-                            String.format(
-                                    "Expression not supported: %s Group patterns are not supported yet.",
-                                    call.operands.get(0)));
-                }
-
-                pattern = name.accept(this);
-                int startNum =
-                        MathUtils.checkedDownCast(
-                                ((RexLiteral) call.operands.get(1)).getValueAs(Long.class));
-                int endNum =
-                        MathUtils.checkedDownCast(
-                                ((RexLiteral) call.operands.get(2)).getValueAs(Long.class));
-                boolean isGreedy = !((RexLiteral) call.operands.get(3)).getValueAs(Boolean.class);
-
-                return applyQuantifier(pattern, startNum, endNum, isGreedy);
-            } else if (operator == SqlStdOperatorTable.PATTERN_ALTER) {
-                throw new TableException(
-                        String.format(
-                                "Expression not supported: %s. Currently, CEP doesn't support branching patterns.",
-                                call));
-            } else if (operator == SqlStdOperatorTable.PATTERN_PERMUTE) {
-                throw new TableException(
-                        String.format(
-                                "Expression not supported: %s. Currently, CEP doesn't support PERMUTE patterns.",
-                                call));
-            } else if (operator == SqlStdOperatorTable.PATTERN_EXCLUDE) {
-                throw new TableException(
-                        String.format(
-                                "Expression not supported: %s. Currently, CEP doesn't support '{-' '-}' patterns.",
-                                call));
-            } else {
-                throw new TableException("This should not happen.");
-            }
-        }
-
-        @Override
-        public Pattern<RowData, RowData> visitNode(RexNode rexNode) {
-            throw new TableException(
-                    String.format("Unsupported expression within Pattern: [%s]", rexNode));
-        }
-
-        private Pattern<RowData, RowData> translateSingleVariable(
-                Pattern<RowData, RowData> previousPattern, String patternName) {
-            if (names.contains(patternName)) {
-                throw new TableException(
-                        "Pattern variables must be unique. That might change in the future.");
-            } else {
-                names.add(patternName);
-            }
-
-            if (previousPattern != null) {
-                return previousPattern.next(patternName);
-            } else {
-                return Pattern.begin(patternName, translateSkipStrategy());
-            }
-        }
-
-        private AfterMatchSkipStrategy translateSkipStrategy() {
-            switch (matchSpec.getAfter().getKind()) {
-                case LITERAL:
-                    SqlMatchRecognize.AfterOption afterOption =
-                            ((RexLiteral) matchSpec.getAfter())
-                                    .getValueAs(SqlMatchRecognize.AfterOption.class);
-                    switch (afterOption) {
-                        case SKIP_PAST_LAST_ROW:
-                            return AfterMatchSkipStrategy.skipPastLastEvent();
-                        case SKIP_TO_NEXT_ROW:
-                            return AfterMatchSkipStrategy.skipToNext();
-                        default:
-                            throw new TableException("This should not happen.");
-                    }
-                case SKIP_TO_FIRST:
-                    return AfterMatchSkipStrategy.skipToFirst(getPatternTarget())
-                            .throwExceptionOnMiss();
-                case SKIP_TO_LAST:
-                    return AfterMatchSkipStrategy.skipToLast(getPatternTarget())
-                            .throwExceptionOnMiss();
-                default:
-                    throw new TableException(
-                            String.format(
-                                    "Corrupted query tree. Unexpected %s for after match strategy.",
-                                    matchSpec.getAfter()));
-            }
-        }
-
-        private String getPatternTarget() {
-            return ((RexLiteral) ((RexCall) matchSpec.getAfter()).getOperands().get(0))
-                    .getValueAs(String.class);
-        }
-
-        private Pattern<RowData, RowData> applyQuantifier(
-                Pattern<RowData, RowData> pattern, int startNum, int endNum, boolean greedy) {
-            boolean isOptional = startNum == 0 && endNum == 1;
-
-            final Pattern<RowData, RowData> newPattern;
-            if (startNum == 0 && endNum == -1) { // zero or more
-                newPattern = pattern.oneOrMore().optional().consecutive();
-            } else if (startNum == 1 && endNum == -1) { // one or more
-                newPattern = pattern.oneOrMore().consecutive();
-            } else if (isOptional) { // optional
-                newPattern = pattern.optional();
-            } else if (endNum != -1) { // times
-                newPattern = pattern.times(startNum, endNum).consecutive();
-            } else { // times or more
-                newPattern = pattern.timesOrMore(startNum).consecutive();
-            }
-
-            if (greedy && (isOptional || startNum == endNum)) {
-                return newPattern;
-            } else if (greedy) {
-                return newPattern.greedy();
-            } else if (isOptional) {
-                throw new TableException("Reluctant optional variables are not supported yet.");
-            } else {
-                return newPattern;
-            }
-        }
+    @Override
+    public boolean isProcTime(RowType inputRowType) {
+        final SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
+        final LogicalType timeOrderFieldType =
+                inputRowType.getTypeAt(timeOrderField.getFieldIndex());
+        return TypeCheckUtils.isProcTime(timeOrderFieldType);
     }
 }
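
After the split, the stream node keeps only what is genuinely time-specific:
checkOrderKeys (ORDER BY must start with an ascending rowtime or proctime
attribute), translateOrder (copying the rowtime field into the StreamRecord
timestamp), isProcTime, and the JSON plan metadata. A toy model of the retained
validation, with hypothetical types but the same error rules:

    // Toy model of StreamExecMatch#checkOrderKeys (hypothetical types).
    enum TimeKind { ROWTIME, PROCTIME, NONE }

    final class StreamOrderKeyCheck {
        static void check(TimeKind firstOrderField, boolean ascending) {
            if (firstOrderField == TimeKind.NONE) {
                throw new IllegalArgumentException(
                        "You must specify either rowtime or proctime for order by as the first one.");
            }
            if (!ascending) {
                throw new IllegalArgumentException(
                        "Primary sort order of a streaming table must be ascending on time.");
            }
        }

        public static void main(String[] args) {
            check(TimeKind.ROWTIME, true); // accepted
            check(TimeKind.NONE, true);    // rejected, mirrors the TableException above
        }
    }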
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.java
new file mode 100644
index 00000000000..2f57564ef8a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.batch;
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.logical.MatchRecognize;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecMatch;
+import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalMatch;
+import org.apache.flink.table.planner.plan.utils.MatchUtil;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+
+import java.util.List;
+
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
+
+/** Batch physical RelNode which matches along with MATCH_RECOGNIZE. */
+public class BatchPhysicalMatch extends CommonPhysicalMatch implements BatchPhysicalRel {
+
+    public BatchPhysicalMatch(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            RelNode inputNode,
+            MatchRecognize logicalMatch,
+            RelDataType outputRowType) {
+        super(cluster, traitSet, inputNode, logicalMatch, outputRowType);
+    }
+
+    @Override
+    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new BatchPhysicalMatch(
+                getCluster(), traitSet, inputs.get(0), getLogicalMatch(), deriveRowType());
+    }
+
+    @Override
+    public ExecNode<?> translateToExecNode() {
+        return new BatchExecMatch(
+                unwrapTableConfig(this),
+                MatchUtil.createMatchSpec(getLogicalMatch()),
+                InputProperty.DEFAULT,
+                FlinkTypeFactory.toLogicalRowType(getRowType()),
+                getRelDetailedDescription());
+    }
+}
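
BatchPhysicalMatch mirrors its stream sibling: a thin Calcite RelNode whose real
job is translateToExecNode, the hand-off from the optimizer's rel tree to the
exec-node graph. A toy of that layering, all types hypothetical:

    // Toy of the rel-node -> exec-node hand-off (hypothetical types).
    interface ExecNodeSketch { String explain(); }

    final class PhysicalMatchSketch {
        private final String pattern;

        PhysicalMatchSketch(String pattern) { this.pattern = pattern; }

        // Mirrors BatchPhysicalMatch#translateToExecNode: exec node built from rel state.
        ExecNodeSketch translateToExecNode() {
            return () -> "BatchExecMatch(pattern=" + pattern + ")";
        }

        public static void main(String[] args) {
            System.out.println(
                    new PhysicalMatchSketch("(A B* C)").translateToExecNode().explain());
        }
    }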
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalMatch.java
new file mode 100644
index 00000000000..fc95986df40
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalMatch.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.common;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.plan.logical.MatchRecognize;
+import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.type.RelDataType;
+
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+/** Base physical RelNode which matches along with MATCH_RECOGNIZE. */
+public abstract class CommonPhysicalMatch extends SingleRel implements FlinkPhysicalRel {
+
+    private final MatchRecognize logicalMatch;
+    private final RelDataType outputRowType;
+
+    public CommonPhysicalMatch(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            RelNode inputNode,
+            MatchRecognize logicalMatch,
+            RelDataType outputRowType) {
+        super(cluster, traitSet, inputNode);
+        if (logicalMatch.measures().values().stream()
+                        .anyMatch(m -> PythonUtil.containsPythonCall(m, null))
+                || logicalMatch.patternDefinitions().values().stream()
+                        .anyMatch(p -> PythonUtil.containsPythonCall(p, null))) {
+            throw new TableException("Python Function can not be used in MATCH_RECOGNIZE for now.");
+        }
+        this.logicalMatch = logicalMatch;
+        this.outputRowType = outputRowType;
+    }
+
+    @Override
+    protected RelDataType deriveRowType() {
+        return outputRowType;
+    }
+
+    @Override
+    public RelWriter explainTerms(RelWriter pw) {
+        RelDataType inputRowType = getInput().getRowType();
+        Seq<String> fieldNames =
+                JavaConverters.asScalaBufferConverter(inputRowType.getFieldNames()).asScala();
+        return super.explainTerms(pw)
+                .itemIf(
+                        "partitionBy",
+                        RelExplainUtil.fieldToString(
+                                logicalMatch.partitionKeys().toArray(), inputRowType),
+                        !logicalMatch.partitionKeys().isEmpty())
+                .itemIf(
+                        "orderBy",
+                        RelExplainUtil.collationToString(logicalMatch.orderKeys(), inputRowType),
+                        !logicalMatch.orderKeys().getFieldCollations().isEmpty())
+                .itemIf(
+                        "measures",
+                        RelExplainUtil.measuresDefineToString(
+                                logicalMatch.measures(),
+                                fieldNames.toList(),
+                                this::getExpressionString,
+                                convertToExpressionDetail(pw.getDetailLevel())),
+                        !logicalMatch.measures().isEmpty())
+                .item("rowsPerMatch", RelExplainUtil.rowsPerMatchToString(logicalMatch.allRows()))
+                .item("after", RelExplainUtil.afterMatchToString(logicalMatch.after(), fieldNames))
+                .item("pattern", logicalMatch.pattern().toString())
+                .itemIf(
+                        "subset",
+                        RelExplainUtil.subsetToString(logicalMatch.subsets()),
+                        !logicalMatch.subsets().isEmpty())
+                .item("define", logicalMatch.patternDefinitions());
+    }
+
+    public MatchRecognize getLogicalMatch() {
+        return logicalMatch;
+    }
+}
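
CommonPhysicalMatch also hoists the Python-UDF guard out of the Scala stream node
(its deletion is visible in the StreamPhysicalMatch.scala hunk at the end of this
mail) into shared Java: a plain anyMatch over the measures and pattern definitions.
A minimal self-contained analogue, with a string-based stand-in for
PythonUtil.containsPythonCall:

    import java.util.Map;

    final class PythonCallGuardSketch {
        // Stand-in for PythonUtil.containsPythonCall (assumption: marker-string detection).
        static boolean containsPythonCall(String expr) { return expr.contains("python"); }

        static void validate(Map<String, String> measures, Map<String, String> definitions) {
            boolean hasPython =
                    measures.values().stream().anyMatch(PythonCallGuardSketch::containsPythonCall)
                            || definitions.values().stream()
                                    .anyMatch(PythonCallGuardSketch::containsPythonCall);
            if (hasPython) {
                throw new IllegalStateException(
                        "Python Function can not be used in MATCH_RECOGNIZE for now.");
            }
        }

        public static void main(String[] args) {
            validate(Map.of("m1", "LAST(A.price)"), Map.of("A", "python_udf(price)")); // throws
        }
    }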
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalMatchRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalMatchRule.java
new file mode 100644
index 00000000000..84a74407e44
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalMatchRule.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.planner.plan.logical.MatchRecognize;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMatch;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalMatch;
+import org.apache.flink.table.planner.plan.rules.physical.common.CommonPhysicalMatchRule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+
+/**
+ * The physical rule is responsible for converting {@link FlinkLogicalMatch} to {@link
+ * BatchPhysicalMatch}.
+ */
+public class BatchPhysicalMatchRule extends CommonPhysicalMatchRule {
+
+    public static final RelOptRule INSTANCE = new BatchPhysicalMatchRule();
+
+    private BatchPhysicalMatchRule() {
+        super(
+                FlinkLogicalMatch.class,
+                FlinkConventions.LOGICAL(),
+                FlinkConventions.BATCH_PHYSICAL(),
+                "BatchPhysicalMatchRule");
+    }
+
+    @Override
+    public RelNode convert(RelNode rel) {
+        return super.convert(rel, FlinkConventions.BATCH_PHYSICAL());
+    }
+
+    @Override
+    protected RelNode convertToPhysicalMatch(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            RelNode convertInput,
+            MatchRecognize matchRecognize,
+            RelDataType rowType) {
+        return new BatchPhysicalMatch(cluster, traitSet, convertInput, matchRecognize, rowType);
+    }
+}
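
The rule keeps Calcite's ConverterRule shape: private constructor, shared INSTANCE,
and convert() swapping the convention; the three added lines in
FlinkBatchRuleSets.scala in the diffstat presumably register that INSTANCE. A toy
of the shape outside Calcite, hypothetical mini-framework only:

    // Toy of the ConverterRule shape (hypothetical mini-framework, not Calcite).
    abstract class MiniConverterRule {
        final String inConvention;
        final String outConvention;

        MiniConverterRule(String in, String out) {
            this.inConvention = in;
            this.outConvention = out;
        }

        abstract String convert(String logicalRel);
    }

    final class MiniBatchMatchRule extends MiniConverterRule {
        static final MiniConverterRule INSTANCE = new MiniBatchMatchRule(); // stateless singleton

        private MiniBatchMatchRule() {
            super("LOGICAL", "BATCH_PHYSICAL");
        }

        @Override
        String convert(String logicalRel) {
            return "BatchPhysicalMatch(" + logicalRel + ")";
        }
    }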
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
new file mode 100644
index 00000000000..397476b0ebb
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.common;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.plan.logical.MatchRecognize;
+import org.apache.flink.table.planner.plan.nodes.FlinkConvention;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMatch;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.MatchUtil.AggregationPatternVariableFinder;
+import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * The physical rule is responsible for converting {@link FlinkLogicalMatch} to a physical Match rel.
+ */
+public abstract class CommonPhysicalMatchRule extends ConverterRule {
+
+    public CommonPhysicalMatchRule(
+            Class<? extends RelNode> clazz, RelTrait in, RelTrait out, String descriptionPrefix) {
+        super(clazz, in, out, descriptionPrefix);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        FlinkLogicalMatch logicalMatch = call.rel(0);
+
+        validateAggregations(logicalMatch.getMeasures().values());
+        validateAggregations(logicalMatch.getPatternDefinitions().values());
+        // This check might be obsolete once CALCITE-2747 is resolved
+        validateAmbiguousColumns(logicalMatch);
+        return true;
+    }
+
+    public RelNode convert(RelNode rel, FlinkConvention convention) {
+        FlinkLogicalMatch logicalMatch = (FlinkLogicalMatch) rel;
+        RelTraitSet traitSet = rel.getTraitSet().replace(convention);
+        ImmutableBitSet partitionKeys = logicalMatch.getPartitionKeys();
+
+        FlinkRelDistribution requiredDistribution =
+                partitionKeys.isEmpty()
+                        ? FlinkRelDistribution.SINGLETON()
+                        : FlinkRelDistribution.hash(logicalMatch.getPartitionKeys().asList(), true);
+        RelTraitSet requiredTraitSet =
+                rel.getCluster()
+                        .getPlanner()
+                        .emptyTraitSet()
+                        .replace(requiredDistribution)
+                        .replace(convention);
+
+        RelNode convertInput = RelOptRule.convert(logicalMatch.getInput(), requiredTraitSet);
+
+        try {
+            Class.forName(
+                    "org.apache.flink.cep.pattern.Pattern",
+                    false,
+                    Thread.currentThread().getContextClassLoader());
+        } catch (ClassNotFoundException e) {
+            throw new TableException(
+                    "MATCH RECOGNIZE clause requires flink-cep dependency to be present on the classpath.",
+                    e);
+        }
+        return convertToPhysicalMatch(
+                rel.getCluster(),
+                traitSet,
+                convertInput,
+                new MatchRecognize(
+                        logicalMatch.getPattern(),
+                        logicalMatch.getPatternDefinitions(),
+                        logicalMatch.getMeasures(),
+                        logicalMatch.getAfter(),
+                        logicalMatch.getSubsets(),
+                        logicalMatch.isAllRows(),
+                        logicalMatch.getPartitionKeys(),
+                        logicalMatch.getOrderKeys(),
+                        logicalMatch.getInterval()),
+                logicalMatch.getRowType());
+    }
+
+    protected abstract RelNode convertToPhysicalMatch(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            RelNode convertInput,
+            MatchRecognize matchRecognize,
+            RelDataType rowType);
+
+    private void validateAggregations(Iterable<RexNode> expr) {
+        AggregationsValidator validator = new AggregationsValidator();
+        expr.forEach(e -> e.accept(validator));
+    }
+
+    private void validateAmbiguousColumns(FlinkLogicalMatch logicalMatch) {
+        if (logicalMatch.isAllRows()) {
+            throw new TableException("All rows per match mode is not supported yet.");
+        } else {
+            validateAmbiguousColumnsOnRowPerMatch(
+                    logicalMatch.getPartitionKeys(),
+                    logicalMatch.getMeasures().keySet(),
+                    logicalMatch.getInput().getRowType(),
+                    logicalMatch.getRowType());
+        }
+    }
+
+    private void validateAmbiguousColumnsOnRowPerMatch(
+            ImmutableBitSet partitionKeys,
+            Set<String> measuresNames,
+            RelDataType inputSchema,
+            RelDataType expectedSchema) {
+        int actualSize = partitionKeys.toArray().length + measuresNames.size();
+        int expectedSize = expectedSchema.getFieldCount();
+        if (actualSize != expectedSize) {
+            // try to find the ambiguous columns: partition columns that also appear as measures
+
+            String ambiguousColumns =
+                    Arrays.stream(partitionKeys.toArray())
+                            .mapToObj(k -> inputSchema.getFieldList().get(k).getName())
+                            .filter(measuresNames::contains)
+                            .collect(Collectors.joining(", ", "{", "}"));
+
+            throw new ValidationException(
+                    String.format("Columns ambiguously defined: %s", ambiguousColumns));
+        }
+    }
+
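+    /**
+     * Visitor that applies {@link AggregationPatternVariableFinder} to every aggregate function
+     * call found in an expression tree.
+     */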
+    private static class AggregationsValidator extends RexDefaultVisitor<Object> {
+
+        @Override
+        public Object visitCall(RexCall call) {
+            SqlOperator operator = call.getOperator();
+            if (operator instanceof SqlAggFunction) {
+                call.accept(new AggregationPatternVariableFinder());
+            } else {
+                call.getOperands().forEach(o -> o.accept(this));
+            }
+            return null;
+        }
+
+        @Override
+        public Object visitNode(RexNode rexNode) {
+            return null;
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMatch.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMatch.scala
index b97d0c17b61..5a873fb1af6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMatch.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMatch.scala
@@ -17,14 +17,12 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
-import org.apache.flink.table.api.TableException
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.logical.MatchRecognize
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMatch
+import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalMatch
 import org.apache.flink.table.planner.plan.utils.MatchUtil
-import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
-import org.apache.flink.table.planner.plan.utils.RelExplainUtil._
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
 
 import _root_.java.util
@@ -38,59 +36,21 @@ class StreamPhysicalMatch(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputNode: RelNode,
-    val logicalMatch: MatchRecognize,
+    logicalMatch: MatchRecognize,
     outputRowType: RelDataType)
-  extends SingleRel(cluster, traitSet, inputNode)
+  extends CommonPhysicalMatch(cluster, traitSet, inputNode, logicalMatch, outputRowType)
   with StreamPhysicalRel {
 
-  if (
-    logicalMatch.measures.values().exists(containsPythonCall(_)) ||
-    logicalMatch.patternDefinitions.values().exists(containsPythonCall(_))
-  ) {
-    throw new TableException("Python Function can not be used in MATCH_RECOGNIZE for now.")
-  }
-
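+  // A watermark is only required when the input exposes a rowtime attribute.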
   override def requireWatermark: Boolean = {
-    val rowtimeFields = getInput.getRowType.getFieldList
+    val rowTimeFields = getInput.getRowType.getFieldList
       .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
-    rowtimeFields.nonEmpty
+    rowTimeFields.nonEmpty
   }
 
-  override def deriveRowType(): RelDataType = outputRowType
-
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
     new StreamPhysicalMatch(cluster, traitSet, inputs.get(0), logicalMatch, outputRowType)
   }
 
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    val inputRowType = getInput.getRowType
-    val fieldNames = inputRowType.getFieldNames.toList
-    super
-      .explainTerms(pw)
-      .itemIf(
-        "partitionBy",
-        fieldToString(logicalMatch.partitionKeys.toArray, inputRowType),
-        !logicalMatch.partitionKeys.isEmpty)
-      .itemIf(
-        "orderBy",
-        collationToString(logicalMatch.orderKeys, inputRowType),
-        !logicalMatch.orderKeys.getFieldCollations.isEmpty)
-      .itemIf(
-        "measures",
-        measuresDefineToString(
-          logicalMatch.measures,
-          fieldNames,
-          getExpressionString,
-          convertToExpressionDetail(pw.getDetailLevel)),
-        !logicalMatch.measures.isEmpty
-      )
-      .item("rowsPerMatch", rowsPerMatchToString(logicalMatch.allRows))
-      .item("after", afterMatchToString(logicalMatch.after, fieldNames))
-      .item("pattern", logicalMatch.pattern.toString)
-      .itemIf("subset", subsetToString(logicalMatch.subsets), !logicalMatch.subsets.isEmpty)
-      .item("define", logicalMatch.patternDefinitions)
-  }
-
   override def translateToExecNode(): ExecNode[_] = {
     new StreamExecMatch(
       unwrapTableConfig(this),
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 22b30f63e4c..30bc6696154 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -321,6 +321,7 @@ object FlinkBatchRuleSets {
     FlinkLogicalRank.CONVERTER,
     FlinkLogicalWindowAggregate.CONVERTER,
     FlinkLogicalSnapshot.CONVERTER,
+    FlinkLogicalMatch.CONVERTER,
     FlinkLogicalSink.CONVERTER,
     FlinkLogicalLegacySink.CONVERTER,
     FlinkLogicalDistribution.BATCH_CONVERTER
@@ -411,6 +412,8 @@ object FlinkBatchRuleSets {
     BatchPhysicalSingleRowJoinRule.INSTANCE,
     BatchPhysicalLookupJoinRule.SNAPSHOT_ON_TABLESCAN,
     BatchPhysicalLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN,
+    // CEP
+    BatchPhysicalMatchRule.INSTANCE,
     // correlate
     BatchPhysicalConstantTableFunctionScanRule.INSTANCE,
     BatchPhysicalCorrelateRule.INSTANCE,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalMatchRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalMatchRule.scala
index 3142fb22bef..baecd116bd9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalMatchRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalMatchRule.scala
@@ -17,145 +17,35 @@
  */
 package org.apache.flink.table.planner.plan.rules.physical.stream
 
-import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.planner.plan.logical.MatchRecognize
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMatch
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMatch
-import org.apache.flink.table.planner.plan.utils.{MatchUtil, RexDefaultVisitor}
+import org.apache.flink.table.planner.plan.rules.physical.common.CommonPhysicalMatchRule
 
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.calcite.sql.SqlAggFunction
-import org.apache.calcite.util.ImmutableBitSet
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 class StreamPhysicalMatchRule
-  extends ConverterRule(
+  extends CommonPhysicalMatchRule(
     classOf[FlinkLogicalMatch],
     FlinkConventions.LOGICAL,
     FlinkConventions.STREAM_PHYSICAL,
     "StreamPhysicalMatchRule") {
 
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val logicalMatch: FlinkLogicalMatch = call.rel(0)
-
-    validateAggregations(logicalMatch.getMeasures.values().asScala)
-    validateAggregations(logicalMatch.getPatternDefinitions.values().asScala)
-    // This check might be obsolete once CALCITE-2747 is resolved
-    validateAmbiguousColumns(logicalMatch)
-    true
-  }
-
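+  /** Converts via [[CommonPhysicalMatchRule]] using the stream physical convention. */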
   override def convert(rel: RelNode): RelNode = {
-    val logicalMatch: FlinkLogicalMatch = rel.asInstanceOf[FlinkLogicalMatch]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-    val partitionKeys = logicalMatch.getPartitionKeys
-
-    val requiredDistribution = if (!partitionKeys.isEmpty) {
-      FlinkRelDistribution.hash(logicalMatch.getPartitionKeys.asList())
-    } else {
-      FlinkRelDistribution.SINGLETON
-    }
-    val requiredTraitSet = rel.getCluster.getPlanner
-      .emptyTraitSet()
-      .replace(requiredDistribution)
-      .replace(FlinkConventions.STREAM_PHYSICAL)
-
-    val convertInput: RelNode =
-      RelOptRule.convert(logicalMatch.getInput, requiredTraitSet)
-
-    try {
-      Class
-        .forName(
-          "org.apache.flink.cep.pattern.Pattern",
-          false,
-          Thread.currentThread().getContextClassLoader)
-    } catch {
-      case ex: ClassNotFoundException =>
-        throw new TableException(
-          "MATCH RECOGNIZE clause requires flink-cep dependency to be present on the classpath.",
-          ex)
-    }
-
-    new StreamPhysicalMatch(
-      rel.getCluster,
-      traitSet,
-      convertInput,
-      MatchRecognize(
-        logicalMatch.getPattern,
-        logicalMatch.getPatternDefinitions,
-        logicalMatch.getMeasures,
-        logicalMatch.getAfter,
-        logicalMatch.getSubsets,
-        logicalMatch.isAllRows,
-        logicalMatch.getPartitionKeys,
-        logicalMatch.getOrderKeys,
-        logicalMatch.getInterval
-      ),
-      logicalMatch.getRowType)
-  }
-
-  private def validateAggregations(expr: Iterable[RexNode]): Unit = {
-    val validator = new AggregationsValidator
-    expr.foreach(_.accept(validator))
+    super.convert(rel, FlinkConventions.STREAM_PHYSICAL)
   }
 
-  private def validateAmbiguousColumns(logicalMatch: FlinkLogicalMatch): Unit = {
-    if (logicalMatch.isAllRows) {
-      throw new TableException("All rows per match mode is not supported yet.")
-    } else {
-      validateAmbiguousColumnsOnRowPerMatch(
-        logicalMatch.getPartitionKeys,
-        logicalMatch.getMeasures.keySet().asScala,
-        logicalMatch.getInput.getRowType,
-        logicalMatch.getRowType)
-    }
+  override def convertToPhysicalMatch(
+      cluster: RelOptCluster,
+      traitSet: RelTraitSet,
+      convertInput: RelNode,
+      matchRecognize: MatchRecognize,
+      rowType: RelDataType): RelNode = {
+    new StreamPhysicalMatch(cluster, traitSet, convertInput, matchRecognize, rowType)
   }
-
-  private def validateAmbiguousColumnsOnRowPerMatch(
-      partitionKeys: ImmutableBitSet,
-      measuresNames: mutable.Set[String],
-      inputSchema: RelDataType,
-      expectedSchema: RelDataType): Unit = {
-    val actualSize = partitionKeys.toArray.length + measuresNames.size
-    val expectedSize = expectedSchema.getFieldCount
-    if (actualSize != expectedSize) {
-      // try to find ambiguous column
-
-      val ambiguousColumns = partitionKeys.toArray
-        .map(inputSchema.getFieldList.get(_).getName)
-        .filter(measuresNames.contains)
-        .mkString("{", ", ", "}")
-
-      throw new ValidationException(s"Columns ambiguously defined: $ambiguousColumns")
-    }
-  }
-
-  private class AggregationsValidator extends RexDefaultVisitor[Object] {
-
-    override def visitCall(call: RexCall): AnyRef = {
-      call.getOperator match {
-        case _: SqlAggFunction =>
-          call.accept(new MatchUtil.AggregationPatternVariableFinder)
-        case _ =>
-          call.getOperands.asScala.foreach(_.accept(this))
-      }
-
-      null
-    }
-
-    override def visitNode(rexNode: RexNode): AnyRef = {
-      null
-    }
-  }
-
 }
 
 object StreamPhysicalMatchRule {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java
new file mode 100644
index 00000000000..cb40481db8d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/** Validation test for {@link SqlMatchRecognize}. */
+@RunWith(Parameterized.class)
+public class MatchRecognizeValidationTest extends TableTestBase {
+
+    private static final String STREAM = "stream";
+    private static final String BATCH = "batch";
+
+    @Parameterized.Parameter public String mode;
+
+    @Parameterized.Parameters(name = "mode = {0}")
+    public static Collection<String> parameters() {
+        return Arrays.asList(STREAM, BATCH);
+    }
+
+    @Rule public ExpectedException expectedException = ExpectedException.none();
+
+    private TableTestUtil util;
+    private TableEnvironment tEnv;
+
+    @Before
+    public void setup() {
+        util =
+                STREAM.equals(mode)
+                        ? streamTestUtil(TableConfig.getDefault())
+                        : batchTestUtil(TableConfig.getDefault());
+        tEnv = util.getTableEnv();
+        tEnv.executeSql(
+                "CREATE TABLE Ticker (\n"
+                        + "  `symbol` VARCHAR,\n"
+                        + "  `price` INT,\n"
+                        + "  `tax` INT,\n"
+                        + "  `proctime` as PROCTIME()\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'bounded' = 'true'\n"
+                        + ")");
+        tEnv.executeSql(
+                "CREATE TABLE MyTable (\n"
+                        + "  a BIGINT,\n"
+                        + "  b INT,\n"
+                        + "  proctime as PROCTIME()\n"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'bounded' = 'true'\n"
+                        + ")");
+    }
+
+    @After
+    public void after() {
+        util.getTableEnv().executeSql("DROP TABLE Ticker");
+        util.getTableEnv().executeSql("DROP TABLE MyTable");
+    }
+
+    /** Function 'MATCH_ROWTIME()' can only be used in MATCH_RECOGNIZE. */
+    @Test(expected = ValidationException.class)
+    public void testMatchRowTimeInSelect() {
+        String sql = "SELECT MATCH_ROWTIME() FROM MyTable";
+        util.verifyExplain(sql);
+    }
+
+    /** Function 'MATCH_PROCTIME()' can only be used in MATCH_RECOGNIZE. */
+    @Test(expected = ValidationException.class)
+    public void testMatchProcTimeInSelect() {
+        String sql = "SELECT MATCH_PROCTIME() FROM MyTable";
+        util.verifyExplain(sql);
+    }
+
+    @Test
+    public void testSortProcessingTimeDesc() {
+        if (STREAM.equals(mode)) {
+            expectedException.expect(TableException.class);
+            expectedException.expectMessage(
+                    "Primary sort order of a streaming table must be ascending on time.");
+            String sqlQuery =
+                    "SELECT *\n"
+                            + "FROM Ticker\n"
+                            + "MATCH_RECOGNIZE (\n"
+                            + "  ORDER BY proctime DESC\n"
+                            + "  MEASURES\n"
+                            + "    A.symbol AS aSymbol\n"
+                            + "  PATTERN (A B)\n"
+                            + "  DEFINE\n"
+                            + "    A AS A.symbol = 'a'\n"
+                            + ") AS T";
+            tEnv.executeSql(sqlQuery);
+        }
+    }
+
+    @Test
+    public void testSortProcessingTimeSecondaryField() {
+        if (STREAM.equals(mode)) {
+            expectedException.expect(TableException.class);
+            expectedException.expectMessage(
+                    "You must specify either rowtime or proctime for order by as the first one.");
+            String sqlQuery =
+                    "SELECT *\n"
+                            + "FROM Ticker\n"
+                            + "MATCH_RECOGNIZE (\n"
+                            + "  ORDER BY price, proctime\n"
+                            + "  MEASURES\n"
+                            + "    A.symbol AS aSymbol\n"
+                            + "  PATTERN (A B)\n"
+                            + "  DEFINE\n"
+                            + "    A AS A.symbol = 'a'\n"
+                            + ") AS T";
+            tEnv.executeSql(sqlQuery);
+        }
+    }
+
+    @Test
+    public void testSortNoOrder() {
+        if (STREAM.equals(mode)) {
+            expectedException.expect(TableException.class);
+            expectedException.expectMessage(
+                    "You must specify either rowtime or proctime for order by.");
+            String sqlQuery =
+                    "SELECT *\n"
+                            + "FROM Ticker\n"
+                            + "MATCH_RECOGNIZE (\n"
+                            + "  MEASURES\n"
+                            + "    A.symbol AS aSymbol\n"
+                            + "  PATTERN (A B)\n"
+                            + "  DEFINE\n"
+                            + "    A AS A.symbol = 'a'\n"
+                            + ") AS T";
+            tEnv.executeSql(sqlQuery);
+        }
+    }
+
+    @Test
+    public void testUpdatesInUpstreamOperatorNotSupported() {
+        if (STREAM.equals(mode)) {
+            expectedException.expect(TableException.class);
+            expectedException.expectMessage(
+                    "Match Recognize doesn't support consuming update changes which is produced by node GroupAggregate(");
+            String sqlQuery =
+                    "SELECT *\n"
+                            + "FROM (SELECT DISTINCT * FROM Ticker)\n"
+                            + "MATCH_RECOGNIZE (\n"
+                            + "  ORDER BY proctime\n"
+                            + "  MEASURES\n"
+                            + "    A.symbol AS aSymbol\n"
+                            + "   ONE ROW PER MATCH"
+                            + "  PATTERN (A B)\n"
+                            + "  DEFINE\n"
+                            + "    A AS A.symbol = 'a'\n"
+                            + ") AS T";
+            tEnv.executeSql(sqlQuery);
+        }
+    }
+
+    @Test
+    public void testAggregatesOnMultiplePatternVariablesNotSupported() {
+        expectedException.expect(ValidationException.class);
+        expectedException.expectMessage("SQL validation failed.");
+        String sqlQuery =
+                "SELECT *\n"
+                        + "FROM Ticker\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY proctime\n"
+                        + "  MEASURES\n"
+                        + "    SUM(A.price + B.tax) AS taxedPrice\n"
+                        + "  PATTERN (A B)\n"
+                        + "  DEFINE\n"
+                        + "    A AS A.symbol = 'a'\n"
+                        + ") AS T";
+        tEnv.executeSql(sqlQuery);
+    }
+
+    @Test
+    public void testAggregatesOnMultiplePatternVariablesNotSupportedInUDAGs() {
+        expectedException.expect(ValidationException.class);
+        expectedException.expectMessage("Aggregation must be applied to a single pattern variable");
+        util.addTemporarySystemFunction("weightedAvg", new WeightedAvg());
+        String sqlQuery =
+                "SELECT *\n"
+                        + "FROM Ticker\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY proctime\n"
+                        + "  MEASURES\n"
+                        + "    weightedAvg(A.price, B.tax) AS weightedAvg\n"
+                        + "  PATTERN (A B)\n"
+                        + "  DEFINE\n"
+                        + "    A AS A.symbol = 'a'\n"
+                        + ") AS T";
+        tEnv.executeSql(sqlQuery);
+    }
+
+    @Test
+    public void testValidatingAmbiguousColumns() {
+        expectedException.expect(ValidationException.class);
+        expectedException.expectMessage("Columns ambiguously defined: {symbol, price}");
+        String sqlQuery =
+                "SELECT *\n"
+                        + "FROM Ticker\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  PARTITION BY symbol, price\n"
+                        + "  ORDER BY proctime\n"
+                        + "  MEASURES\n"
+                        + "    A.symbol AS symbol,\n"
+                        + "    A.price AS price\n"
+                        + "  PATTERN (A)\n"
+                        + "  DEFINE\n"
+                        + "    A AS A.symbol = 'a'\n"
+                        + ") AS T";
+        tEnv.executeSql(sqlQuery);
+    }
+
+    // ***************************************************************************************
+    // * Those validations are temporary. We should remove those tests once we support those *
+    // * features.                                                                           *
+    // ***************************************************************************************
+
+    /** Python Function can not be used in MATCH_RECOGNIZE for now. */
+    @Test
+    public void testMatchPythonFunction() {
+        expectedException.expect(TableException.class);
+        expectedException.expectMessage(
+                "Python Function can not be used in MATCH_RECOGNIZE for now.");
+        util.addTemporarySystemFunction("pyFunc", new PythonScalarFunction("pyFunc"));
+        String sql =
+                "SELECT T.aa as ta\n"
+                        + "FROM MyTable\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY proctime\n"
+                        + "  MEASURES\n"
+                        + "    A.a as aa,\n"
+                        + "    pyFunc(1,2) as bb\n"
+                        + "  PATTERN (A B)\n"
+                        + "  DEFINE\n"
+                        + "    A AS a = 1,\n"
+                        + "    B AS b = 'b'\n"
+                        + ") AS T";
+        util.verifyExplain(sql);
+    }
+
+    @Test
+    public void testAllRowsPerMatch() {
+        expectedException.expect(TableException.class);
+        expectedException.expectMessage("All rows per match mode is not supported yet.");
+        String sqlQuery =
+                "SELECT *\n"
+                        + "FROM Ticker\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY proctime\n"
+                        + "  MEASURES\n"
+                        + "    A.symbol AS aSymbol\n"
+                        + "  ALL ROWS PER MATCH\n"
+                        + "  PATTERN (A B)\n"
+                        + "  DEFINE\n"
+                        + "    A AS A.symbol = 'a'\n"
+                        + ") AS T";
+        tEnv.executeSql(sqlQuery);
+    }
+
+    @Test
+    public void testGreedyQuantifierAtTheEndIsNotSupported() {
+        expectedException.expect(TableException.class);
+        expectedException.expectMessage(
+                "Greedy quantifiers are not allowed as the last element of a "
+                        + "Pattern yet. Finish your pattern with either a simple variable or reluctant quantifier.");
+        String sqlQuery =
+                "SELECT *\n"
+                        + "FROM Ticker\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY proctime\n"
+                        + "  MEASURES\n"
+                        + "    A.symbol AS aSymbol\n"
+                        + "  PATTERN (A B+)\n"
+                        + "  DEFINE\n"
+                        + "    A AS A.symbol = 'a'\n"
+                        + ") AS T";
+        tEnv.executeSql(sqlQuery);
+    }
+
+    @Test
+    public void testPatternsProducingEmptyMatchesAreNotSupported() {
+        expectedException.expect(TableException.class);
+        expectedException.expectMessage(
+                "Patterns that can produce empty matches are not supported. "
+                        + "There must be at least one non-optional state.");
+        String sqlQuery =
+                "SELECT *\n"
+                        + "FROM Ticker\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY proctime\n"
+                        + "  MEASURES\n"
+                        + "    A.symbol AS aSymbol\n"
+                        + "  PATTERN (A*)\n"
+                        + "  DEFINE\n"
+                        + "    A AS A.symbol = 'a'\n"
+                        + ") AS T";
+        tEnv.executeSql(sqlQuery);
+    }
+
+    @Test
+    public void testDistinctAggregationsNotSupported() {
+        expectedException.expect(ValidationException.class);
+        expectedException.expectMessage("SQL validation failed.");
+        String sqlQuery =
+                "SELECT *\n"
+                        + "FROM Ticker\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY proctime\n"
+                        + "  MEASURES\n"
+                        + "    COUNT(DISTINCT A.price) AS price\n"
+                        + "  PATTERN (A B)\n"
+                        + "  DEFINE\n"
+                        + "    A AS A.symbol = 'a'\n"
+                        + ") AS T";
+        tEnv.executeSql(sqlQuery);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.java
new file mode 100644
index 00000000000..eca0bb0cfcf
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Plan tests for {@link SqlMatchRecognize} in batch mode. */
+public class MatchRecognizeTest extends TableTestBase {
+
+    private BatchTableTestUtil util;
+
+    @Before
+    public void before() {
+        util = batchTestUtil(TableConfig.getDefault());
+        util.getTableEnv()
+                .executeSql(
+                        "CREATE TABLE Ticker (\n"
+                                + "  `symbol` VARCHAR,\n"
+                                + "  `price` INT,\n"
+                                + "  `tax` INT,\n"
+                                + "  `ts_ltz` as PROCTIME()\n"
+                                + ") with (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = 'true'\n"
+                                + ")");
+    }
+
+    @After
+    public void after() {
+        util.getTableEnv().executeSql("DROP TABLE Ticker");
+    }
+
+    @Test
+    public void testCascadeMatch() {
+        String sqlQuery =
+                "SELECT *\n"
+                        + "FROM (\n"
+                        + "  SELECT\n"
+                        + "    symbol,\n"
+                        + "    price\n"
+                        + "  FROM Ticker\n"
+                        + "  MATCH_RECOGNIZE (\n"
+                        + "    PARTITION BY symbol\n"
+                        + "     ORDER BY ts_ltz"
+                        + "    MEASURES\n"
+                        + "      A.price as price,\n"
+                        + "      A.tax as tax\n"
+                        + "    ONE ROW PER MATCH\n"
+                        + "    PATTERN (A)\n"
+                        + "    DEFINE\n"
+                        + "      A AS A.price > 0\n"
+                        + "  ) AS T\n"
+                        + "  GROUP BY symbol, price\n"
+                        + ")\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  PARTITION BY symbol\n"
+                        + "  MEASURES\n"
+                        + "    A.price as dPrice\n"
+                        + "  PATTERN (A)\n"
+                        + "  DEFINE\n"
+                        + "    A AS A.symbol = 'a'\n"
+                        + ")";
+        util.verifyExecPlan(sqlQuery);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java
index a336a42a2f6..80c8baa66e0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java
@@ -152,4 +152,24 @@ public class BatchOperatorNameTest extends OperatorNameTestBase {
         util.testingTableEnv().registerTableSinkInternal("MySink", sink);
         verifyInsert("insert into MySink select * from MySource");
     }
+
+    @Test
+    public void testMatch() {
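+        // Uses a quoted identifier (`A"`) and a unicode escape (\u006C, i.e. 'l') as pattern
+        // variable names to check operator-name generation with unusual identifiers.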
+        createSourceWithTimeAttribute();
+        String sql =
+                "SELECT T.aid, T.bid, T.cid\n"
+                        + "     FROM MyTable MATCH_RECOGNIZE (\n"
+                        + "             ORDER BY proctime\n"
+                        + "             MEASURES\n"
+                        + "             `A\"`.a AS aid,\n"
+                        + "             \u006C.a AS bid,\n"
+                        + "             C.a AS cid\n"
+                        + "             PATTERN (`A\"` \u006C C)\n"
+                        + "             DEFINE\n"
+                        + "                 `A\"` AS a = 1,\n"
+                        + "                 \u006C AS b = 2,\n"
+                        + "                 C AS c = 'c'\n"
+                        + "     ) AS T";
+        verifyQuery(sql);
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
new file mode 100644
index 00000000000..31860779478
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
@@ -0,0 +1,645 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.apache.flink.api.common.typeinfo.Types.DOUBLE;
+import static org.apache.flink.api.common.typeinfo.Types.INT;
+import static org.apache.flink.api.common.typeinfo.Types.LONG;
+import static org.apache.flink.api.common.typeinfo.Types.ROW_NAMED;
+import static org.apache.flink.api.common.typeinfo.Types.STRING;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** IT cases for {@link SqlMatchRecognize} in batch mode. */
+public class MatchRecognizeITCase {
+
+    private StreamExecutionEnvironment env;
+    private StreamTableEnvironment tEnv;
+
+    @Before
+    public void setup() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());
+    }
+
+    @Test
+    public void testSimplePattern() {
+        tEnv.createTemporaryView(
+                "MyTable",
+                tEnv.fromDataStream(
+                        env.fromElements(
+                                        Row.of(1, "a"),
+                                        Row.of(2, "z"),
+                                        Row.of(3, "b"),
+                                        Row.of(4, "c"),
+                                        Row.of(5, "d"),
+                                        Row.of(6, "a"),
+                                        Row.of(7, "b"),
+                                        Row.of(8, "c"),
+                                        Row.of(9, "h"))
+                                .returns(ROW_NAMED(new String[] {"id", "name"}, INT, STRING)),
+                        Schema.newBuilder()
+                                .column("id", DataTypes.INT())
+                                .column("name", DataTypes.STRING())
+                                .columnByExpression("proctime", "PROCTIME()")
+                                .build()));
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT T.aid, T.bid, T.cid\n"
+                                + "FROM MyTable\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  ORDER BY proctime\n"
+                                + "  MEASURES\n"
+                                + "    `A\"`.id AS aid,\n"
+                                + "    \u006C.id AS bid,\n"
+                                + "    C.id AS cid\n"
+                                + "  PATTERN (`A\"` \u006C C)\n"
+                                + "  DEFINE\n"
+                                + "    `A\"` AS name = 'a',\n"
+                                + "    \u006C AS name = 'b',\n"
+                                + "    C AS name = 'c'\n"
+                                + ") AS T");
+        assertEquals(
+                Collections.singletonList(Row.of(6, 7, 8)),
+                CollectionUtil.iteratorToList(tableResult.collect()));
+    }
+
+    @Test
+    public void testSimplePatternWithNulls() {
+        tEnv.createTemporaryView(
+                "MyTable",
+                tEnv.fromDataStream(
+                        env.fromElements(
+                                        Row.of(1, "a", null),
+                                        Row.of(2, "b", null),
+                                        Row.of(3, "c", null),
+                                        Row.of(4, "d", null),
+                                        Row.of(5, null, null),
+                                        Row.of(6, "a", null),
+                                        Row.of(7, "b", null),
+                                        Row.of(8, "c", null),
+                                        Row.of(9, null, null))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {"id", "name", "nullField"},
+                                                INT,
+                                                STRING,
+                                                STRING)),
+                        Schema.newBuilder()
+                                .column("id", DataTypes.INT())
+                                .column("name", DataTypes.STRING())
+                                .column("nullField", DataTypes.STRING())
+                                .columnByExpression("proctime", "PROCTIME()")
+                                .build()));
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT T.aid, T.bNull, T.cid, T.aNull\n"
+                                + "FROM MyTable\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  ORDER BY proctime\n"
+                                + "  MEASURES\n"
+                                + "    A.id AS aid,\n"
+                                + "    A.nullField AS aNull,\n"
+                                + "    LAST(B.nullField) AS bNull,\n"
+                                + "    C.id AS cid\n"
+                                + "  PATTERN (A B C)\n"
+                                + "  DEFINE\n"
+                                + "    A AS name = 'a' AND nullField IS NULL,\n"
+                                + "    B AS name = 'b' AND LAST(A.nullField) IS NULL,\n"
+                                + "    C AS name = 'c'\n"
+                                + ") AS T");
+        assertEquals(
+                Arrays.asList(Row.of(1, null, 3, null), Row.of(6, null, 8, null)),
+                CollectionUtil.iteratorToList(tableResult.collect()));
+    }
+
+    @Test
+    public void testCodeSplitsAreProperlyGenerated() {
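+        // A limit of 1 forces the code generator to split every generated method.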
+        tEnv.getConfig().setMaxGeneratedCodeLength(1);
+        tEnv.createTemporaryView(
+                "MyTable",
+                tEnv.fromDataStream(
+                        env.fromElements(
+                                        Row.of(1, "a", "key1", "second_key3"),
+                                        Row.of(2, "b", "key1", "second_key3"),
+                                        Row.of(3, "c", "key1", "second_key3"),
+                                        Row.of(4, "d", "key", "second_key"),
+                                        Row.of(5, "e", "key", "second_key"),
+                                        Row.of(6, "a", "key2", "second_key4"),
+                                        Row.of(7, "b", "key2", "second_key4"),
+                                        Row.of(8, "c", "key2", "second_key4"),
+                                        Row.of(9, "f", "key", "second_key"))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {"id", "name", "key1", "key2"},
+                                                INT,
+                                                STRING,
+                                                STRING,
+                                                STRING)),
+                        Schema.newBuilder()
+                                .column("id", DataTypes.INT())
+                                .column("name", DataTypes.STRING())
+                                .column("key1", DataTypes.STRING())
+                                .column("key2", DataTypes.STRING())
+                                .columnByExpression("proctime", "PROCTIME()")
+                                .build()));
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT *\n"
+                                + "FROM MyTable\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  PARTITION BY key1, key2\n"
+                                + "  ORDER BY proctime\n"
+                                + "  MEASURES\n"
+                                + "    A.id AS aid,\n"
+                                + "    A.key1 AS akey1,\n"
+                                + "    LAST(B.id) AS bid,\n"
+                                + "    C.id AS cid,\n"
+                                + "    C.key2 AS ckey2\n"
+                                + "  PATTERN (A B C)\n"
+                                + "  DEFINE\n"
+                                + "    A AS name = 'a' AND key1 LIKE '%key%' AND id > 0,\n"
+                                + "    B AS name = 'b' AND LAST(A.name, 2) IS NULL,\n"
+                                + "    C AS name = 'c' AND LAST(A.name) = 'a'\n"
+                                + ") AS T");
+        List<Row> actual = CollectionUtil.iteratorToList(tableResult.collect());
+        actual.sort(Comparator.comparing(o -> String.valueOf(o.getField(0))));
+        assertEquals(
+                Arrays.asList(
+                        Row.of("key1", "second_key3", 1, "key1", 2, 3, "second_key3"),
+                        Row.of("key2", "second_key4", 6, "key2", 7, 8, "second_key4")),
+                actual);
+    }
+
+    @Test
+    public void testLogicalOffsets() {
+        tEnv.createTemporaryView(
+                "Ticker",
+                tEnv.fromDataStream(
+                        env.fromElements(
+                                        Row.of("ACME", 1L, 19, 1),
+                                        Row.of("ACME", 2L, 17, 2),
+                                        Row.of("ACME", 3L, 13, 3),
+                                        Row.of("ACME", 4L, 20, 4),
+                                        Row.of("ACME", 5L, 20, 5),
+                                        Row.of("ACME", 6L, 26, 6),
+                                        Row.of("ACME", 7L, 20, 7),
+                                        Row.of("ACME", 8L, 25, 8))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {"symbol", "tstamp", "price", "tax"},
+                                                STRING,
+                                                LONG,
+                                                INT,
+                                                INT)),
+                        Schema.newBuilder()
+                                .column("symbol", DataTypes.STRING())
+                                .column("tstamp", DataTypes.BIGINT())
+                                .column("price", DataTypes.INT())
+                                .column("tax", DataTypes.INT())
+                                .columnByExpression("proctime", "PROCTIME()")
+                                .build()));
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT *\n"
+                                + "FROM Ticker\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  ORDER BY proctime\n"
+                                + "  MEASURES\n"
+                                + "    FIRST(DOWN.tstamp) AS start_tstamp,\n"
+                                + "    LAST(DOWN.tstamp) AS bottom_tstamp,\n"
+                                + "    UP.tstamp AS end_tstamp,\n"
+                                + "    FIRST(DOWN.price + DOWN.tax + 1) AS bottom_total,\n"
+                                + "    UP.price + UP.tax AS end_total\n"
+                                + "  ONE ROW PER MATCH\n"
+                                + "  AFTER MATCH SKIP PAST LAST ROW\n"
+                                + "  PATTERN (DOWN{2,} UP)\n"
+                                + "  DEFINE\n"
+                                + "    DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n"
+                                + "    UP AS price < FIRST(DOWN.price)\n"
+                                + ") AS T");
+        assertEquals(
+                Collections.singletonList(Row.of(6L, 7L, 8L, 33, 33)),
+                CollectionUtil.iteratorToList(tableResult.collect()));
+    }
+
+    @Test
+    public void testLogicalOffsetsWithStarVariable() {
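+        // FIRST(id, n) / LAST(id, n) without a pattern variable prefix address all rows of the
+        // match (the implicit "star" variable).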
+        tEnv.createTemporaryView(
+                "Ticker",
+                tEnv.fromDataStream(
+                        env.fromElements(
+                                        Row.of(1, "ACME", 1L, 20),
+                                        Row.of(2, "ACME", 2L, 19),
+                                        Row.of(3, "ACME", 3L, 18),
+                                        Row.of(4, "ACME", 4L, 17),
+                                        Row.of(5, "ACME", 5L, 16),
+                                        Row.of(6, "ACME", 6L, 15),
+                                        Row.of(7, "ACME", 7L, 14),
+                                        Row.of(8, "ACME", 8L, 20))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {"id", "symbol", "tstamp", "price"},
+                                                INT,
+                                                STRING,
+                                                LONG,
+                                                INT)),
+                        Schema.newBuilder()
+                                .column("id", DataTypes.INT())
+                                .column("symbol", DataTypes.STRING())
+                                .column("tstamp", DataTypes.BIGINT())
+                                .column("price", DataTypes.INT())
+                                .columnByExpression("proctime", "PROCTIME()")
+                                .build()));
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT *\n"
+                                + "FROM Ticker\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  ORDER BY proctime\n"
+                                + "  MEASURES\n"
+                                + "    FIRST(id, 0) as id0,\n"
+                                + "    FIRST(id, 1) as id1,\n"
+                                + "    FIRST(id, 2) as id2,\n"
+                                + "    FIRST(id, 3) as id3,\n"
+                                + "    FIRST(id, 4) as id4,\n"
+                                + "    FIRST(id, 5) as id5,\n"
+                                + "    FIRST(id, 6) as id6,\n"
+                                + "    FIRST(id, 7) as id7,\n"
+                                + "    LAST(id, 0) as id8,\n"
+                                + "    LAST(id, 1) as id9,\n"
+                                + "    LAST(id, 2) as id10,\n"
+                                + "    LAST(id, 3) as id11,\n"
+                                + "    LAST(id, 4) as id12,\n"
+                                + "    LAST(id, 5) as id13,\n"
+                                + "    LAST(id, 6) as id14,\n"
+                                + "    LAST(id, 7) as id15\n"
+                                + "  ONE ROW PER MATCH\n"
+                                + "  AFTER MATCH SKIP PAST LAST ROW\n"
+                                + "  PATTERN (`DOWN\"`{2,} UP)\n"
+                                + "  DEFINE\n"
+                                + "    `DOWN\"` AS price < LAST(price, 1) OR LAST(price, 1) IS NULL,\n"
+                                + "    UP AS price = FIRST(price) AND price > FIRST(price, 3) AND price = LAST(price, 7)\n"
+                                + ") AS T");
+        assertEquals(
+                Collections.singletonList(Row.of(1, 2, 3, 4, 5, 6, 7, 8, 8, 7, 6, 5, 4, 3, 2, 1)),
+                CollectionUtil.iteratorToList(tableResult.collect()));
+    }
+
+    @Test
+    public void testLogicalOffsetOutsideOfRangeInMeasures() {
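+        // FIRST(DOWN.price, 5) reaches beyond the matched rows, so the measure evaluates to NULL.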
+        tEnv.createTemporaryView(
+                "Ticker",
+                tEnv.fromDataStream(
+                        env.fromElements(
+                                        Row.of("ACME", 1L, 19, 1),
+                                        Row.of("ACME", 2L, 17, 2),
+                                        Row.of("ACME", 3L, 13, 3),
+                                        Row.of("ACME", 4L, 20, 4))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {"symbol", "tstamp", "price", "tax"},
+                                                STRING,
+                                                LONG,
+                                                INT,
+                                                INT)),
+                        Schema.newBuilder()
+                                .column("symbol", DataTypes.STRING())
+                                .column("tstamp", DataTypes.BIGINT())
+                                .column("price", DataTypes.INT())
+                                .column("tax", DataTypes.INT())
+                                .columnByExpression("proctime", "PROCTIME()")
+                                .build()));
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT *\n"
+                                + "FROM Ticker\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  ORDER BY proctime\n"
+                                + "  MEASURES\n"
+                                + "    FIRST(DOWN.price) as first,\n"
+                                + "    LAST(DOWN.price) as last,\n"
+                                + "    FIRST(DOWN.price, 5) as nullPrice\n"
+                                + "  ONE ROW PER MATCH\n"
+                                + "  AFTER MATCH SKIP PAST LAST ROW\n"
+                                + "  PATTERN (DOWN{2,} UP)\n"
+                                + "  DEFINE\n"
+                                + "    DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n"
+                                + "    UP AS price > LAST(DOWN.price)\n"
+                                + ") AS T");
+        assertEquals(
+                Collections.singletonList(Row.of(19, 13, null)),
+                CollectionUtil.iteratorToList(tableResult.collect()));
+    }
+
+    /**
+     * This query checks that:
+     *
+     * <ol>
+     *   <li>count(D.price) produces 0, because no rows matched to D
+     *   <li>sum(D.price) produces null, because no rows matched to D
+     *   <li>aggregates that take multiple parameters work
+     *   <li>aggregates with expressions work
+     * </ol>
+     */
+    @Test
+    public void testAggregates() {
+        tEnv.getConfig().setMaxGeneratedCodeLength(1);
+        tEnv.createTemporaryView(
+                "MyTable",
+                tEnv.fromDataStream(
+                        env.fromElements(
+                                        Row.of(1, "a", 1, 0.8, 1),
+                                        Row.of(2, "z", 2, 0.8, 3),
+                                        Row.of(3, "b", 1, 0.8, 2),
+                                        Row.of(4, "c", 1, 0.8, 5),
+                                        Row.of(5, "d", 4, 0.1, 5),
+                                        Row.of(6, "a", 2, 1.5, 2),
+                                        Row.of(7, "b", 2, 0.8, 3),
+                                        Row.of(8, "c", 1, 0.8, 2),
+                                        Row.of(9, "h", 4, 0.8, 3),
+                                        Row.of(10, "h", 4, 0.8, 3),
+                                        Row.of(11, "h", 2, 0.8, 3),
+                                        Row.of(12, "h", 2, 0.8, 3))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {
+                                                    "id", "name", "price", "rate", "weight"
+                                                },
+                                                INT,
+                                                STRING,
+                                                INT,
+                                                DOUBLE,
+                                                INT)),
+                        Schema.newBuilder()
+                                .column("id", DataTypes.INT())
+                                .column("name", DataTypes.STRING())
+                                .column("price", DataTypes.INT())
+                                .column("rate", DataTypes.DOUBLE())
+                                .column("weight", DataTypes.INT())
+                                .columnByExpression("proctime", "PROCTIME()")
+                                .build()));
+        tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg());
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT *\n"
+                                + "FROM MyTable\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  ORDER BY proctime\n"
+                                + "  MEASURES\n"
+                                + "    FIRST(id) as startId,\n"
+                                + "    SUM(A.price) AS sumA,\n"
+                                + "    COUNT(D.price) AS countD,\n"
+                                + "    SUM(D.price) as sumD,\n"
+                                + "    weightedAvg(price, weight) as wAvg,\n"
+                                + "    AVG(B.price) AS avgB,\n"
+                                + "    SUM(B.price * B.rate) as sumExprB,\n"
+                                + "    LAST(id) as endId\n"
+                                + "  AFTER MATCH SKIP PAST LAST ROW\n"
+                                + "  PATTERN (A+ B+ C D? E)\n"
+                                + "  DEFINE\n"
+                                + "    A AS SUM(A.price) < 6,\n"
+                                + "    B AS SUM(B.price * B.rate) < SUM(A.price) AND\n"
+                                + "      SUM(B.price * B.rate) > 0.2 AND\n"
+                                + "      SUM(B.price) >= 1 AND\n"
+                                + "      AVG(B.price) >= 1 AND\n"
+                                + "      weightedAvg(price, weight) > 1\n"
+                                + ") AS T");
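+        // The optional D never matches, so COUNT(D.price) is 0 and SUM(D.price)
+        // is NULL; wAvg and sumExprB cover multi-argument and expression-based
+        // aggregates in MEASURES.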
+        assertEquals(
+                Arrays.asList(
+                        Row.of(1, 5, 0L, null, 2L, 3, 3.4D, 8),
+                        Row.of(9, 4, 0L, null, 3L, 4, 3.2D, 12)),
+                CollectionUtil.iteratorToList(tableResult.collect()));
+    }
+
+    @Test
+    public void testAggregatesWithNullInputs() {
+        tEnv.getConfig().setMaxGeneratedCodeLength(1);
+        tEnv.createTemporaryView(
+                "MyTable",
+                tEnv.fromDataStream(
+                        env.fromElements(
+                                        Row.of(1, "a", 10),
+                                        Row.of(2, "z", 10),
+                                        Row.of(3, "b", null),
+                                        Row.of(4, "c", null),
+                                        Row.of(5, "d", 3),
+                                        Row.of(6, "c", 3),
+                                        Row.of(7, "c", 3),
+                                        Row.of(8, "c", 3),
+                                        Row.of(9, "c", 2))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {"id", "name", "price"},
+                                                INT,
+                                                STRING,
+                                                INT)),
+                        Schema.newBuilder()
+                                .column("id", DataTypes.INT())
+                                .column("name", DataTypes.STRING())
+                                .column("price", DataTypes.INT())
+                                .columnByExpression("proctime", "PROCTIME()")
+                                .build()));
+        tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg());
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT *\n"
+                                + "FROM MyTable\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  ORDER BY proctime\n"
+                                + "  MEASURES\n"
+                                + "    SUM(A.price) as sumA,\n"
+                                + "    COUNT(A.id) as countAId,\n"
+                                + "    COUNT(A.price) as countAPrice,\n"
+                                + "    COUNT(*) as countAll,\n"
+                                + "    COUNT(price) as countAllPrice,\n"
+                                + "    LAST(id) as endId\n"
+                                + "  AFTER MATCH SKIP PAST LAST ROW\n"
+                                + "  PATTERN (A+ C)\n"
+                                + "  DEFINE\n"
+                                + "    A AS SUM(A.price) < 30,\n"
+                                + "    C AS C.name = 'c'\n"
+                                + ") AS T");
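+        // SUM and COUNT over price skip NULL inputs, while COUNT(A.id) and
+        // COUNT(*) count every row: hence countAId (7) and countAll (8) exceed
+        // countAPrice (5) and countAllPrice (6).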
+        assertEquals(
+                Collections.singletonList(Row.of(29, 7L, 5L, 8L, 6L, 8)),
+                CollectionUtil.iteratorToList(tableResult.collect()));
+    }
+
+    @Test
+    public void testAccessingCurrentTime() {
+        tEnv.createTemporaryView(
+                "MyTable",
+                tEnv.fromDataStream(
+                        env.fromElements(Row.of(1, "a"))
+                                .returns(ROW_NAMED(new String[] {"id", "name"}, INT, STRING)),
+                        Schema.newBuilder()
+                                .column("id", DataTypes.INT())
+                                .column("name", DataTypes.STRING())
+                                .columnByExpression("proctime", "PROCTIME()")
+                                .build()));
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT T.aid\n"
+                                + "FROM MyTable\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  ORDER BY proctime\n"
+                                + "  MEASURES\n"
+                                + "    A.id AS aid,\n"
+                                + "    A.proctime AS aProctime,\n"
+                                + "    LAST(A.proctime + INTERVAL '1' second) as calculatedField\n"
+                                + "  PATTERN (A)\n"
+                                + "  DEFINE\n"
+                                + "    A AS proctime >= (CURRENT_TIMESTAMP - INTERVAL '1' day)\n"
+                                + ") AS T");
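+        // Only T.aid is projected; aProctime and calculatedField merely verify
+        // that the proctime attribute is accessible and usable in arithmetic
+        // inside MEASURES.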
+        assertEquals(
+                Collections.singletonList(Row.of(1)),
+                CollectionUtil.iteratorToList(tableResult.collect()));
+    }
+
+    @Test
+    public void testUserDefinedFunctions() {
+        tEnv.getConfig().setMaxGeneratedCodeLength(1);
+        tEnv.createTemporaryView(
+                "MyTable",
+                tEnv.fromDataStream(
+                        env.fromElements(
+                                        Row.of(1, "a", 1),
+                                        Row.of(2, "a", 1),
+                                        Row.of(3, "a", 1),
+                                        Row.of(4, "a", 1),
+                                        Row.of(5, "a", 1),
+                                        Row.of(6, "b", 1),
+                                        Row.of(7, "a", 1),
+                                        Row.of(8, "a", 1),
+                                        Row.of(9, "f", 1))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {"id", "name", "price"},
+                                                INT,
+                                                STRING,
+                                                INT)),
+                        Schema.newBuilder()
+                                .column("id", DataTypes.INT())
+                                .column("name", DataTypes.STRING())
+                                .column("price", DataTypes.INT())
+                                .columnByExpression("proctime", "PROCTIME()")
+                                .build()));
+        tEnv.createTemporarySystemFunction("prefix", new PrefixingScalarFunc());
+        tEnv.createTemporarySystemFunction("countFrom", new RichAggFunc());
+        String prefix = "PREF";
+        int startFrom = 4;
+        Configuration jobParameters = new Configuration();
+        jobParameters.setString("prefix", prefix);
+        jobParameters.setString("start", Integer.toString(startFrom));
+        env.getConfig().setGlobalJobParameters(jobParameters);
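+        // The global job parameters set above reach the UDFs through
+        // FunctionContext#getJobParameter in their open() methods.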
+        TableResult tableResult =
+                tEnv.executeSql(
+                        String.format(
+                                "SELECT *\n"
+                                        + "FROM MyTable\n"
+                                        + "MATCH_RECOGNIZE (\n"
+                                        + "  ORDER BY proctime\n"
+                                        + "  MEASURES\n"
+                                        + "    FIRST(id) as firstId,\n"
+                                        + "    prefix(A.name) as prefixedNameA,\n"
+                                        + "    countFrom(A.price) as countFromA,\n"
+                                        + "    LAST(id) as lastId\n"
+                                        + "  AFTER MATCH SKIP PAST LAST ROW\n"
+                                        + "  PATTERN (A+ C)\n"
+                                        + "  DEFINE\n"
+                                        + "    A AS prefix(A.name) = '%s:a' AND countFrom(A.price) <= %d\n"
+                                        + ") AS T",
+                                prefix, startFrom + 4));
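+        // countFrom accumulates from the "start" job parameter (4): four A rows
+        // of price 1 give countFromA = 8 in the first match, two give 6 in the
+        // second.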
+        assertEquals(
+                Arrays.asList(Row.of(1, "PREF:a", 8, 5), Row.of(7, "PREF:a", 6, 9)),
+                CollectionUtil.iteratorToList(tableResult.collect()));
+    }
+
+    /** Test scalar function prefixing its input with a value read from the job parameters. */
+    public static class PrefixingScalarFunc extends ScalarFunction {
+
+        private String prefix = "ERROR_VALUE";
+
+        @Override
+        public void open(FunctionContext context) throws Exception {
+            prefix = context.getJobParameter("prefix", "");
+        }
+
+        public String eval(String value) {
+            return String.format("%s:%s", prefix, value);
+        }
+    }
+
+    /** Test count accumulator. */
+    public static class CountAcc {
+        public Integer count;
+
+        public CountAcc(Integer count) {
+            this.count = count;
+        }
+    }
+
+    /** Test rich aggregate function. */
+    public static class RichAggFunc extends AggregateFunction<Integer, CountAcc> {
+
+        private Integer start = 0;
+
+        @Override
+        public void open(FunctionContext context) throws Exception {
+            start = Integer.valueOf(context.getJobParameter("start", "0"));
+        }
+
+        @Override
+        public void close() throws Exception {
+            start = 0;
+        }
+
+        @Override
+        public CountAcc createAccumulator() {
+            return new CountAcc(start);
+        }
+
+        @Override
+        public Integer getValue(CountAcc accumulator) {
+            return accumulator.count;
+        }
+
+        public void accumulate(CountAcc countAcc, Integer value) {
+            countAcc.count += value;
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.xml
new file mode 100644
index 00000000000..ba1da9ec987
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testCascadeMatch">
+    <Resource name="sql">
+      <![CDATA[SELECT *
+FROM (
+  SELECT
+    symbol,
+    price
+  FROM Ticker
+  MATCH_RECOGNIZE (
+    PARTITION BY symbol
+    ORDER BY ts_ltz
+    MEASURES
+      A.price as price,
+      A.tax as tax
+    ONE ROW PER MATCH
+    PATTERN (A)
+    DEFINE
+      A AS A.price > 0
+  ) AS T
+  GROUP BY symbol, price
+)
+MATCH_RECOGNIZE (
+  PARTITION BY symbol
+  MEASURES
+    A.price as dPrice
+  PATTERN (A)
+  DEFINE
+    A AS A.symbol = 'a'
+)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(symbol=[$0], dPrice=[$1])
++- LogicalMatch(partition=[[0]], order=[[]], outputFields=[[symbol, dPrice]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[_UTF-16LE'A'], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[=(PREV(A.$0, 0), _UTF-16LE'a')]], inputFields=[[symbol, price]])
+   +- LogicalAggregate(group=[{0, 1}])
+      +- LogicalProject(symbol=[$0], price=[$1])
+         +- LogicalMatch(partition=[[0]], order=[[3 ASC-nulls-first]], outputFields=[[symbol, price, tax]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[_UTF-16LE'A'], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$1, 0), 0)]], inputFields=[[symbol, price, tax, ts_ltz]])
+            +- LogicalProject(symbol=[$0], price=[$1], tax=[$2], ts_ltz=[PROCTIME()])
+               +- LogicalTableScan(table=[[default_catalog, default_database, Ticker]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Match(partitionBy=[symbol], measures=[FINAL(A.price) AS dPrice], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[_UTF-16LE'A'], define=[{A==(PREV(A.$0, 0), _UTF-16LE'a')}])
++- Exchange(distribution=[hash[symbol]])
+   +- HashAggregate(isMerge=[true], groupBy=[symbol, price], select=[symbol, price])
+      +- Exchange(distribution=[hash[symbol, price]])
+         +- LocalHashAggregate(groupBy=[symbol, price], select=[symbol, price])
+            +- Calc(select=[symbol, price])
+               +- Match(partitionBy=[symbol], orderBy=[ts_ltz ASC], measures=[FINAL(A.price) AS price, FINAL(A.tax) AS tax], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[_UTF-16LE'A'], define=[{A=>(PREV(A.$1, 0), 0)}])
+                  +- Calc(select=[symbol, price, tax, PROCTIME() AS ts_ltz])
+                     +- Exchange(distribution=[hash[symbol]])
+                        +- TableSourceScan(table=[[default_catalog, default_database, Ticker]], fields=[symbol, price, tax])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
index 9e8ac59965d..5552fb5ba70 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml
@@ -2093,6 +2093,114 @@ Calc(select=[b, w$end AS window_end, EXPR$2])
       "side" : "second"
     } ]
   } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMatch[isNameSimplifyEnabled=false]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(aid=[$0], bid=[$1], cid=[$2])
++- LogicalMatch(partition=[[]], order=[[5 ASC-nulls-first]], outputFields=[[aid, bid, cid]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[=(LAST(*.$0, 0), 1), =(LAST(*.$1, 0), 2), =(LAST(*.$2, 0), _UTF-16LE'c')]], inputFields=[[a, b, c, d, rowtime, proctime]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+   +- Exchange(distribution=[single])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Optimized Execution Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+   +- Exchange(distribution=[single])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
+    "pact" : "Data Source",
+    "contents" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
+    "pact" : "Operator",
+    "contents" : "Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "GLOBAL",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
+    "pact" : "Operator",
+    "contents" : "Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMatch[isNameSimplifyEnabled=true]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalProject(aid=[$0], bid=[$1], cid=[$2])
++- LogicalMatch(partition=[[]], order=[[5 ASC-nulls-first]], outputFields=[[aid, bid, cid]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[=(LAST(*.$0, 0), 1), =(LAST(*.$1, 0), 2), =(LAST(*.$2, 0), _UTF-16LE'c')]], inputFields=[[a, b, c, d, rowtime, proctime]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+   +- Exchange(distribution=[single])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Optimized Execution Plan ==
+Match(orderBy=[proctime ASC], measures=[FINAL(A".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])
++- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+   +- Exchange(distribution=[single])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: MyTable[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "GLOBAL",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Match[]",
+    "pact" : "Operator",
+    "contents" : "[]:Match(orderBy=[proctime ASC], measures=[FINAL(A\".a) AS aid, FINAL(l.a) AS bid, FINAL(C.a) AS cid], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[((_UTF-16LE'A\"', _UTF-16LE'l'), _UTF-16LE'C')], define=[{A\"==(LAST(*.$0, 0), 1), l==(LAST(*.$1, 0), 2), C==(LAST(*.$2, 0), _UTF-16LE'c')}])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
 }]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/validation/MatchRecognizeValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/validation/MatchRecognizeValidationTest.scala
deleted file mode 100644
index 6897fb8735c..00000000000
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/validation/MatchRecognizeValidationTest.scala
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.stream.sql.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
-import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg
-import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction
-import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.ToMillis
-import org.apache.flink.table.planner.utils.TableTestBase
-import org.apache.flink.types.Row
-
-import org.junit.Test
-
-import java.sql.Timestamp
-
-class MatchRecognizeValidationTest extends TableTestBase {
-
-  private val streamUtil = scalaStreamTestUtil()
-  streamUtil.addDataStream[(Int, String, Timestamp)](
-    "MyTable",
-    'a,
-    'b,
-    'rowtime.rowtime,
-    'proctime.proctime)
-  streamUtil.addDataStream[(String, Long, Int, Int)](
-    "Ticker",
-    'symbol,
-    'tstamp,
-    'price,
-    'tax,
-    'proctime.proctime)
-  streamUtil.addFunction("ToMillis", new ToMillis)
-
-  /** Function 'MATCH_ROWTIME()' can only be used in MATCH_RECOGNIZE * */
-  @Test(expected = classOf[ValidationException])
-  def testMatchRowtimeInSelect(): Unit = {
-    val sql = "SELECT MATCH_ROWTIME() FROM MyTable"
-    streamUtil.verifyExplain(sql)
-  }
-
-  /** Function 'MATCH_PROCTIME()' can only be used in MATCH_RECOGNIZE * */
-  @Test(expected = classOf[ValidationException])
-  def testMatchProctimeInSelect(): Unit = {
-    val sql = "SELECT MATCH_PROCTIME() FROM MyTable"
-    streamUtil.verifyExplain(sql)
-  }
-
-  @Test
-  def testSortProcessingTimeDesc(): Unit = {
-    thrown.expectMessage("Primary sort order of a streaming table must be ascending on time.")
-    thrown.expect(classOf[TableException])
-
-    val sqlQuery =
-      s"""
-         |SELECT *
-         |FROM Ticker
-         |MATCH_RECOGNIZE (
-         |  ORDER BY proctime DESC
-         |  MEASURES
-         |    A.symbol AS aSymbol
-         |  PATTERN (A B)
-         |  DEFINE
-         |    A AS symbol = 'a'
-         |) AS T
-         |""".stripMargin
-
-    streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
-  }
-
-  @Test
-  def testSortProcessingTimeSecondaryField(): Unit = {
-    thrown.expectMessage(
-      "You must specify either rowtime or proctime for order by as " +
-        "the first one.")
-    thrown.expect(classOf[TableException])
-
-    val sqlQuery =
-      s"""
-         |SELECT *
-         |FROM Ticker
-         |MATCH_RECOGNIZE (
-         |  ORDER BY price, proctime
-         |  MEASURES
-         |    A.symbol AS aSymbol
-         |  PATTERN (A B)
-         |  DEFINE
-         |    A AS symbol = 'a'
-         |) AS T
-         |""".stripMargin
-
-    streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
-  }
-
-  @Test
-  def testSortNoOrder(): Unit = {
-    thrown.expectMessage("You must specify either rowtime or proctime for order by.")
-    thrown.expect(classOf[TableException])
-
-    val sqlQuery =
-      s"""
-         |SELECT *
-         |FROM Ticker
-         |MATCH_RECOGNIZE (
-         |  MEASURES
-         |    A.symbol AS aSymbol
-         |  PATTERN (A B)
-         |  DEFINE
-         |    A AS symbol = 'a'
-         |) AS T
-         |""".stripMargin
-
-    streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
-  }
-
-  @Test
-  def testUpdatesInUpstreamOperatorNotSupported(): Unit = {
-    thrown.expectMessage(
-      "Match Recognize doesn't support consuming update changes " +
-        "which is produced by node GroupAggregate(")
-    thrown.expect(classOf[TableException])
-
-    val sqlQuery =
-      s"""
-         |SELECT *
-         |FROM (SELECT DISTINCT * FROM Ticker)
-         |MATCH_RECOGNIZE (
-         |  ORDER BY proctime
-         |  MEASURES
-         |    A.symbol AS aSymbol
-         |  ONE ROW PER MATCH
-         |  PATTERN (A B)
-         |  DEFINE
-         |    A AS symbol = 'a'
-         |) AS T
-         |""".stripMargin
-
-    streamUtil.tableEnv.sqlQuery(sqlQuery).toRetractStream[Row]
-  }
-
-  @Test
-  def testAggregatesOnMultiplePatternVariablesNotSupported(): Unit = {
-    thrown.expect(classOf[ValidationException])
-    thrown.expectMessage("SQL validation failed.")
-
-    val sqlQuery =
-      s"""
-         |SELECT *
-         |FROM Ticker
-         |MATCH_RECOGNIZE (
-         |  ORDER BY proctime
-         |  MEASURES
-         |    SUM(A.price + B.tax) AS taxedPrice
-         |  PATTERN (A B)
-         |  DEFINE
-         |    A AS A.symbol = 'a'
-         |) AS T
-         |""".stripMargin
-
-    streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
-  }
-
-  @Test
-  def testAggregatesOnMultiplePatternVariablesNotSupportedInUDAGs(): Unit = {
-    thrown.expect(classOf[ValidationException])
-    thrown.expectMessage("Aggregation must be applied to a single pattern variable")
-
-    streamUtil.addFunction("weightedAvg", new WeightedAvg)
-
-    val sqlQuery =
-      s"""
-         |SELECT *
-         |FROM Ticker
-         |MATCH_RECOGNIZE (
-         |  ORDER BY proctime
-         |  MEASURES
-         |    weightedAvg(A.price, B.tax) AS weightedAvg
-         |  PATTERN (A B)
-         |  DEFINE
-         |    A AS A.symbol = 'a'
-         |) AS T
-         |""".stripMargin
-
-    streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
-  }
-
-  @Test
-  def testValidatingAmbiguousColumns(): Unit = {
-    thrown.expectMessage("Columns ambiguously defined: {symbol, price}")
-    thrown.expect(classOf[ValidationException])
-
-    val sqlQuery =
-      s"""
-         |SELECT *
-         |FROM Ticker
-         |MATCH_RECOGNIZE (
-         |  PARTITION BY symbol, price
-         |  ORDER BY proctime
-         |  MEASURES
-         |    A.symbol AS symbol,
-         |    A.price AS price
-         |  PATTERN (A)
-         |  DEFINE
-         |    A AS symbol = 'a'
-         |) AS T
-         |""".stripMargin
-
-    streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
-  }
-
-  // ***************************************************************************************
-  // * Those validations are temporary. We should remove those tests once we support those *
-  // * features.                                                                           *
-  // ***************************************************************************************
-
-  /** Python Function can not be used in MATCH_RECOGNIZE for now * */
-  @Test
-  def testMatchPythonFunction() = {
-    thrown.expectMessage("Python Function can not be used in MATCH_RECOGNIZE for now.")
-    thrown.expect(classOf[TableException])
-
-    streamUtil.addFunction("pyFunc", new PythonScalarFunction("pyFunc"))
-    val sql =
-      """SELECT T.aa as ta
-        |FROM MyTable
-        |MATCH_RECOGNIZE (
-        |  ORDER BY proctime
-        |  MEASURES
-        |    A.a as aa,
-        |    pyFunc(1,2) as bb
-        |  PATTERN (A B)
-        |  DEFINE
-        |    A AS a = 1,
-        |    B AS b = 'b'
-        |) AS T""".stripMargin
-    streamUtil.verifyExplain(sql)
-  }
-
-  @Test
-  def testAllRowsPerMatch(): Unit = {
-    thrown.expectMessage("All rows per match mode is not supported yet.")
-    thrown.expect(classOf[TableException])
-
-    val sqlQuery =
-      s"""
-         |SELECT *
-         |FROM Ticker
-         |MATCH_RECOGNIZE (
-         |  ORDER BY proctime
-         |  MEASURES
-         |    A.symbol AS aSymbol
-         |  ALL ROWS PER MATCH
-         |  PATTERN (A B)
-         |  DEFINE
-         |    A AS symbol = 'a'
-         |) AS T
-         |""".stripMargin
-
-    streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
-  }
-
-  @Test
-  def testGreedyQuantifierAtTheEndIsNotSupported(): Unit = {
-    thrown.expectMessage(
-      "Greedy quantifiers are not allowed as the last element of a " +
-        "Pattern yet. Finish your pattern with either a simple variable or reluctant quantifier.")
-    thrown.expect(classOf[TableException])
-
-    val sqlQuery =
-      s"""
-         |SELECT *
-         |FROM Ticker
-         |MATCH_RECOGNIZE (
-         |  ORDER BY proctime
-         |  MEASURES
-         |    A.symbol AS aSymbol
-         |  PATTERN (A B+)
-         |  DEFINE
-         |    A AS symbol = 'a'
-         |) AS T
-         |""".stripMargin
-
-    streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
-  }
-
-  @Test
-  def testPatternsProducingEmptyMatchesAreNotSupported(): Unit = {
-    thrown.expectMessage(
-      "Patterns that can produce empty matches are not supported. " +
-        "There must be at least one non-optional state.")
-    thrown.expect(classOf[TableException])
-
-    val sqlQuery =
-      s"""
-         |SELECT *
-         |FROM Ticker
-         |MATCH_RECOGNIZE (
-         |  ORDER BY proctime
-         |  MEASURES
-         |    A.symbol AS aSymbol
-         |  PATTERN (A*)
-         |  DEFINE
-         |    A AS symbol = 'a'
-         |) AS T
-         |""".stripMargin
-
-    streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
-  }
-
-  @Test
-  def testDistinctAggregationsNotSupported(): Unit = {
-    thrown.expect(classOf[ValidationException])
-
-    val sqlQuery =
-      s"""
-         |SELECT *
-         |FROM Ticker
-         |MATCH_RECOGNIZE (
-         |  ORDER BY proctime
-         |  MEASURES
-         |    COUNT(DISTINCT A.price) AS price
-         |  PATTERN (A B)
-         |  DEFINE
-         |    A AS A.symbol = 'a'
-         |) AS T
-         |""".stripMargin
-
-    streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
-  }
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
index e62f9fb6b5b..ac4db52febc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.data.RowData
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl
 import org.apache.flink.table.planner.delegation.PlannerBase
-import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMatch
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecMatch
 import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDataStreamScan, StreamPhysicalMatch}
 import org.apache.flink.table.planner.plan.utils.MatchUtil
 import org.apache.flink.table.planner.utils.TableTestUtil
@@ -105,9 +105,9 @@ abstract class PatternTranslatorTestBase extends TestLogger {
     }
 
     val dataMatch = optimized.asInstanceOf[StreamPhysicalMatch]
-    val p = StreamExecMatch
+    val p = CommonExecMatch
       .translatePattern(
-        MatchUtil.createMatchSpec(dataMatch.logicalMatch),
+        MatchUtil.createMatchSpec(dataMatch.getLogicalMatch),
         new Configuration,
         Thread.currentThread().getContextClassLoader,
         context._1,