Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/22 18:33:52 UTC

[GitHub] dawidwys closed pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE

dawidwys closed pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815

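For context, the MATCH_RECOGNIZE support added by this change is exercised from SQL through the Table API. Below is a minimal usage sketch, assuming a registered table "Ticker" with columns symbol and price plus a proctime attribute; the table, its columns, and the environment setup are illustrative and not taken from this PR:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// ORDER BY must start on a time attribute; ALL ROWS PER MATCH and WITHIN are
// rejected by this version, and AFTER MATCH controls where matching resumes.
val result = tableEnv.sqlQuery(
  """
    |SELECT *
    |FROM Ticker
    |MATCH_RECOGNIZE (
    |  PARTITION BY symbol
    |  ORDER BY proctime
    |  MEASURES FIRST(A.price) AS firstPrice, LAST(B.price) AS lastPrice
    |  ONE ROW PER MATCH
    |  AFTER MATCH SKIP PAST LAST ROW
    |  PATTERN (A+ B)
    |  DEFINE
    |    A AS A.price > 10,
    |    B AS B.price <= 10
    |) AS T
  """.stripMargin)
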
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java
index 5554151ccbd..cc243096f1c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java
@@ -24,6 +24,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -63,6 +64,18 @@ protected EventId getPruningId(Collection<Map<String, List<EventId>>> match) {
 			} else {
 				pruningId = max(pruningId, pruningPattern.get(getIndex(pruningPattern.size())));
 			}
+
+			if (shouldThrowException) {
+				EventId startEvent = resultMap.values()
+					.stream()
+					.flatMap(Collection::stream)
+					.min(EventId::compareTo)
+					.orElseThrow(() -> new IllegalStateException("Cannot prune based on empty match"));
+
+				if (pruningId != null && pruningId.equals(startEvent)) {
+					throw new FlinkRuntimeException("Could not skip to first element of a match.");
+				}
+			}
 		}
 
 		return pruningId;
@@ -86,4 +99,22 @@ protected EventId getPruningId(Collection<Map<String, List<EventId>>> match) {
 	 * {@link NoSkipStrategy} will be used
 	 */
 	public abstract SkipToElementStrategy throwExceptionOnMiss();
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		SkipToElementStrategy that = (SkipToElementStrategy) o;
+		return shouldThrowException == that.shouldThrowException &&
+			Objects.equals(patternName, that.patternName);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(patternName, shouldThrowException);
+	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index bdb28977035..3aca758ebad 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -87,7 +87,7 @@ public String getKeyedOperatorName() {
 
 			@Override
 			public String getOperatorName() {
-				return "SelectCepOperator";
+				return "GlobalSelectCepOperator";
 			}
 		});
 	}
@@ -136,7 +136,7 @@ public String getKeyedOperatorName() {
 
 			@Override
 			public String getOperatorName() {
-				return "FlatSelectCepOperator";
+				return "GlobalFlatSelectCepOperator";
 			}
 		});
 	}
@@ -194,7 +194,7 @@ public String getKeyedOperatorName() {
 
 			@Override
 			public String getOperatorName() {
-				return "FlatSelectTimeoutCepOperator";
+				return "GlobalFlatSelectTimeoutCepOperator";
 			}
 		});
 	}
@@ -252,7 +252,7 @@ public String getKeyedOperatorName() {
 
 			@Override
 			public String getOperatorName() {
-				return "SelectTimeoutCepOperator";
+				return "GlobalSelectTimeoutCepOperator";
 			}
 		});
 	}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index c8211201cab..6ad9d9a1669 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -569,4 +569,18 @@ private void checkIfPreviousPatternGreedy() {
 			throw new MalformedPatternException("Optional pattern cannot be preceded by greedy pattern");
 		}
 	}
+
+	@Override
+	public String toString() {
+		return "Pattern{" +
+			"name='" + name + '\'' +
+			", previous=" + previous +
+			", condition=" + condition +
+			", windowTime=" + windowTime +
+			", quantifier=" + quantifier +
+			", untilCondition=" + untilCondition +
+			", times=" + times +
+			", afterMatchSkipStrategy=" + afterMatchSkipStrategy +
+			'}';
+	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index b55051d0609..719f04091f5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -124,12 +124,22 @@ public boolean equals(Object o) {
 		}
 		Quantifier that = (Quantifier) o;
 		return Objects.equals(properties, that.properties) &&
-				consumingStrategy == that.consumingStrategy;
+				consumingStrategy == that.consumingStrategy &&
+				innerConsumingStrategy == that.innerConsumingStrategy;
 	}
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(properties, consumingStrategy);
+		return Objects.hash(properties, consumingStrategy, innerConsumingStrategy);
+	}
+
+	@Override
+	public String toString() {
+		return "Quantifier{" +
+			"properties=" + properties +
+			", consumingStrategy=" + consumingStrategy +
+			", innerConsumingStrategy=" + innerConsumingStrategy +
+			'}';
 	}
 
 	/**
@@ -184,5 +194,23 @@ public static Times of(int from, int to) {
 		public static Times of(int times) {
 			return new Times(times, times);
 		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			Times times = (Times) o;
+			return from == times.from &&
+				to == times.to;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(from, to);
+		}
 	}
 }
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
index d42839e3071..5fa6e9c333a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
@@ -548,6 +548,35 @@ public boolean filter(Event value) throws Exception {
 		));
 	}
 
+	@Test(expected = FlinkRuntimeException.class)
+	public void testSkipToFirstElementOfMatch() throws Exception {
+		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
+
+		Event a1 = new Event(1, "a1", 0.0);
+
+		streamEvents.add(new StreamRecord<Event>(a1));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("a",
+			AfterMatchSkipStrategy.skipToFirst("a").throwExceptionOnMiss()
+		).where(
+			new SimpleCondition<Event>() {
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().contains("a");
+				}
+			}
+		);
+		NFA<Event> nfa = compile(pattern, false);
+
+		feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+
+		// Skipping to the first element of a match should throw an exception if exceptions are
+		// enabled. This mode is used in MATCH_RECOGNIZE, which assumes that skipping to the first
+		// element would result in an infinite loop. In CEP, by default (with exceptions disabled),
+		// the no-skip strategy is used in this case.
+	}
+
 	@Test(expected = FlinkRuntimeException.class)
 	public void testSkipToFirstNonExistentPosition() throws Exception {
 		MissedSkipTo.compute(AfterMatchSkipStrategy.skipToFirst("b").throwExceptionOnMiss());
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index a7143ab3ef0..c3dab6e32cc 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -85,6 +85,13 @@ under the License.
 		</dependency>
 
 		<!-- Used for code generation -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-cep_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.codehaus.janino</groupId>
 			<artifactId>janino</artifactId>
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 34b98a86450..41f0fc5f685 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -33,6 +33,7 @@ import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
 import org.apache.flink.table.validate.BasicOperatorTable
 
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 /**
@@ -96,8 +97,50 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
     LogicalSort.create(input, sort.collation, sort.offset, sort.fetch)
   }
 
-  override def visit(`match`: LogicalMatch): RelNode =
-    throw new TableException("Logical match in a stream environment is not supported yet.")
+  override def visit(matchRel: LogicalMatch): RelNode = {
+    // visit children and update inputs
+    val input = matchRel.getInput.accept(this)
+
+    // check if input field contains time indicator type
+    // materialize field if no time indicator is present anymore
+    // if input field is already materialized, change to timestamp type
+    val materializer = new RexTimeIndicatorMaterializer(
+      rexBuilder,
+      input.getRowType.getFieldList.map(_.getType))
+
+    // update input expressions
+    val patternDefs = matchRel.getPatternDefinitions.mapValues(_.accept(materializer))
+    val measures = matchRel.getMeasures
+      .mapValues(_.accept(materializer))
+      .mapValues(materializerUtils.materialize)
+    val partitionKeys = matchRel.getPartitionKeys
+      .map(_.accept(materializer))
+      .map(materializerUtils.materialize)
+    val interval = if (matchRel.getInterval != null) {
+      matchRel.getInterval.accept(materializer)
+    } else {
+      null
+    }
+
+    // materialize all output types
+    // TODO allow passing through for rowtime accessor function, once introduced
+    val outputType = materializerUtils.getRowTypeWithoutIndicators(matchRel.getRowType)
+
+    LogicalMatch.create(
+      input,
+      outputType,
+      matchRel.getPattern,
+      matchRel.isStrictStart,
+      matchRel.isStrictEnd,
+      patternDefs,
+      measures,
+      matchRel.getAfter,
+      matchRel.getSubsets.asInstanceOf[java.util.Map[String, java.util.TreeSet[String]]],
+      matchRel.isAllRows,
+      partitionKeys,
+      matchRel.getOrderKeys,
+      interval)
+  }
 
   override def visit(other: RelNode): RelNode = other match {
 
@@ -480,6 +523,23 @@ class RexTimeIndicatorMaterializerUtils(rexBuilder: RexBuilder) {
       input.getRowType.getFieldNames)
   }
 
+  def getRowTypeWithoutIndicators(relType: RelDataType): RelDataType = {
+    val outputTypeBuilder = rexBuilder
+      .getTypeFactory
+      .asInstanceOf[FlinkTypeFactory]
+      .builder()
+
+    relType.getFieldList.asScala.zipWithIndex.foreach { case (field, idx) =>
+      if (FlinkTypeFactory.isTimeIndicatorType(field.getType)) {
+        outputTypeBuilder.add(field.getName, timestamp)
+      } else {
+        outputTypeBuilder.add(field.getName, field.getType)
+      }
+    }
+
+    outputTypeBuilder.build()
+  }
+
   def materializeIfContains(expr: RexNode, index: Int, indicesToMaterialize: Set[Int]): RexNode = {
     if (indicesToMaterialize.contains(index)) {
       materialize(expr)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 4f8dc13ff4c..2816f116ffd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -1098,7 +1098,8 @@ abstract class CodeGenerator(
     }
   }
 
-  private def generateFieldAccess(refExpr: GeneratedExpression, index: Int): GeneratedExpression = {
+  protected def generateFieldAccess(refExpr: GeneratedExpression, index: Int)
+    : GeneratedExpression = {
 
     val fieldAccessExpr = generateFieldAccess(
       refExpr.resultType,
@@ -1135,7 +1136,7 @@ abstract class CodeGenerator(
     GeneratedExpression(resultTerm, nullTerm, resultCode, fieldAccessExpr.resultType)
   }
 
-  private def generateInputAccess(
+  protected def generateInputAccess(
       inputType: TypeInformation[_ <: Any],
       inputTerm: String,
       index: Int)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Indenter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Indenter.scala
index 187e7300a02..bff8a62d519 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Indenter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/Indenter.scala
@@ -43,8 +43,12 @@ class IndentStringContext(sc: StringContext) {
     if (lastnl == -1) ""
     else {
       val ind = str.substring(lastnl + 1)
-      if (ind.trim.isEmpty) ind  // ind is all whitespace. Use this
-      else ""
+      val trimmed = ind.trim
+      if (trimmed.isEmpty || trimmed == "|") {
+        ind // ind is all whitespace or pipe for use with stripMargin. Use this
+      } else {
+        ""
+      }
     }
   }
 }
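
The effect of this change on the j"..." interpolator can be shown with a standalone re-implementation of the indentation rule (for illustration only, not the actual Indenter class): a prefix whose only non-whitespace character is the stripMargin pipe now also carries the indent for interpolated multi-line values, so nested templates stay aligned after stripMargin.

object IndentRuleSketch {
  // Returns the indent to prepend to continuation lines of an interpolated
  // multi-line value, given the text between the last newline and the value.
  def indentOf(prefix: String): String = {
    val trimmed = prefix.trim
    // previously only all-whitespace prefixes were kept; now "    |  "-style
    // prefixes survive as well
    if (trimmed.isEmpty || trimmed == "|") prefix else ""
  }

  def main(args: Array[String]): Unit = {
    println(indentOf("    ").length)    // 4: pure whitespace, kept
    println(indentOf("    |  ").length) // 7: kept after this change
    println(indentOf("    x ").length)  // 0: real content, no indent
  }
}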
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
new file mode 100644
index 00000000000..a68bf8e87cf
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import java.lang.{Long => JLong}
+import java.util
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.commons.lang3.StringEscapeUtils.escapeJava
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.cep.pattern.conditions.IterativeCondition
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName}
+import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.util.Collector
+import org.apache.flink.util.MathUtils.checkedDownCast
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * A code generator for generating CEP related functions.
+  *
+  * @param config configuration that determines runtime behavior
+  * @param patternNames sorted sequence of pattern variables
+  * @param input type information about the first input of the Function
+  * @param currentPattern when generating a condition, the name of the pattern that the
+  *                       condition will be applied to
+  */
+class MatchCodeGenerator(
+    config: TableConfig,
+    input: TypeInformation[_],
+    patternNames: Seq[String],
+    currentPattern: Option[String] = None)
+  extends CodeGenerator(config, false, input) {
+
+  private case class GeneratedPatternList(resultTerm: String, code: String)
+
+  /**
+    * Used to assign unique names to the lists of events per pattern variable name. Those lists
+    * are treated as inputs and are needed by the input access code.
+    */
+  private val reusablePatternLists: mutable.HashMap[String, GeneratedPatternList] = mutable
+    .HashMap[String, GeneratedPatternList]()
+
+  /**
+    * Context information used by a pattern reference variable to index the rows mapped to it.
+    * The element at the given offset is indexed from the beginning or the end, based on `first`.
+    */
+  private var offset: Int = 0
+  private var first : Boolean = false
+
+  /** Term for the row used for key extraction. */
+  private val keyRowTerm = newName("keyRow")
+
+  /** Term for list of all pattern names */
+  private val patternNamesTerm = newName("patternNames")
+
+  /**
+    * Sets the new reference variable indexing context. This should be used when resolving
+    * logical offsets (i.e. LAST/FIRST).
+    *
+    * @param first  true if indexing from the beginning, false otherwise
+    * @param offset offset from either beginning or the end
+    */
+  private def updateOffsets(first: Boolean, offset: Int): Unit = {
+    this.first = first
+    this.offset = offset
+  }
+
+  /** Resets indexing context of Pattern variable. */
+  private def resetOffsets(): Unit = {
+    first = false
+    offset = 0
+  }
+
+  private def reusePatternLists(): String = {
+    reusablePatternLists.values.map(_.code).mkString("\n")
+  }
+
+  private def addReusablePatternNames() : Unit = {
+    reusableMemberStatements
+      .add(s"private String[] $patternNamesTerm = new String[] { ${
+        patternNames.map(p => s""""${escapeJava(p)}"""").mkString(", ")
+      } };")
+  }
+
+  /**
+    * Generates a [[org.apache.flink.api.common.functions.Function]] that can be passed to the
+    * Java compiler.
+    *
+    * This is a separate method from [[FunctionCodeGenerator.generateFunction()]] because, as of
+    * now, functions in the CEP library do not support rich interfaces.
+    *
+    * @param name Class name of the Function. Does not need to be unique but has to be a valid
+    *             Java class identifier.
+    * @param clazz Flink Function to be generated.
+    * @param bodyCode code contents of the SAM (Single Abstract Method). Inputs, collector, or
+    *                 output record can be accessed via the given term methods.
+    * @param returnType expected return type
+    * @tparam F Flink Function to be generated.
+    * @tparam T Return type of the Flink Function.
+    * @return instance of GeneratedFunction
+    */
+  def generateMatchFunction[F <: Function, T <: Any](
+      name: String,
+      clazz: Class[F],
+      bodyCode: String,
+      returnType: TypeInformation[T])
+    : GeneratedFunction[F, T] = {
+    val funcName = newName(name)
+    val collectorTypeTerm = classOf[Collector[Any]].getCanonicalName
+    val (functionClass, signature, inputStatements, isInterface) =
+      if (clazz == classOf[IterativeCondition[_]]) {
+        val baseClass = classOf[IterativeCondition[_]]
+        val inputTypeTerm = boxedTypeTermForTypeInfo(input)
+        val contextType = classOf[IterativeCondition.Context[_]].getCanonicalName
+
+        (baseClass,
+          s"boolean filter(Object _in1, $contextType $contextTerm)",
+          List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"),
+          false)
+      } else if (clazz == classOf[PatternSelectFunction[_, _]]) {
+        val baseClass = classOf[PatternSelectFunction[_, _]]
+        val inputTypeTerm =
+          s"java.util.Map<String, java.util.List<${boxedTypeTermForTypeInfo(input)}>>"
+
+        (baseClass,
+          s"Object select($inputTypeTerm $input1Term)",
+          List(),
+          true)
+      } else if (clazz == classOf[PatternFlatSelectFunction[_, _]]) {
+        val baseClass = classOf[PatternFlatSelectFunction[_, _]]
+        val inputTypeTerm =
+          s"java.util.Map<String, java.util.List<${boxedTypeTermForTypeInfo(input)}>>"
+
+        (baseClass,
+          s"void flatSelect($inputTypeTerm $input1Term, $collectorTypeTerm $collectorTerm)",
+          List(),
+          true)
+      } else {
+        throw new CodeGenException("Unsupported Function.")
+      }
+
+    if (!reuseOpenCode().trim.isEmpty || !reuseCloseCode().trim.isEmpty) {
+      throw new TableException(
+        "Match recognize does not support UDFs, nor other functions that require " +
+          "open/close methods yet.")
+    }
+
+    val extendsKeyword = if (isInterface) "implements" else "extends"
+    val funcCode = j"""
+      |public class $funcName $extendsKeyword ${functionClass.getCanonicalName} {
+      |
+      |  ${reuseMemberCode()}
+      |
+      |  public $funcName() throws Exception {
+      |    ${reuseInitCode()}
+      |  }
+      |
+      |  @Override
+      |  public $signature throws Exception {
+      |    ${inputStatements.mkString("\n")}
+      |    ${reusePatternLists()}
+      |    ${reuseInputUnboxingCode()}
+      |    ${reusePerRecordCode()}
+      |    $bodyCode
+      |  }
+      |}
+    """.stripMargin
+
+    GeneratedFunction(funcName, returnType, funcCode)
+  }
+
+  private def generateKeyRow() : GeneratedExpression = {
+    val exp = reusableInputUnboxingExprs
+      .get((keyRowTerm, 0)) match {
+      // input access and unboxing has already been generated
+      case Some(expr) =>
+        expr
+
+      case None =>
+
+        val eventTypeTerm = boxedTypeTermForTypeInfo(input)
+        val nullTerm = newName("isNull")
+
+        val keyCode = j"""
+           |$eventTypeTerm $keyRowTerm = null;
+           |boolean $nullTerm = true;
+           |for (java.util.Map.Entry entry : $input1Term.entrySet()) {
+           |  java.util.List value = (java.util.List) entry.getValue();
+           |  if (value != null && value.size() > 0) {
+           |    $keyRowTerm = ($eventTypeTerm) value.get(0);
+           |    $nullTerm = false;
+           |    break;
+           |  }
+           |}
+           """.stripMargin
+
+        val exp = GeneratedExpression(keyRowTerm, nullTerm, keyCode, input)
+        reusableInputUnboxingExprs((keyRowTerm, 0)) = exp
+        exp
+    }
+    exp.copy(code = NO_CODE)
+  }
+
+  /**
+    * Extracts partition keys from any element of the match
+    *
+    * @param partitionKey partition key to be extracted
+    * @return generated code for the given key
+    */
+  private def generatePartitionKeyAccess(
+      partitionKey: RexInputRef)
+    : GeneratedExpression = {
+
+    val keyRow = generateKeyRow()
+    generateFieldAccess(keyRow, partitionKey.getIndex)
+  }
+
+  def generateOneRowPerMatchExpression(
+      partitionKeys: util.List[RexNode],
+      measures: util.Map[String, RexNode],
+      returnType: RowSchema)
+    : GeneratedExpression = {
+    // For "ONE ROW PER MATCH", the output columns include:
+    // 1) the partition columns;
+    // 2) the columns defined in the measures clause.
+    val resultExprs =
+      partitionKeys.asScala.map { case inputRef: RexInputRef =>
+        generatePartitionKeyAccess(inputRef)
+      } ++ returnType.fieldNames.filter(measures.containsKey(_)).map { fieldName =>
+        generateExpression(measures.get(fieldName))
+      }
+
+    generateResultExpression(
+      resultExprs,
+      returnType.typeInfo,
+      returnType.fieldNames)
+  }
+
+  override def visitCall(call: RexCall): GeneratedExpression = {
+    call.getOperator match {
+      case PREV | NEXT =>
+        val countLiteral = call.operands.get(1).asInstanceOf[RexLiteral]
+        val count = checkedDownCast(countLiteral.getValueAs(classOf[JLong]))
+        if (count != 0) {
+          throw new TableException("Flink does not support physical offsets within partition.")
+        } else {
+          updateOffsets(first = false, 0)
+          val exp = call.getOperands.get(0).accept(this)
+          resetOffsets()
+          exp
+        }
+
+      case FIRST | LAST =>
+        val countLiteral = call.operands.get(1).asInstanceOf[RexLiteral]
+        val offset = checkedDownCast(countLiteral.getValueAs(classOf[JLong]))
+        updateOffsets(call.getOperator == FIRST, offset)
+        val patternExp = call.operands.get(0).accept(this)
+        resetOffsets()
+        patternExp
+
+      case FINAL =>
+        call.getOperands.get(0).accept(this)
+
+      case _ => super.visitCall(call)
+    }
+  }
+
+  override private[flink] def generateProctimeTimestamp() = {
+    val resultTerm = newName("result")
+
+    //TODO use timerService once it is available in PatternFlatSelectFunction
+    val resultCode =
+      j"""
+         |long $resultTerm = System.currentTimeMillis();
+         |""".stripMargin
+    GeneratedExpression(resultTerm, NEVER_NULL, resultCode, SqlTimeTypeInfo.TIMESTAMP)
+  }
+
+  override def visitPatternFieldRef(fieldRef: RexPatternFieldRef): GeneratedExpression = {
+    if (fieldRef.getAlpha.equals("*") && currentPattern.isDefined && offset == 0 && !first) {
+      generateInputAccess(input, input1Term, fieldRef.getIndex)
+    } else {
+      generatePatternFieldRef(fieldRef)
+    }
+  }
+
+  private def generateDefinePatternVariableExp(
+      patternName: String,
+      currentPattern: String)
+    : GeneratedPatternList = {
+    val listName = newName("patternEvents")
+    val eventTypeTerm = boxedTypeTermForTypeInfo(input)
+    val eventNameTerm = newName("event")
+
+    val addCurrent = if (currentPattern == patternName || patternName == "*") {
+      j"""
+         |$listName.add($input1Term);
+         """.stripMargin
+    } else {
+      ""
+    }
+    val listCode = if (patternName == "*") {
+      addReusablePatternNames()
+      val patternTerm = newName("pattern")
+      j"""
+         |java.util.List $listName = new java.util.ArrayList();
+         |for (String $patternTerm : $patternNamesTerm) {
+         |  for ($eventTypeTerm $eventNameTerm :
+         |  $contextTerm.getEventsForPattern($patternTerm)) {
+         |    $listName.add($eventNameTerm);
+         |  }
+         |}
+         """.stripMargin
+    } else {
+      val escapedPatternName = escapeJava(patternName)
+      j"""
+         |java.util.List $listName = new java.util.ArrayList();
+         |for ($eventTypeTerm $eventNameTerm :
+         |  $contextTerm.getEventsForPattern("$escapedPatternName")) {
+         |    $listName.add($eventNameTerm);
+         |}
+         |""".stripMargin
+    }
+
+    val code =
+      j"""
+         |$listCode
+         |$addCurrent
+       """.stripMargin
+
+    GeneratedPatternList(listName, code)
+  }
+
+  private def generateMeasurePatternVariableExp(patternName: String): GeneratedPatternList = {
+    val listName = newName("patternEvents")
+
+    val code = if (patternName == "*") {
+      addReusablePatternNames()
+
+      val patternTerm = newName("pattern")
+
+      j"""
+         |java.util.List $listName = new java.util.ArrayList();
+         |for (String $patternTerm : $patternNamesTerm) {
+         |  java.util.List rows = (java.util.List) $input1Term.get($patternTerm);
+         |  if (rows != null) {
+         |    $listName.addAll(rows);
+         |  }
+         |}
+         """.stripMargin
+    } else {
+      val escapedPatternName = escapeJava(patternName)
+      j"""
+         |java.util.List $listName = (java.util.List) $input1Term.get("$escapedPatternName");
+         |if ($listName == null) {
+         |  $listName = java.util.Collections.emptyList();
+         |}
+         |""".stripMargin
+    }
+
+    GeneratedPatternList(listName, code)
+  }
+
+  private def findEventByLogicalPosition(
+      patternFieldAlpha: String)
+    : GeneratedExpression = {
+    val rowNameTerm = newName("row")
+    val eventTypeTerm = boxedTypeTermForTypeInfo(input)
+    val isRowNull = newName("isRowNull")
+
+    val findEventsByPatternName = reusablePatternLists.get(patternFieldAlpha) match {
+      // input access and unboxing has already been generated
+      case Some(expr) =>
+        expr
+
+      case None =>
+        val exp = currentPattern match {
+          case Some(p) => generateDefinePatternVariableExp(patternFieldAlpha, p)
+          case None => generateMeasurePatternVariableExp(patternFieldAlpha)
+        }
+        reusablePatternLists(patternFieldAlpha) = exp
+        exp
+    }
+
+    val listName = findEventsByPatternName.resultTerm
+    val resultIndex = if (first) {
+      j"""$offset"""
+    } else {
+      j"""$listName.size() - $offset - 1"""
+    }
+
+    val funcCode =
+      j"""
+         |$eventTypeTerm $rowNameTerm = null;
+         |boolean $isRowNull = true;
+         |if ($listName.size() > $offset) {
+         |  $rowNameTerm = (($eventTypeTerm) $listName.get($resultIndex));
+         |  $isRowNull = false;
+         |}
+         |""".stripMargin
+
+    GeneratedExpression(rowNameTerm, isRowNull, funcCode, input)
+  }
+
+  private def generatePatternFieldRef(fieldRef: RexPatternFieldRef): GeneratedExpression = {
+    val escapedAlpha = escapeJava(fieldRef.getAlpha)
+    val patternVariableRef = reusableInputUnboxingExprs
+      .get((s"$escapedAlpha#$first", offset)) match {
+      // input access and unboxing has already been generated
+      case Some(expr) =>
+        expr
+
+      case None =>
+        val exp = findEventByLogicalPosition(fieldRef.getAlpha)
+        reusableInputUnboxingExprs((s"$escapedAlpha#$first", offset)) = exp
+        exp
+    }
+
+    generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
+  }
+}
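
The logical-offset lookup generated by findEventByLogicalPosition above can be summarized by this standalone sketch (plain Scala over a hypothetical event list; the real code is generated Java operating on the rows mapped to a pattern variable):

object LogicalOffsetSketch {
  // FIRST(X.f, n) reads the n-th event from the start of the rows mapped to X;
  // LAST(X.f, n) reads the n-th event from the end. Both yield None (null in
  // the generated code) when fewer than offset + 1 events were matched.
  def eventAt[T](events: Vector[T], first: Boolean, offset: Int): Option[T] =
    if (events.size > offset) {
      Some(if (first) events(offset) else events(events.size - offset - 1))
    } else {
      None
    }

  def main(args: Array[String]): Unit = {
    val prices = Vector(10.0, 11.0, 12.0)
    println(eventAt(prices, first = true, offset = 0))  // Some(10.0) ~ FIRST(A.price)
    println(eventAt(prices, first = false, offset = 1)) // Some(11.0) ~ LAST(A.price, 1)
    println(eventAt(prices, first = false, offset = 5)) // None: not enough events
  }
}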
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/MatchRecognize.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/MatchRecognize.scala
new file mode 100644
index 00000000000..7a1e81d0811
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/MatchRecognize.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical
+
+import java.util
+
+import com.google.common.collect.ImmutableMap
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelCollation, RelNode}
+import org.apache.calcite.rex.RexNode
+
+/**
+  * Describes the MATCH_RECOGNIZE clause.
+  */
+case class MatchRecognize(
+  input: RelNode,
+  rowType: RelDataType,
+  pattern: RexNode,
+  patternDefinitions: ImmutableMap[String, RexNode],
+  measures: ImmutableMap[String, RexNode],
+  after: RexNode,
+  subsets: ImmutableMap[String, util.SortedSet[String]],
+  allRows: Boolean,
+  partitionKeys: util.List[RexNode],
+  orderKeys: RelCollation,
+  interval: RexNode)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonMatchRecognize.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonMatchRecognize.scala
new file mode 100644
index 00000000000..9cf89f9c3fd
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonMatchRecognize.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import java.util.{List => JList, SortedSet => JSortedSet}
+
+import com.google.common.collect.ImmutableMap
+import org.apache.calcite.rel.{RelCollation, RelWriter}
+import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.SqlMatchRecognize.AfterOption
+import org.apache.flink.table.plan.logical.MatchRecognize
+import org.apache.flink.table.runtime.aggregate.SortUtil.directionToOrder
+
+import scala.collection.JavaConverters._
+
+trait CommonMatchRecognize {
+
+  private def partitionKeysToString(
+      keys: JList[RexNode],
+      fieldNames: Seq[String],
+      expression: (RexNode, Seq[String], Option[Seq[RexNode]]) => String)
+    : String =
+    keys.asScala.map(k => expression(k, fieldNames, None)).mkString(", ")
+
+  private def orderingToString(orders: RelCollation, fieldNames: Seq[String]): String =
+    orders.getFieldCollations.asScala.map {
+      x => s"${fieldNames(x.getFieldIndex)} ${directionToOrder(x.direction).getShortName}"
+    }.mkString(", ")
+
+  private def measuresDefineToString(
+      measures: ImmutableMap[String, RexNode],
+      fieldNames: Seq[String],
+      expression: (RexNode, Seq[String], Option[Seq[RexNode]]) => String)
+    : String =
+    measures.asScala.map {
+      case (k, v) => s"${expression(v, fieldNames, None)} AS $k"
+    }.mkString(", ")
+
+  private def rowsPerMatchToString(isAll: Boolean): String =
+    if (isAll) "ALL ROWS PER MATCH" else "ONE ROW PER MATCH"
+
+  private def subsetToString(subset: ImmutableMap[String, JSortedSet[String]]): String =
+    subset.asScala.map {
+      case (k, v) => s"$k = (${v.asScala.mkString(", ")})"
+    }.mkString(", ")
+
+  private def afterMatchToString(
+      after: RexNode,
+      fieldNames: Seq[String])
+    : String =
+    after.getKind match {
+      case SqlKind.SKIP_TO_FIRST => s"SKIP TO FIRST ${
+        after.asInstanceOf[RexCall].operands.get(0).toString
+      }"
+      case SqlKind.SKIP_TO_LAST => s"SKIP TO LAST ${
+        after.asInstanceOf[RexCall].operands.get(0).toString
+      }"
+      case SqlKind.LITERAL => after.asInstanceOf[RexLiteral]
+        .getValueAs(classOf[AfterOption]) match {
+        case AfterOption.SKIP_PAST_LAST_ROW => "SKIP PAST LAST ROW"
+        case AfterOption.SKIP_TO_NEXT_ROW => "SKIP TO NEXT ROW"
+      }
+    }
+
+  private[flink] def matchToString(
+      logicalMatch: MatchRecognize,
+      fieldNames: Seq[String],
+      expression: (RexNode, Seq[String], Option[Seq[RexNode]]) => String)
+    : String = {
+    val partitionBy = if (!logicalMatch.partitionKeys.isEmpty) {
+      s"PARTITION BY: ${
+        partitionKeysToString(logicalMatch.partitionKeys, fieldNames, expression)
+      }"
+    } else {
+      ""
+    }
+
+    val orderBy = if (!logicalMatch.orderKeys.getFieldCollations.isEmpty) {
+      s"ORDER BY: ${orderingToString(logicalMatch.orderKeys, fieldNames)}"
+    } else {
+      ""
+    }
+
+    val measures = if (!logicalMatch.measures.isEmpty) {
+      s"MEASURES: ${measuresDefineToString(logicalMatch.measures, fieldNames, expression)}"
+    } else {
+      ""
+    }
+
+    val afterMatch = s"${afterMatchToString(logicalMatch.after, fieldNames)}"
+
+    val allRows = rowsPerMatchToString(logicalMatch.allRows)
+
+    val pattern = s"PATTERN: (${logicalMatch.pattern.toString})"
+
+    val subset = if (!logicalMatch.subsets.isEmpty) {
+      s"SUBSET: ${subsetToString(logicalMatch.subsets)} "
+    } else {
+      ""
+    }
+
+    val define = s"DEFINE: ${
+      measuresDefineToString(logicalMatch.patternDefinitions, fieldNames, expression)
+    }"
+
+    val body = Seq(partitionBy, orderBy, measures, allRows, afterMatch, pattern, subset, define)
+      .filterNot(_.isEmpty)
+      .mkString(", ")
+
+    s"Match($body)"
+  }
+
+  private[flink] def explainMatch(
+      pw: RelWriter,
+      logicalMatch: MatchRecognize,
+      fieldNames: Seq[String],
+      expression: (RexNode, Seq[String], Option[Seq[RexNode]]) => String)
+    : RelWriter = {
+    pw.itemIf("partitionBy",
+      partitionKeysToString(logicalMatch.partitionKeys, fieldNames, expression),
+      !logicalMatch.partitionKeys.isEmpty)
+      .itemIf("orderBy",
+        orderingToString(logicalMatch.orderKeys, fieldNames),
+        !logicalMatch.orderKeys.getFieldCollations.isEmpty)
+      .itemIf("measures",
+        measuresDefineToString(logicalMatch.measures, fieldNames, expression),
+        !logicalMatch.measures.isEmpty)
+      .item("rowsPerMatch", rowsPerMatchToString(logicalMatch.allRows))
+      .item("after", afterMatchToString(logicalMatch.after, fieldNames))
+      .item("pattern", logicalMatch.pattern.toString)
+      .itemIf("subset",
+        subsetToString(logicalMatch.subsets),
+        !logicalMatch.subsets.isEmpty)
+      .item("define", logicalMatch.patternDefinitions)
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala
index 2ad4083302c..5a0896c0d22 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala
@@ -18,12 +18,12 @@
 
 package org.apache.flink.table.plan.nodes
 
-import org.apache.calcite.rex.{RexLiteral, RexNode}
-import org.apache.calcite.rel.RelFieldCollation.Direction
 import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.{RelCollation, RelWriter}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.flink.table.runtime.aggregate.SortUtil.directionToOrder
+
 import scala.collection.JavaConverters._
-import org.apache.flink.api.common.operators.Order
-import org.apache.calcite.rel.{RelWriter, RelCollation}
 
 /**
  * Common methods for Flink sort operators.
@@ -46,14 +46,6 @@ trait CommonSort {
       .mkString(", ")
   }
   
-  private[flink] def directionToOrder(direction: Direction) = {
-    direction match {
-      case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
-      case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
-      case _ => throw new IllegalArgumentException("Unsupported direction.")
-    }
-  }
-  
   private def fetchToString(fetch: RexNode, offset: RexNode): String = {
     val limitEnd = getFetchLimitEnd(fetch, offset)
     
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
index f3e1a626482..ccfd904c243 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
@@ -31,10 +31,15 @@ trait FlinkRelNode extends RelNode {
 
   private[flink] def getExpressionString(
       expr: RexNode,
-      inFields: List[String],
-      localExprsTable: Option[List[RexNode]]): String = {
+      inFields: Seq[String],
+      localExprsTable: Option[Seq[RexNode]]): String = {
 
     expr match {
+      case pr: RexPatternFieldRef =>
+        val alpha = pr.getAlpha
+        val field = inFields.get(pr.getIndex)
+        s"$alpha.$field"
+
       case i: RexInputRef =>
         inFields.get(i.getIndex)
 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
index a518e308052..1430204b9e7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
@@ -27,9 +27,10 @@ import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.CommonSort
+import org.apache.flink.table.runtime.aggregate.SortUtil.directionToOrder
 import org.apache.flink.table.runtime.{CountPartitionFunction, LimitFilterFunction}
 import org.apache.flink.types.Row
-import org.apache.flink.table.plan.nodes.CommonSort
 
 import scala.collection.JavaConverters._
 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
new file mode 100644
index 00000000000..d3768cc7b1b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.lang.{Boolean => JBoolean, Long => JLong}
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.SqlMatchRecognize.AfterOption
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.annotation.VisibleForTesting
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.runtime.RowComparator
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
+import org.apache.flink.cep.nfa.compiler.NFACompiler
+import org.apache.flink.cep.pattern.Pattern
+import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty
+import org.apache.flink.cep.pattern.conditions.BooleanConditions
+import org.apache.flink.cep.{CEP, PatternStream}
+import org.apache.flink.streaming.api.datastream.DataStream
+
+import scala.collection.JavaConverters._
+import org.apache.flink.table.api._
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.logical.MatchRecognize
+import org.apache.flink.table.plan.nodes.CommonMatchRecognize
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.runtime.`match`._
+import org.apache.flink.table.runtime.aggregate.SortUtil
+import org.apache.flink.table.runtime.conversion.CRowToRowMapFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.runtime.{RowKeySelector, RowtimeProcessFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.MathUtils
+
+
+/**
+  * Flink RelNode which matches along with LogicalMatch.
+  */
+class DataStreamMatch(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    logicalMatch: MatchRecognize,
+    schema: RowSchema,
+    inputSchema: RowSchema)
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonMatchRecognize
+  with DataStreamRel {
+
+  override def needsUpdatesAsRetraction = true
+
+  override def consumesRetractions = true
+
+  override def deriveRowType(): RelDataType = schema.relDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new DataStreamMatch(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      logicalMatch,
+      schema,
+      inputSchema)
+  }
+
+  override def toString: String = {
+    matchToString(logicalMatch, inputSchema.fieldNames, getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    explainMatch(super.explainTerms(pw), logicalMatch, inputSchema.fieldNames, getExpressionString)
+  }
+
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      queryConfig: StreamQueryConfig)
+    : DataStream[CRow] = {
+
+    val inputIsAccRetract = DataStreamRetractionRules.isAccRetract(getInput)
+
+    val config = tableEnv.config
+    val inputTypeInfo = inputSchema.typeInfo
+
+    val crowInput: DataStream[CRow] = getInput
+      .asInstanceOf[DataStreamRel]
+      .translateToPlan(tableEnv, queryConfig)
+
+    if (inputIsAccRetract) {
+      throw new TableException(
+        "Retraction on match recognize is not supported. " +
+          "Note: Match recognize should not follow a non-windowed GroupBy aggregation.")
+    }
+
+    val orderKeys = logicalMatch.orderKeys.getFieldCollations
+    val (timestampedInput, rowComparator) = translateOrder(tableEnv,
+      crowInput,
+      logicalMatch.orderKeys)
+
+    val patternVisitor = new PatternVisitor(config, inputTypeInfo, logicalMatch)
+    val cepPattern = logicalMatch.pattern.accept(patternVisitor)
+    val patternNames = patternVisitor.names
+
+    //TODO remove this once it is supported in CEP library
+    if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
+      throw new TableException(
+        "Patterns that can produce empty matches are not supported. There must be at least one " +
+          "non-optional state.")
+    }
+
+    //TODO remove this once it is supported in CEP library
+    if (cepPattern.getQuantifier.hasProperty(QuantifierProperty.GREEDY)) {
+      throw new TableException(
+        "Greedy quantifiers are not allowed as the last element of a Pattern yet. Finish your " +
+          "pattern with either a simple variable or reluctant quantifier.")
+    }
+
+    if (logicalMatch.interval != null) {
+      throw new TableException(
+        "WITHIN clause is not part of the SQL Standard, thus it is not supported.")
+    }
+
+    val inputDS: DataStream[Row] = timestampedInput
+      .map(new CRowToRowMapFunction)
+      .setParallelism(timestampedInput.getParallelism)
+      .name("ConvertToRow")
+      .returns(inputTypeInfo)
+
+    val partitionKeys = logicalMatch.partitionKeys
+    val partitionedStream = applyPartitioning(partitionKeys, inputDS)
+
+    val patternStream: PatternStream[Row] = if (rowComparator.isDefined) {
+      CEP.pattern[Row](partitionedStream, cepPattern, new EventRowComparator(rowComparator.get))
+    } else {
+      CEP.pattern[Row](partitionedStream, cepPattern)
+    }
+
+    val measures = logicalMatch.measures
+    val outTypeInfo = CRowTypeInfo(schema.typeInfo)
+    if (logicalMatch.allRows) {
+      throw new TableException("All rows per match mode is not supported yet.")
+    } else {
+      val patternSelectFunction =
+        MatchUtil.generateOneRowPerMatchExpression(
+          config,
+          schema,
+          partitionKeys,
+          orderKeys,
+          measures,
+          inputTypeInfo,
+          patternNames.toSeq)
+      patternStream.flatSelect[CRow](patternSelectFunction, outTypeInfo)
+    }
+  }
+
+  private def translateOrder(
+      tableEnv: StreamTableEnvironment,
+      crowInput: DataStream[CRow],
+      orderKeys: RelCollation)
+    : (DataStream[CRow], Option[RowComparator]) = {
+
+    if (orderKeys.getFieldCollations.size() == 0) {
+      throw new ValidationException("You must specify either rowtime or proctime for order by.")
+    }
+
+    // need to identify the time attribute among the order fields; it must be the first sort element
+    val timeOrderField = SortUtil.getFirstSortField(orderKeys, inputSchema.relDataType)
+
+    if (!FlinkTypeFactory.isTimeIndicatorType(timeOrderField.getType)) {
+      throw new ValidationException(
+        "You must specify either rowtime or proctime for order by as the first one.")
+    }
+
+    // time ordering needs to be ascending
+    if (SortUtil.getFirstSortDirection(orderKeys) != Direction.ASCENDING) {
+      throw new ValidationException(
+        "Primary sort order of a streaming table must be ascending on time.")
+    }
+
+    val rowComparator = if (orderKeys.getFieldCollations.size() > 1) {
+      Some(SortUtil
+        .createRowComparator(inputSchema.relDataType,
+          orderKeys.getFieldCollations.asScala.tail,
+          tableEnv.execEnv.getConfig))
+    } else {
+      None
+    }
+
+    timeOrderField.getType match {
+      case _ if FlinkTypeFactory.isRowtimeIndicatorType(timeOrderField.getType) =>
+        (crowInput.process(
+          new RowtimeProcessFunction(timeOrderField.getIndex, CRowTypeInfo(inputSchema.typeInfo))
+        ).setParallelism(crowInput.getParallelism),
+          rowComparator)
+      case _ =>
+        (crowInput, rowComparator)
+    }
+  }
+
+  private def applyPartitioning(partitionKeys: util.List[RexNode], inputDs: DataStream[Row])
+    : DataStream[Row] = {
+    if (partitionKeys.size() > 0) {
+      val keys = partitionKeys.asScala.map {
+        case ref: RexInputRef => ref.getIndex
+      }.toArray
+      val keySelector = new RowKeySelector(keys, inputSchema.projectedTypeInfo(keys))
+      inputDs.keyBy(keySelector)
+    } else {
+      inputDs
+    }
+  }
+
+  @VisibleForTesting private[flink] def getLogicalMatch = logicalMatch
+}
+
+@VisibleForTesting
+private[flink] class PatternVisitor(
+    config: TableConfig,
+    inputTypeInfo: TypeInformation[Row],
+    logicalMatch: MatchRecognize)
+  extends RexDefaultVisitor[Pattern[Row, Row]] {
+
+  private var pattern: Pattern[Row, Row] = _
+  val names = new collection.mutable.LinkedHashSet[String]()
+
+  override def visitLiteral(literal: RexLiteral): Pattern[Row, Row] = {
+    val patternName = literal.getValueAs(classOf[String])
+    pattern = translateSingleVariable(Option.apply(pattern), patternName)
+
+    val patternDefinition = logicalMatch.patternDefinitions.get(patternName)
+    if (patternDefinition != null) {
+      val condition = MatchUtil.generateIterativeCondition(
+        config,
+        patternDefinition,
+        inputTypeInfo,
+        patternName,
+        names.toSeq)
+
+      pattern.where(condition)
+    } else {
+      pattern.where(BooleanConditions.trueFunction())
+    }
+  }
+
+  override def visitCall(call: RexCall): Pattern[Row, Row] = {
+    call.getOperator match {
+      case PATTERN_CONCAT =>
+        val left = call.operands.get(0)
+        val right = call.operands.get(1)
+
+        pattern = left.accept(this)
+        pattern = right.accept(this)
+        pattern
+
+      case PATTERN_QUANTIFIER =>
+        val name = call.operands.get(0) match {
+          case c: RexLiteral => c
+          case x => throw new TableException(s"Expression not supported: $x. Group patterns " +
+            s"are not supported yet.")
+        }
+        pattern = name.accept(this)
+        val startNum = MathUtils.checkedDownCast(call.operands.get(1).asInstanceOf[RexLiteral]
+          .getValueAs(classOf[JLong]))
+        val endNum = MathUtils.checkedDownCast(call.operands.get(2).asInstanceOf[RexLiteral]
+          .getValueAs(classOf[JLong]))
+        val isGreedy = !call.operands.get(3).asInstanceOf[RexLiteral]
+          .getValueAs(classOf[JBoolean])
+
+        applyQuantifier(pattern, startNum, endNum, isGreedy)
+
+      case PATTERN_ALTER =>
+        throw TableException(s"Expression not supported: $call. Currently, CEP doesn't support " +
+          s"branching patterns.")
+
+      case PATTERN_PERMUTE =>
+        throw TableException(s"Expression not supported: $call. Currently, CEP doesn't support " +
+          s"PERMUTE patterns.")
+
+      case PATTERN_EXCLUDE =>
+        throw TableException(s"Expression not supported: $call. Currently, CEP doesn't support " +
+          s"'{-' '-}' patterns.")
+    }
+  }
+
+  override def visitNode(rexNode: RexNode): Pattern[Row, Row] = throw new TableException(
+    s"Unsupported expression within Pattern: [$rexNode]")
+
+  private def translateSkipStrategy = {
+    val getPatternTarget = () => logicalMatch.after.asInstanceOf[RexCall].getOperands.get(0)
+      .asInstanceOf[RexLiteral].getValueAs(classOf[String])
+
+    logicalMatch.after.getKind match {
+      case SqlKind.LITERAL =>
+        logicalMatch.after.asInstanceOf[RexLiteral].getValueAs(classOf[AfterOption]) match {
+          case AfterOption.SKIP_PAST_LAST_ROW => AfterMatchSkipStrategy.skipPastLastEvent()
+          case AfterOption.SKIP_TO_NEXT_ROW => AfterMatchSkipStrategy.skipToNext()
+        }
+      case SqlKind.SKIP_TO_FIRST =>
+        AfterMatchSkipStrategy.skipToFirst(getPatternTarget()).throwExceptionOnMiss()
+      case SqlKind.SKIP_TO_LAST =>
+        AfterMatchSkipStrategy.skipToLast(getPatternTarget()).throwExceptionOnMiss()
+    }
+  }
+
+  private def translateSingleVariable(
+      previousPattern: Option[Pattern[Row, Row]],
+      patternName: String)
+    : Pattern[Row, Row] = {
+    if (names.contains(patternName)) {
+      throw new TableException("Pattern variables must be unique. That might change in the future.")
+    } else {
+      names.add(patternName)
+    }
+
+    previousPattern match {
+      case Some(p) => p.next(patternName)
+      case None =>
+        Pattern.begin(patternName, translateSkipStrategy)
+    }
+  }
+
+  private def applyQuantifier(
+      pattern: Pattern[Row, Row],
+      startNum: Int,
+      endNum: Int,
+      greedy: Boolean)
+    : Pattern[Row, Row] = {
+    val isOptional = startNum == 0 && endNum == 1
+
+    val newPattern = if (startNum == 0 && endNum == -1) { // zero or more
+      pattern.oneOrMore().optional().consecutive()
+    } else if (startNum == 1 && endNum == -1) { // one or more
+      pattern.oneOrMore().consecutive()
+    } else if (isOptional) { // optional
+      pattern.optional()
+    } else if (endNum != -1) { // times
+      pattern.times(startNum, endNum).consecutive()
+    } else { // times or more
+      pattern.timesOrMore(startNum).consecutive()
+    }
+
+    if (greedy && isOptional) {
+      newPattern
+    } else if (greedy) {
+      newPattern.greedy()
+    } else if (isOptional) {
+      throw new TableException("Reluctant optional variables are not supported yet.")
+    } else {
+      newPattern
+    }
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalMatch.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalMatch.scala
new file mode 100644
index 00000000000..9e29b105dc8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalMatch.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelCollation, RelNode}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Match
+import org.apache.calcite.rel.logical.LogicalMatch
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalMatch(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    rowType: RelDataType,
+    pattern: RexNode,
+    strictStart: Boolean,
+    strictEnd: Boolean,
+    patternDefinitions: util.Map[String, RexNode],
+    measures: util.Map[String, RexNode],
+    after: RexNode,
+    subsets: util.Map[String, _ <: util.SortedSet[String]],
+    allRows: Boolean,
+    partitionKeys: util.List[RexNode],
+    orderKeys: RelCollation,
+    interval: RexNode)
+  extends Match(
+    cluster,
+    traitSet,
+    input,
+    rowType,
+    pattern,
+    strictStart,
+    strictEnd,
+    patternDefinitions,
+    measures,
+    after,
+    subsets,
+    allRows,
+    partitionKeys,
+    orderKeys,
+    interval)
+  with FlinkLogicalRel {
+
+  override def copy(
+      input: RelNode,
+      rowType: RelDataType,
+      pattern: RexNode,
+      strictStart: Boolean,
+      strictEnd: Boolean,
+      patternDefinitions: util.Map[String, RexNode],
+      measures: util.Map[String, RexNode],
+      after: RexNode,
+      subsets: util.Map[String, _ <: util.SortedSet[String]],
+      allRows: Boolean,
+      partitionKeys: util.List[RexNode],
+      orderKeys: RelCollation,
+      interval: RexNode): Match = {
+    new FlinkLogicalMatch(
+      cluster,
+      traitSet,
+      input,
+      rowType,
+      pattern,
+      strictStart,
+      strictEnd,
+      patternDefinitions,
+      measures,
+      after,
+      subsets,
+      allRows,
+      partitionKeys,
+      orderKeys,
+      interval)
+  }
+}
+
+private class FlinkLogicalMatchConverter
+  extends ConverterRule(
+    classOf[LogicalMatch],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalMatchConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val logicalMatch = rel.asInstanceOf[LogicalMatch]
+    val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
+    val newInput = RelOptRule.convert(logicalMatch.getInput, FlinkConventions.LOGICAL)
+
+    new FlinkLogicalMatch(
+      rel.getCluster,
+      traitSet,
+      newInput,
+      logicalMatch.getRowType,
+      logicalMatch.getPattern,
+      logicalMatch.isStrictStart,
+      logicalMatch.isStrictEnd,
+      logicalMatch.getPatternDefinitions,
+      logicalMatch.getMeasures,
+      logicalMatch.getAfter,
+      logicalMatch.getSubsets,
+      logicalMatch.isAllRows,
+      logicalMatch.getPartitionKeys,
+      logicalMatch.getOrderKeys,
+      logicalMatch.getInterval)
+  }
+}
+
+object FlinkLogicalMatch {
+  val CONVERTER: ConverterRule = new FlinkLogicalMatchConverter()
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index a4a09f716d1..6e2ccdeba5b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.plan.rules
 import org.apache.calcite.rel.core.RelFactories
 import org.apache.calcite.rel.rules._
 import org.apache.calcite.tools.{RuleSet, RuleSets}
+import org.apache.flink.table.plan.nodes.logical
 import org.apache.flink.table.plan.rules.common._
 import org.apache.flink.table.plan.rules.logical._
 import org.apache.flink.table.plan.rules.dataSet._
@@ -142,7 +143,8 @@ object FlinkRuleSets {
     FlinkLogicalValues.CONVERTER,
     FlinkLogicalTableSourceScan.CONVERTER,
     FlinkLogicalTableFunctionScan.CONVERTER,
-    FlinkLogicalNativeTableScan.CONVERTER
+    FlinkLogicalNativeTableScan.CONVERTER,
+    FlinkLogicalMatch.CONVERTER
   )
 
   /**
@@ -229,7 +231,8 @@ object FlinkRuleSets {
     DataStreamWindowJoinRule.INSTANCE,
     DataStreamJoinRule.INSTANCE,
     DataStreamTemporalTableJoinRule.INSTANCE,
-    StreamTableSourceScanRule.INSTANCE
+    StreamTableSourceScanRule.INSTANCE,
+    DataStreamMatchRule.INSTANCE
   )
 
   /**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala
new file mode 100644
index 00000000000..bbebc73b24d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.logical.MatchRecognize
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamMatch
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalMatch
+import org.apache.flink.table.plan.schema.RowSchema
+
+class DataStreamMatchRule
+  extends ConverterRule(
+    classOf[FlinkLogicalMatch],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
+    "DataStreamMatchRule") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val logicalMatch: FlinkLogicalMatch = rel.asInstanceOf[FlinkLogicalMatch]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convertInput: RelNode =
+      RelOptRule.convert(logicalMatch.getInput, FlinkConventions.DATASTREAM)
+
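+    // eagerly check that flink-cep is on the classpath so that a missing dependency
+    // fails at planning time with a descriptive message instead of at runtime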
+    try {
+      Class.forName("org.apache.flink.cep.pattern.Pattern")
+    } catch {
+      case ex: ClassNotFoundException => throw new TableException(
+        "MATCH RECOGNIZE clause requires flink-cep dependency to be present on the classpath.",
+        ex)
+    }
+
+    new DataStreamMatch(
+      rel.getCluster,
+      traitSet,
+      convertInput,
+      MatchRecognize(
+        logicalMatch.getInput,
+        logicalMatch.getRowType,
+        logicalMatch.getPattern,
+        logicalMatch.getPatternDefinitions,
+        logicalMatch.getMeasures,
+        logicalMatch.getAfter,
+        logicalMatch.getSubsets,
+        logicalMatch.isAllRows,
+        logicalMatch.getPartitionKeys,
+        logicalMatch.getOrderKeys,
+        logicalMatch.getInterval
+      ),
+      new RowSchema(logicalMatch.getRowType),
+      new RowSchema(logicalMatch.getInput.getRowType))
+  }
+}
+
+object DataStreamMatchRule {
+  val INSTANCE: RelOptRule = new DataStreamMatchRule
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/RowKeySelector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/RowKeySelector.scala
new file mode 100644
index 00000000000..d75f3bbb45a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/RowKeySelector.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
+import org.apache.flink.types.Row
+
+class RowKeySelector(
+    val keyFields: Array[Int],
+    @transient var returnType: TypeInformation[Row])
+  extends KeySelector[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  // check if type implements proper equals/hashCode
+  validateEqualsHashCode("grouping", returnType)
+
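+  // Row.project copies only the key fields into a new Row; the validation above ensures
+  // the key type has proper equals/hashCode so keys compare by value.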
+  override def getKey(value: Row): Row = {
+    Row.project(value, keyFields)
+  }
+
+  override def getProducedType: TypeInformation[Row] = returnType
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
index 67bb60325f8..e825b18b8e6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
@@ -21,7 +21,6 @@ import org.apache.calcite.rel.`type`._
 import org.apache.calcite.rel.RelCollation
 import org.apache.calcite.rel.RelFieldCollation
 import org.apache.calcite.rel.RelFieldCollation.Direction
-
 import org.apache.flink.types.Row
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -36,9 +35,10 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.util.Preconditions
-
 import java.util.Comparator
 
+import org.apache.flink.api.common.operators.Order
+
 import scala.collection.JavaConverters._
 
 /**
@@ -123,7 +123,7 @@ object SortUtil {
     *
    * @return A RowComparator for the provided sort collations and input type.
    */
-  private def createRowComparator(
+  def createRowComparator(
       inputType: RelDataType,
       fieldCollations: Seq[RelFieldCollation],
       execConfig: ExecutionConfig): RowComparator = {
@@ -176,7 +176,20 @@ object SortUtil {
     val idx = collationSort.getFieldCollations.get(0).getFieldIndex
     rowType.getFieldList.get(idx)
   }
-  
+
+  /**
+    * Translates a sort [[Direction]] into the corresponding Flink [[Order]].
+    *
+    * @param direction order direction
+    * @return corresponding order
+    */
+  def directionToOrder(direction: Direction): Order = {
+    direction match {
+      case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
+      case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
+      case _ => throw new IllegalArgumentException("Unsupported direction.")
+    }
+  }
 }
 
 /**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/EventRowComparator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/EventRowComparator.scala
new file mode 100644
index 00000000000..d51f64e486d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/EventRowComparator.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.`match`
+
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.cep.EventComparator
+import org.apache.flink.types.Row
+
+/**
+  * Bridges a [[Row]] [[TypeComparator]] to the CEP [[EventComparator]] interface.
+  */
+class EventRowComparator(
+    private val rowComp: TypeComparator[Row])
+  extends EventComparator[Row] {
+
+  override def compare(arg0: Row, arg1: Row): Int = {
+    rowComp.compare(arg0, arg1)
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
new file mode 100644
index 00000000000..fc6755ea79f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.`match`
+
+import java.io.{IOException, ObjectInputStream}
+
+import org.apache.flink.cep.pattern.conditions.IterativeCondition
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+
+/**
+  * A runner that compiles and evaluates a generated [[IterativeCondition]] over [[Row]] values.
+  */
+class IterativeConditionRunner(
+    name: String,
+    code: String)
+  extends IterativeCondition[Row]
+  with Compiler[IterativeCondition[Row]]
+  with Logging {
+
+  @transient private var function: IterativeCondition[Row] = _
+
+  def init(): Unit = {
+    LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code")
+    // We cannot get user's classloader currently, see FLINK-6938 for details
+    val clazz = compile(Thread.currentThread().getContextClassLoader, name, code)
+    LOG.debug("Instantiating IterativeCondition.")
+    function = clazz.newInstance()
+  }
+
+  override def filter(value: Row, ctx: IterativeCondition.Context[Row]): Boolean = {
+    function.filter(value, ctx)
+  }
+
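+  // The condition is shipped as generated source code and compiled lazily on
+  // deserialization, so the generated class itself never has to be serialized.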
+  @throws(classOf[IOException])
+  private def readObject(in: ObjectInputStream): Unit = {
+    in.defaultReadObject()
+    if (function == null) {
+      init()
+    }
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/MatchUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/MatchUtil.scala
new file mode 100644
index 00000000000..be3ace34d2c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/MatchUtil.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.`match`
+
+import java.util
+
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.cep.pattern.conditions.IterativeCondition
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction}
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.MatchCodeGenerator
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * A utility class that generates the runtime functions used by MATCH_RECOGNIZE.
+  */
+object MatchUtil {
+
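+  /**
+    * Generates an [[IterativeConditionRunner]] for a single DEFINE clause by code-generating
+    * the boolean predicate and wrapping the compiled class name and source.
+    */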
+  private[flink] def generateIterativeCondition(
+      config: TableConfig,
+      patternDefinition: RexNode,
+      inputTypeInfo: TypeInformation[_],
+      patternName: String,
+      names: Seq[String])
+    : IterativeConditionRunner = {
+    val generator = new MatchCodeGenerator(config, inputTypeInfo, names, Some(patternName))
+    val condition = generator.generateExpression(patternDefinition)
+    val body =
+      s"""
+         |${condition.code}
+         |return ${condition.resultTerm};
+         |""".stripMargin
+
+    val genCondition = generator
+      .generateMatchFunction("MatchRecognizeCondition",
+        classOf[IterativeCondition[Row]],
+        body,
+        condition.resultType)
+    new IterativeConditionRunner(genCondition.name, genCondition.code)
+  }
+
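+  /**
+    * Generates the ONE ROW PER MATCH projection (partition keys plus MEASURES expressions)
+    * and wraps it in a [[PatternSelectFunctionRunner]] emitting one row per match.
+    */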
+  private[flink] def generateOneRowPerMatchExpression(
+      config: TableConfig,
+      returnType: RowSchema,
+      partitionKeys: util.List[RexNode],
+      orderKeys: util.List[RelFieldCollation],
+      measures: util.Map[String, RexNode],
+      inputTypeInfo: TypeInformation[_],
+      patternNames: Seq[String])
+    : PatternFlatSelectFunction[Row, CRow] = {
+    val generator = new MatchCodeGenerator(config, inputTypeInfo, patternNames)
+
+    val resultExpression = generator.generateOneRowPerMatchExpression(
+      partitionKeys,
+      measures,
+      returnType)
+    val body =
+      s"""
+         |${resultExpression.code}
+         |return ${resultExpression.resultTerm};
+         |""".stripMargin
+
+    val genFunction = generator.generateMatchFunction(
+      "MatchRecognizePatternSelectFunction",
+      classOf[PatternSelectFunction[Row, Row]],
+      body,
+      resultExpression.resultType)
+    new PatternSelectFunctionRunner(genFunction.name, genFunction.code)
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
new file mode 100644
index 00000000000..643352271b5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.`match`
+
+import java.io.{IOException, ObjectInputStream}
+import java.util
+
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction}
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A runner that compiles a generated [[PatternSelectFunction]] with [[Row]] input and wraps
+  * its output in [[CRow]].
+  */
+class PatternSelectFunctionRunner(
+    name: String,
+    code: String)
+  extends PatternFlatSelectFunction[Row, CRow]
+  with Compiler[PatternSelectFunction[Row, Row]]
+  with Logging {
+
+  @transient private var outCRow: CRow = _
+
+  @transient private var function: PatternSelectFunction[Row, Row] = _
+
+  def init(): Unit = {
+    LOG.debug(s"Compiling PatternSelectFunction: $name \n\n Code:\n$code")
+    val clazz = compile(Thread.currentThread().getContextClassLoader, name, code)
+    LOG.debug("Instantiating PatternSelectFunction.")
+    function = clazz.newInstance()
+  }
+
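+  // Reuses a single mutable CRow and erases the record timestamp before emitting,
+  // as one output row summarizes several timestamped input rows.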
+  override def flatSelect(
+      pattern: util.Map[String, util.List[Row]],
+      out: Collector[CRow])
+    : Unit = {
+    outCRow.row = function.select(pattern)
+    out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp()
+    out.collect(outCRow)
+  }
+
+  @throws(classOf[IOException])
+  private def readObject(in: ObjectInputStream): Unit = {
+    in.defaultReadObject()
+
+    if (outCRow == null) {
+      outCRow = new CRow(null, true)
+    }
+
+    if (function == null) {
+      init()
+    }
+  }
+}
+
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index b906a0d1091..ff7fbf5a0a9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -476,6 +476,13 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     ScalarSqlFunctions.REPEAT,
     ScalarSqlFunctions.REGEXP_REPLACE,
 
+    // MATCH_RECOGNIZE
+    SqlStdOperatorTable.FIRST,
+    SqlStdOperatorTable.LAST,
+    SqlStdOperatorTable.PREV,
+    SqlStdOperatorTable.FINAL,
+    SqlStdOperatorTable.RUNNING,
+
     // EXTENSIONS
     BasicOperatorTable.TUMBLE,
     BasicOperatorTable.HOP,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/MatchRecognizeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/MatchRecognizeTest.scala
new file mode 100644
index 00000000000..98a8d089d29
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/MatchRecognizeTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{term, _}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class MatchRecognizeTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime)
+
+  @Test
+  def testSimpleWithDefaults(): Unit = {
+    val sqlQuery =
+      """SELECT T.aa as ta
+        |FROM MyTable
+        |MATCH_RECOGNIZE (
+        |  ORDER BY proctime
+        |  MEASURES
+        |    A.a as aa
+        |  PATTERN (A B)
+        |  DEFINE
+        |    A AS a = 1,
+        |    B AS b = 'b'
+        |) AS T""".stripMargin
+
+    val expected = unaryNode(
+      "DataStreamMatch",
+      streamTableNode(0),
+      term("orderBy", "proctime ASC"),
+      term("measures", "FINAL(A.a) AS aa"),
+      term("rowsPerMatch", "ONE ROW PER MATCH"),
+      term("after",
+        "SKIP TO NEXT ROW"), //this is not SQL-standard compliant, SKIP PAST LAST
+      // should be the default
+      term("pattern", "('A', 'B')"),
+      term("define", "{A==(LAST(*.$0, 0), 1)", "B==(LAST(*.$1, 0), 'b')}"))
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
new file mode 100644
index 00000000000..3917bdf8af9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.`match`
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.codegen.CodeGenException
+import org.apache.flink.table.runtime.stream.sql.ToMillis
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class MatchOperatorValidationTest extends TableTestBase {
+
+  private val streamUtils = streamTestUtil()
+  streamUtils.addTable[(String, Long, Int, Int)]("Ticker",
+    'symbol,
+    'tstamp,
+    'price,
+    'tax,
+    'proctime.proctime)
+  streamUtils.addFunction("ToMillis", new ToMillis)
+
+  @Test
+  def testSortProcessingTimeDesc(): Unit = {
+    thrown.expectMessage("Primary sort order of a streaming table must be ascending on time.")
+    thrown.expect(classOf[ValidationException])
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM Ticker
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime DESC
+         |  MEASURES
+         |    A.symbol AS aSymbol
+         |  PATTERN (A B)
+         |  DEFINE
+         |    A AS symbol = 'a'
+         |) AS T
+         |""".stripMargin
+
+    streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+  }
+
+  @Test
+  def testSortProcessingTimeSecondaryField(): Unit = {
+    thrown.expectMessage("You must specify either rowtime or proctime for order by as " +
+      "the first one.")
+    thrown.expect(classOf[ValidationException])
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM Ticker
+         |MATCH_RECOGNIZE (
+         |  ORDER BY price, proctime
+         |  MEASURES
+         |    A.symbol AS aSymbol
+         |  PATTERN (A B)
+         |  DEFINE
+         |    A AS symbol = 'a'
+         |) AS T
+         |""".stripMargin
+
+    streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+  }
+
+  @Test
+  def testSortNoOrder(): Unit = {
+    thrown.expectMessage("You must specify either rowtime or proctime for order by.")
+    thrown.expect(classOf[ValidationException])
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM Ticker
+         |MATCH_RECOGNIZE (
+         |  MEASURES
+         |    A.symbol AS aSymbol
+         |  PATTERN (A B)
+         |  DEFINE
+         |    A AS symbol = 'a'
+         |) AS T
+         |""".stripMargin
+
+    streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+  }
+
+  @Test
+  def testUpdatesInUpstreamOperatorNotSupported(): Unit = {
+    thrown.expectMessage("Retraction on match recognize is not supported. Note: Match " +
+      "recognize should not follow a non-windowed GroupBy aggregation.")
+    thrown.expect(classOf[TableException])
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM (SELECT DISTINCT * FROM Ticker)
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    A.symbol AS aSymbol
+         |  ONE ROW PER MATCH
+         |  PATTERN (A B)
+         |  DEFINE
+         |    A AS symbol = 'a'
+         |) AS T
+         |""".stripMargin
+
+    streamUtils.tableEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+  }
+
+  // ***************************************************************************************
+  // * Those validations are temporary. We should remove those tests once we support those *
+  // * features.                                                                           *
+  // ***************************************************************************************
+
+  @Test
+  def testAllRowsPerMatch(): Unit = {
+    thrown.expectMessage("All rows per match mode is not supported yet.")
+    thrown.expect(classOf[TableException])
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM Ticker
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    A.symbol AS aSymbol
+         |  ALL ROWS PER MATCH
+         |  PATTERN (A B)
+         |  DEFINE
+         |    A AS symbol = 'a'
+         |) AS T
+         |""".stripMargin
+
+    streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+  }
+
+  @Test
+  def testGreedyQuantifierAtTheEndIsNotSupported(): Unit = {
+    thrown.expectMessage("Greedy quantifiers are not allowed as the last element of a " +
+      "Pattern yet. Finish your pattern with either a simple variable or reluctant quantifier.")
+    thrown.expect(classOf[TableException])
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM Ticker
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    A.symbol AS aSymbol
+         |  PATTERN (A B+)
+         |  DEFINE
+         |    A AS symbol = 'a'
+         |) AS T
+         |""".stripMargin
+
+    streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+  }
+
+  @Test
+  def testPatternsProducingEmptyMatchesAreNotSupported(): Unit = {
+    thrown.expectMessage("Patterns that can produce empty matches are not supported. " +
+      "There must be at least one non-optional state.")
+    thrown.expect(classOf[TableException])
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM Ticker
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    A.symbol AS aSymbol
+         |  PATTERN (A*)
+         |  DEFINE
+         |    A AS symbol = 'a'
+         |) AS T
+         |""".stripMargin
+
+    streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+  }
+
+  @Test
+  def testUDFsAreNotSupportedInMeasures(): Unit = {
+    thrown.expectMessage(
+      "Match recognize does not support UDFs, nor other functions that require " +
+        "open/close methods yet.")
+    thrown.expect(classOf[TableException])
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM Ticker
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    ToMillis(A.proctime) AS aProctime
+         |  PATTERN (A B)
+         |  DEFINE
+         |    A AS A.symbol = 'a'
+         |) AS T
+         |""".stripMargin
+
+    streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+  }
+
+  @Test
+  def testUDFsAreNotSupportedInDefine(): Unit = {
+    thrown.expectMessage(
+      "Match recognize does not support UDFs, nor other functions that require " +
+        "open/close methods yet.")
+    thrown.expect(classOf[TableException])
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM Ticker
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    A.symbol AS aSymbol
+         |  PATTERN (A B)
+         |  DEFINE
+         |    A AS ToMillis(A.proctime) = 2
+         |) AS T
+         |""".stripMargin
+
+    streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+  }
+
+  @Test
+  def testAggregatesAreNotSupportedInMeasures(): Unit = {
+    thrown.expectMessage(
+      "Unsupported call: SUM \nIf you think this function should be supported, you can " +
+        "create an issue and start a discussion for it.")
+    thrown.expect(classOf[CodeGenException])
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM Ticker
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    SUM(A.price + A.tax) AS cost
+         |  PATTERN (A B)
+         |  DEFINE
+         |    A AS A.symbol = 'a'
+         |) AS T
+         |""".stripMargin
+
+    streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+  }
+
+  @Test
+  def testAggregatesAreNotSupportedInDefine(): Unit = {
+    thrown.expectMessage(
+      "Unsupported call: SUM \nIf you think this function should be supported, you can " +
+        "create an issue and start a discussion for it.")
+    thrown.expect(classOf[CodeGenException])
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM Ticker
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    B.price as bPrice
+         |  PATTERN (A+ B)
+         |  DEFINE
+         |    A AS SUM(A.price + A.tax) < 10
+         |) AS T
+         |""".stripMargin
+
+    streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+  }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
new file mode 100644
index 00000000000..9bf651fc0b8
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.`match`
+
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy._
+import org.apache.flink.cep.pattern.Pattern
+import org.apache.flink.table.api.TableException
+import org.junit.Test
+
+class PatternTranslatorTest extends PatternTranslatorTestBase {
+  @Test
+  def simplePattern(): Unit = {
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   PATTERN (A B)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      Pattern.begin("A", skipToNext()).next("B"))
+  }
+
+  @Test
+  def testAfterMatch(): Unit = {
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   AFTER MATCH SKIP TO NEXT ROW
+        |   PATTERN (A B)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      Pattern.begin("A", skipToNext()).next("B"))
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   AFTER MATCH SKIP TO LAST A
+        |   PATTERN (A B)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      Pattern.begin("A", skipToLast("A").throwExceptionOnMiss()).next("B"))
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   AFTER MATCH SKIP TO FIRST A
+        |   PATTERN (A B)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      Pattern.begin("A", skipToFirst("A").throwExceptionOnMiss()).next("B"))
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   AFTER MATCH SKIP PAST LAST ROW
+        |   PATTERN (A B)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      Pattern.begin("A", skipPastLastEvent()).next("B"))
+  }
+
+  @Test
+  def testQuantifiers(): Unit = {
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   PATTERN (A{2,} B)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      Pattern.begin("A", skipToNext()).timesOrMore(2).consecutive().greedy().next("B"))
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   PATTERN (A+ B)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      Pattern.begin("A", skipToNext()).oneOrMore().consecutive().greedy().next("B"))
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   PATTERN (A{2,6} B)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      Pattern.begin("A", skipToNext()).times(2, 6).consecutive().greedy().next("B"))
+  }
+
+  @Test
+  def testOptional(): Unit = {
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   PATTERN (A* B)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      Pattern.begin("A", skipToNext()).oneOrMore().consecutive().greedy().optional().next("B"))
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   PATTERN (A? B)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      Pattern.begin("A", skipToNext()).optional().next("B"))
+  }
+
+  @Test
+  def testReluctant(): Unit = {
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   PATTERN (A{2,}? B)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      Pattern.begin("A", skipToNext()).timesOrMore(2).consecutive().next("B"))
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   PATTERN (A+? B)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      Pattern.begin("A", skipToNext()).oneOrMore().consecutive().next("B"))
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   PATTERN (A{2,6}? B)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      Pattern.begin("A", skipToNext()).times(2, 6).consecutive().next("B"))
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   PATTERN (A*? B)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      Pattern.begin("A", skipToNext()).oneOrMore().consecutive().optional().next("B"))
+  }
+
+  @Test
+  def testControlCharsInPatternName(): Unit = {
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |  ORDER BY proctime
+        |  MEASURES
+        |    `A"`.f0 AS aid
+        |  PATTERN (`A"`? \u006C C)
+        |  DEFINE
+        |    `A"` as `A"`.f0 = 1
+        |) AS T
+        |""".stripMargin,
+      Pattern.begin("A\"", skipToNext()).optional().next("\u006C").next("C"))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testReluctantOptionalNotSupported(): Unit = {
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   PATTERN (A?? B)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      null /* don't care */)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGroupPatternsAreNotSupported(): Unit = {
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 as aF0
+        |   PATTERN ((A B)+ C)
+        |   DEFINE
+        |     A as A.f0 = 1
+        |)""".stripMargin,
+      null /* don't care */)
+  }
+
+  @Test
+  def testPermutationsAreNotSupported(): Unit = {
+    thrown.expectMessage("Currently, CEP doesn't support PERMUTE patterns.")
+    thrown.expect(classOf[TableException])
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 AS aF0
+        |   PATTERN (PERMUTE(A  C))
+        |   DEFINE
+        |     A AS A.f0 = 1
+        |)""".stripMargin,
+      null /* don't care */)
+  }
+
+  @Test
+  def testExclusionsAreNotSupported(): Unit = {
+    thrown.expectMessage("Currently, CEP doesn't support '{-' '-}' patterns.")
+    thrown.expect(classOf[TableException])
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 AS aF0
+        |   PATTERN (A { - B - }  C)
+        |   DEFINE
+        |     A AS A.f0 = 1
+        |)""".stripMargin,
+      null /* don't care */)
+  }
+
+  @Test
+  def testAlternationsAreNotSupported(): Unit = {
+    thrown.expectMessage("Currently, CEP doesn't support branching patterns.")
+    thrown.expect(classOf[TableException])
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 AS aF0
+        |   PATTERN (( A | B )  C)
+        |   DEFINE
+        |     A AS A.f0 = 1
+        |)""".stripMargin,
+      null /* don't care */)
+  }
+
+  @Test
+  def testPhysicalOffsetsAreNotSupported(): Unit = {
+    thrown.expect(classOf[TableException])
+    thrown.expectMessage("Flink does not support physical offsets within partition.")
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 AS aF0
+        |   PATTERN (A)
+        |   DEFINE
+        |     A AS PREV(A.f0) = 1
+        |)""".stripMargin,
+      null /* don't care */)
+  }
+
+  @Test
+  def testPatternVariablesMustBeUnique(): Unit = {
+    thrown.expectMessage("Pattern variables must be unique. That might change in the future.")
+    thrown.expect(classOf[TableException])
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |   ORDER BY proctime
+        |   MEASURES
+        |     A.f0 AS aF0
+        |   PATTERN (A B A)
+        |   DEFINE
+        |     A AS A.f0 = 1
+        |)""".stripMargin,
+      null /* don't care */)
+  }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
new file mode 100644
index 00000000000..55bdb5245c2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.`match`
+
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.cep.pattern.Pattern
+import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableConfig, TableEnvironment}
+import org.apache.flink.table.calcite.FlinkPlannerImpl
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch, DataStreamScan, PatternVisitor}
+import org.apache.flink.types.Row
+import org.apache.flink.util.TestLogger
+import org.junit.Assert._
+import org.junit.rules.ExpectedException
+import org.junit.{ComparisonFailure, Rule}
+import org.mockito.Mockito.{mock, when}
+
+abstract class PatternTranslatorTestBase extends TestLogger {
+
+  private val expectedException = ExpectedException.none()
+
+  @Rule
+  def thrown: ExpectedException = expectedException
+
+  // setup test utils
+  private val testTableTypeInfo = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO)
+  private val tableName = "testTable"
+  private val context = prepareContext(testTableTypeInfo)
+  private val planner = new FlinkPlannerImpl(
+    context._2.getFrameworkConfig,
+    context._2.getPlanner,
+    context._2.getTypeFactory,
+    context._2.sqlToRelConverterConfig)
+
+  private def prepareContext(typeInfo: TypeInformation[Row])
+    : (RelBuilder, StreamTableEnvironment, StreamExecutionEnvironment) = {
+    // create DataStreamTable
+    val dataStreamMock = mock(classOf[DataStream[Row]])
+    val jDataStreamMock = mock(classOf[JDataStream[Row]])
+    when(dataStreamMock.javaStream).thenReturn(jDataStreamMock)
+    when(jDataStreamMock.getType).thenReturn(typeInfo)
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.registerDataStream(tableName, dataStreamMock, 'f0, 'proctime.proctime)
+
+    // prepare RelBuilder
+    val relBuilder = tEnv.getRelBuilder
+    relBuilder.scan(tableName)
+
+    (relBuilder, tEnv, env)
+  }
+
+  def verifyPattern(matchRecognize: String, expected: Pattern[Row, _ <: Row]): Unit = {
+    // create RelNode from SQL expression
+    val parsed = planner.parse(
+      s"""
+         |SELECT *
+         |FROM $tableName
+         |$matchRecognize
+         |""".stripMargin)
+    val validated = planner.validate(parsed)
+    val converted = planner.rel(validated).rel
+
+    val env = context._2
+    val optimized = env.optimize(converted, updatesAsRetraction = false)
+
+    // fail if the optimized plan contains anything beyond a single Match on top of the scan
+    if (!optimized.getInput(0).isInstanceOf[DataStreamScan]) {
+      fail("Expression is converted into more than a single Match operation. " +
+        "Use a different test method.")
+    }
+
+    val dataMatch = optimized
+      .asInstanceOf[DataStreamMatch]
+
+    val pVisitor = new PatternVisitor(new TableConfig, testTableTypeInfo, dataMatch.getLogicalMatch)
+    val p = dataMatch.getLogicalMatch.pattern.accept(pVisitor)
+
+    compare(expected, p)
+  }
+
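+  // Structural pattern comparison: walk both chains backwards via getPrevious and check
+  // name, quantifier, times and after-match skip strategy at every step.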
+  private def compare(expected: Pattern[Row, _ <: Row], actual: Pattern[Row, _ <: Row]): Unit = {
+    var currentLeft = expected
+    var currentRight = actual
+    do {
+      val sameName = currentLeft.getName == currentRight.getName
+      val sameQuantifier = currentLeft.getQuantifier == currentRight.getQuantifier
+      val sameTimes = currentLeft.getTimes == currentRight.getTimes
+      val sameSkipStrategy = currentLeft.getAfterMatchSkipStrategy ==
+        currentRight.getAfterMatchSkipStrategy
+
+      currentLeft = currentLeft.getPrevious
+      currentRight = currentRight.getPrevious
+
+      if (!sameName || !sameQuantifier || !sameTimes || !sameSkipStrategy) {
+        throw new ComparisonFailure("Compiled different pattern.",
+          expected.toString,
+          actual.toString)
+      }
+
+    } while (currentLeft != null)
+
+    if (currentRight != null) {
+      throw new ComparisonFailure("Compiled different pattern.", expected.toString, actual.toString)
+    }
+  }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
new file mode 100644
index 00000000000..2245146e9df
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import java.sql.Timestamp
+import java.util.TimeZone
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableConfig, TableEnvironment}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.mutable
+
+class MatchRecognizeITCase extends StreamingWithStateTestBase {
+
+  @Test
+  def testSimpleCEP(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, String)]
+    data.+=((1, "a"))
+    data.+=((2, "z"))
+    data.+=((3, "b"))
+    data.+=((4, "c"))
+    data.+=((5, "d"))
+    data.+=((6, "a"))
+    data.+=((7, "b"))
+    data.+=((8, "c"))
+    data.+=((9, "h"))
+
+    val t = env.fromCollection(data).toTable(tEnv, 'id, 'name, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery =
+      s"""
+        |SELECT T.aid, T.bid, T.cid
+        |FROM MyTable
+        |MATCH_RECOGNIZE (
+        |  ORDER BY proctime
+        |  MEASURES
+        |    `A"`.id AS aid,
+        |    \u006C.id AS bid,
+        |    C.id AS cid
+        |  PATTERN (`A"` \u006C C)
+        |  DEFINE
+        |    `A"` AS name = 'a',
+        |    \u006C AS name = 'b',
+        |    C AS name = 'c'
+        |) AS T
+        |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("6,7,8")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSimpleCEPWithNulls(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, String, String)]
+    data.+=((1, "a", null))
+    data.+=((2, "b", null))
+    data.+=((3, "c", null))
+    data.+=((4, "d", null))
+    data.+=((5, null, null))
+    data.+=((6, "a", null))
+    data.+=((7, "b", null))
+    data.+=((8, "c", null))
+    data.+=((9, null, null))
+
+    val t = env.fromCollection(data).toTable(tEnv, 'id, 'name, 'nullField, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery =
+      s"""
+         |SELECT T.aid, T.bNull, T.cid, T.aNull
+         |FROM MyTable
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    A.id AS aid,
+         |    A.nullField AS aNull,
+         |    LAST(B.nullField) AS bNull,
+         |    C.id AS cid
+         |  PATTERN (A B C)
+         |  DEFINE
+         |    A AS name = 'a' AND nullField IS NULL,
+         |    B AS name = 'b' AND LAST(A.nullField) IS NULL,
+         |    C AS name = 'c'
+         |) AS T
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("1,null,3,null", "6,null,8,null")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testCodeSplitsAreProperlyGenerated(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tableConfig = new TableConfig
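+    // a max generated code length of 1 forces the code generator to split every
+    // generated function, so this test fails if split code does not compile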
+    tableConfig.setMaxGeneratedCodeLength(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env, tableConfig)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, String, String, String)]
+    data.+=((1, "a", "key1", "second_key3"))
+    data.+=((2, "b", "key1", "second_key3"))
+    data.+=((3, "c", "key1", "second_key3"))
+    data.+=((4, "d", "key", "second_key"))
+    data.+=((5, "e", "key", "second_key"))
+    data.+=((6, "a", "key2", "second_key4"))
+    data.+=((7, "b", "key2", "second_key4"))
+    data.+=((8, "c", "key2", "second_key4"))
+    data.+=((9, "f", "key", "second_key"))
+
+    val t = env.fromCollection(data)
+      .toTable(tEnv, 'id, 'name, 'key1, 'key2, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM MyTable
+         |MATCH_RECOGNIZE (
+         |  PARTITION BY key1, key2
+         |  ORDER BY proctime
+         |  MEASURES
+         |    A.id AS aid,
+         |    A.key1 AS akey1,
+         |    LAST(B.id) AS bid,
+         |    C.id AS cid,
+         |    C.key2 AS ckey2
+         |  PATTERN (A B C)
+         |  DEFINE
+         |    A AS name = 'a' AND key1 LIKE '%key%' AND id > 0,
+         |    B AS name = 'b' AND LAST(A.name, 2) IS NULL,
+         |    C AS name = 'c' AND LAST(A.name) = 'a'
+         |) AS T
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "key1,second_key3,1,key1,2,3,second_key3",
+      "key2,second_key4,6,key2,7,8,second_key4")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventsAreProperlyOrdered(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
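+    // Left((ts, row)) emits a record with event time ts, Right(ts) emits a watermark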
+    val data = Seq(
+      Left(2L, (12, 1, "a", 1)),
+      Left(1L, (11, 2, "b", 2)),
+      Left(3L, (10, 3, "c", 3)), //event time order breaks this match
+      Right(3L),
+      Left(4L, (8, 4, "a", 4)),
+      Left(4L, (9, 5, "b", 5)),
+      Left(5L, (7, 6, "c", 6)), //secondary order breaks this match
+      Right(5L),
+      Left(6L, (6, 8, "a", 7)),
+      Left(6L, (6, 7, "b", 8)),
+      Left(8L, (4, 9, "c", 9)), //ternary order breaks this match
+      Right(8L),
+      Left(9L, (3, 10, "a", 10)),
+      Left(10L, (2, 11, "b", 11)),
+      Left(11L, (1, 12, "c", 12)),
+      Right(11L)
+    )
+
+    val t = env.addSource(new EventTimeSourceFunction[(Int, Int, String, Int)](data))
+      .toTable(tEnv, 'secondaryOrder, 'ternaryOrder, 'name, 'id, 'tstamp.rowtime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery =
+      s"""
+         |SELECT T.aid, T.bid, T.cid
+         |FROM MyTable
+         |MATCH_RECOGNIZE (
+         |  ORDER BY tstamp, secondaryOrder DESC, ternaryOrder ASC
+         |  MEASURES
+         |    A.id AS aid,
+         |    B.id AS bid,
+         |    C.id AS cid
+         |  PATTERN (A B C)
+         |  DEFINE
+         |    A AS name = 'a',
+         |    B AS name = 'b',
+         |    C AS name = 'c'
+         |) AS T
+         |""".stripMargin
+
+    val table = tEnv.sqlQuery(sqlQuery)
+
+    val result = table.toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("10,11,12")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testMatchRecognizeAppliedToWindowedGrouping(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(String, Long, Int, Int)]
+    // first window
+    data.+=(("ACME", Time.seconds(1).toMilliseconds, 1, 1))
+    data.+=(("ACME", Time.seconds(2).toMilliseconds, 2, 2))
+    // second window
+    data.+=(("ACME", Time.seconds(4).toMilliseconds, 1, 4))
+    data.+=(("ACME", Time.seconds(5).toMilliseconds, 1, 3))
+    // third window
+    data.+=(("ACME", Time.seconds(7).toMilliseconds, 2, 3))
+    data.+=(("ACME", Time.seconds(8).toMilliseconds, 2, 3))
+
+    data.+=(("ACME1", Time.seconds(1).toMilliseconds, 20, 4))
+    data.+=(("ACME1", Time.seconds(1).toMilliseconds, 24, 4))
+    data.+=(("ACME1", Time.seconds(1).toMilliseconds, 25, 3))
+    data.+=(("ACME1", Time.seconds(1).toMilliseconds, 19, 8))
+
+    val t = env.fromCollection(data)
+      .assignAscendingTimestamps(e => e._2)
+      .toTable(tEnv, 'symbol, 'tstamp.rowtime, 'price, 'tax)
+    tEnv.registerTable("Ticker", t)
+
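+    // the pattern runs on top of a tumbling-window aggregation; TUMBLE_ROWTIME
+    // preserves the rowtime attribute so MATCH_RECOGNIZE can still ORDER BY it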
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM (
         |   SELECT
         |      symbol,
         |      SUM(price) AS price,
         |      TUMBLE_ROWTIME(tstamp, INTERVAL '3' SECOND) AS rowTime,
         |      TUMBLE_START(tstamp, INTERVAL '3' SECOND) AS startTime
         |   FROM Ticker
         |   GROUP BY symbol, TUMBLE(tstamp, INTERVAL '3' SECOND)
+         |)
+         |MATCH_RECOGNIZE (
+         |  PARTITION BY symbol
+         |  ORDER BY rowTime
+         |  MEASURES
         |    B.price AS dPrice,
         |    B.startTime AS dTime
+         |  ONE ROW PER MATCH
+         |  PATTERN (A B)
+         |  DEFINE
+         |    B AS B.price < A.price
+         |)
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List("ACME,2,1970-01-01 00:00:03.0")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testLogicalOffsets(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(String, Long, Int, Int)]
+    data.+=(("ACME", 1L, 19, 1))
+    data.+=(("ACME", 2L, 17, 2))
+    data.+=(("ACME", 3L, 13, 3))
+    data.+=(("ACME", 4L, 20, 4))
+    data.+=(("ACME", 5L, 20, 5))
+    data.+=(("ACME", 6L, 26, 6))
+    data.+=(("ACME", 7L, 20, 7))
+    data.+=(("ACME", 8L, 25, 8))
+
+    val t = env.fromCollection(data)
+      .toTable(tEnv, 'symbol, 'tstamp, 'price, 'tax, 'proctime.proctime)
+    tEnv.registerTable("Ticker", t)
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM Ticker
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    FIRST(DOWN.tstamp) AS start_tstamp,
+         |    LAST(DOWN.tstamp) AS bottom_tstamp,
+         |    UP.tstamp AS end_tstamp,
+         |    FIRST(DOWN.price + DOWN.tax + 1) AS bottom_total,
+         |    UP.price + UP.tax AS end_total
+         |  ONE ROW PER MATCH
+         |  AFTER MATCH SKIP PAST LAST ROW
+         |  PATTERN (DOWN{2,} UP)
+         |  DEFINE
+         |    DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,
+         |    UP AS price < FIRST(DOWN.price)
+         |) AS T
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
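+    // single match at tstamps 6,7,8: DOWN = (26, 20), UP = 25 < FIRST(DOWN.price) = 26;
+    // bottom_total = 26 + 6 + 1 = 33 and end_total = 25 + 8 = 33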
+    val expected = List("6,7,8,33,33")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testLogicalOffsetsWithStarVariable(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, String, Long, Int)]
+    data.+=((1, "ACME", 1L, 20))
+    data.+=((2, "ACME", 2L, 19))
+    data.+=((3, "ACME", 3L, 18))
+    data.+=((4, "ACME", 4L, 17))
+    data.+=((5, "ACME", 5L, 16))
+    data.+=((6, "ACME", 6L, 15))
+    data.+=((7, "ACME", 7L, 14))
+    data.+=((8, "ACME", 8L, 20))
+
+    val t = env.fromCollection(data)
+      .toTable(tEnv, 'id, 'symbol, 'tstamp, 'price, 'proctime.proctime)
+    tEnv.registerTable("Ticker", t)
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM Ticker
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    FIRST(id, 0) as id0,
+         |    FIRST(id, 1) as id1,
+         |    FIRST(id, 2) as id2,
+         |    FIRST(id, 3) as id3,
+         |    FIRST(id, 4) as id4,
+         |    FIRST(id, 5) as id5,
+         |    FIRST(id, 6) as id6,
+         |    FIRST(id, 7) as id7,
+         |    LAST(id, 0) as id8,
+         |    LAST(id, 1) as id9,
+         |    LAST(id, 2) as id10,
+         |    LAST(id, 3) as id11,
+         |    LAST(id, 4) as id12,
+         |    LAST(id, 5) as id13,
+         |    LAST(id, 6) as id14,
+         |    LAST(id, 7) as id15
+         |  ONE ROW PER MATCH
+         |  AFTER MATCH SKIP PAST LAST ROW
+         |  PATTERN (`DOWN"`{2,} UP)
+         |  DEFINE
+         |    `DOWN"` AS price < LAST(price, 1) OR LAST(price, 1) IS NULL,
+         |    UP AS price = FIRST(price) AND price > FIRST(price, 3) AND price = LAST(price, 7)
+         |) AS T
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
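+    // FIRST(id, n) counts forward from the first matched row and LAST(id, n)
+    // backward from the last, so the eight ids come out forwards then backwards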
+    val expected = List("1,2,3,4,5,6,7,8,8,7,6,5,4,3,2,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testLogicalOffsetOutsideOfRangeInMeasures(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(String, Long, Int, Int)]
+    data.+=(("ACME", 1L, 19, 1))
+    data.+=(("ACME", 2L, 17, 2))
+    data.+=(("ACME", 3L, 13, 3))
+    data.+=(("ACME", 4L, 20, 4))
+
+    val t = env.fromCollection(data)
+      .toTable(tEnv, 'symbol, 'tstamp, 'price, 'tax, 'proctime.proctime)
+    tEnv.registerTable("Ticker", t)
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM Ticker
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    FIRST(DOWN.price) as first,
+         |    LAST(DOWN.price) as last,
+         |    FIRST(DOWN.price, 5) as nullPrice
+         |  ONE ROW PER MATCH
+         |  AFTER MATCH SKIP PAST LAST ROW
+         |  PATTERN (DOWN{2,} UP)
+         |  DEFINE
+         |    DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,
+         |    UP AS price > LAST(DOWN.price)
+         |) AS T
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
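+    // only three DOWN rows (19, 17, 13) are matched, so the logical offset in
+    // FIRST(DOWN.price, 5) points outside the match and evaluates to NULL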
+    val expected = List("19,13,null")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testAccessingProctime(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, String)]
+    data.+=((1, "a"))
+
+    val t = env.fromCollection(data).toTable(tEnv, 'id, 'name, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery =
+      s"""
+         |SELECT T.aid
+         |FROM MyTable
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    A.id AS aid,
+         |    A.proctime AS aProctime,
+         |    LAST(A.proctime + INTERVAL '1' second) as calculatedField
+         |  PATTERN (A)
+         |  DEFINE
+         |    A AS proctime >= (CURRENT_TIMESTAMP - INTERVAL '1' day)
+         |) AS T
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List("1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+
+    // We do not assert the proctime in the result because it is currently
+    // derived from System.currentTimeMillis(), so there is no graceful way to assert it.
+  }
+
+  @Test
+  def testPartitioningByTimeIndicator(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, String)]
+    data.+=((1, "a"))
+
+    val t = env.fromCollection(data).toTable(tEnv, 'id, 'name, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+
+    val sqlQuery =
+      s"""
+         |SELECT T.aid
+         |FROM MyTable
         |MATCH_RECOGNIZE (
         |  PARTITION BY proctime
         |  ORDER BY proctime
+         |  MEASURES
+         |    A.id AS aid,
+         |    A.proctime AS aProctime,
+         |    LAST(A.proctime + INTERVAL '1' second) as calculatedField
+         |  PATTERN (A)
+         |  DEFINE
+         |    A AS proctime >= (CURRENT_TIMESTAMP - INTERVAL '1' day)
+         |) AS T
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List("1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+
+    // We do not assert the proctime in the result because it is currently
+    // derived from System.currentTimeMillis(), so there is no graceful way to assert it.
+  }
+}
+
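+// Converts a Timestamp to the epoch millis of its local wall-clock time by
+// adding the default time-zone offset.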
+class ToMillis extends ScalarFunction {
+  def eval(t: Timestamp): Long = {
+    t.toInstant.toEpochMilli + TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli)
+  }
+}

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services