You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/16 08:01:09 UTC

[GitHub] dianfu commented on a change in pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE

dianfu commented on a change in pull request #6815:  [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r225402418
 
 

 ##########
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
 ##########
 @@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.SqlMatchRecognize.AfterOption
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
+import org.apache.flink.cep.nfa.compiler.NFACompiler
+import org.apache.flink.cep.pattern.Pattern
+import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty
+import org.apache.flink.cep.pattern.conditions.BooleanConditions
+import org.apache.flink.cep.{CEP, EventComparator, PatternStream}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableConfig,
+  TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.MatchRecognize
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalMatch
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.runtime.`match`._
+import org.apache.flink.table.runtime.aggregate.SortUtil
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.runtime.{RowKeySelector, RowtimeProcessFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.MathUtils
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with LogicalMatch.
+  */
+class DataStreamMatch(
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  input: RelNode,
+  logicalMatch: FlinkLogicalMatch,
+  schema: RowSchema,
+  inputSchema: RowSchema)
+  extends SingleRel(cluster, traitSet, input)
+    with MatchRecognize
+    with DataStreamRel {
+
+  /** Returns the [[FlinkLogicalMatch]] this node was constructed with. */
+  private[flink] def getLogicalMatch: FlinkLogicalMatch = logicalMatch
+
+  /** The row type of this node is the relational type of the `schema` constructor argument. */
+  override def deriveRowType(): RelDataType = {
+    schema.relDataType
+  }
+
+  /**
+    * Creates a new [[DataStreamMatch]] that uses the given trait set and the first of the
+    * given inputs. The cluster, logical match and both schemas are carried over unchanged.
+    */
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    val newInput = inputs.get(0)
+    new DataStreamMatch(cluster, traitSet, newInput, logicalMatch, schema, inputSchema)
+  }
+
+  /** Delegates to `matchToString`, passing the logical match and the input schema. */
+  override def toString: String =
+    matchToString(logicalMatch, inputSchema, getExpressionString)
+
+  /**
+    * Lets the parent class write its terms first and then hands the resulting writer to
+    * `explainMatch` together with the logical match and the input schema.
+    */
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val parentTerms = super.explainTerms(pw)
+    explainMatch(parentTerms, logicalMatch, inputSchema, getExpressionString)
+  }
+
+  /**
+    * Validates the ORDER BY keys and prepares the input stream for ordered processing.
+    *
+    * The checks are applied in this order, each failing with a [[TableException]]:
+    *  - at least one order key must be given,
+    *  - the first order key must be a time indicator (rowtime or proctime),
+    *  - the first order key must be sorted ascending.
+    *
+    * @param tableEnv  environment whose execution config is used to build the row comparator
+    * @param crowInput the translated input stream
+    * @param orderKeys the collation taken from the ORDER BY clause
+    * @return a pair of the stream to continue with and an optional comparator built from
+    *         all order keys after the first one (`None` if there is only one order key)
+    */
+  private def translateOrder(
+    tableEnv: StreamTableEnvironment,
+    crowInput: DataStream[CRow],
+    orderKeys: RelCollation) = {
+
+    val collations = orderKeys.getFieldCollations
+    if (collations.isEmpty) {
+      throw new TableException("You must specify either rowtime or proctime for order by.")
+    }
+
+    // The time attribute has to be the first of the order fields, so only the first sort
+    // field is inspected here.
+    val primaryField = SortUtil.getFirstSortField(orderKeys, inputSchema.relDataType)
+    val primaryType = primaryField.getType
+    if (!FlinkTypeFactory.isTimeIndicatorType(primaryType)) {
+      throw new TableException(
+        "You must specify either rowtime or proctime for order by as the first one.")
+    }
+
+    // the time attribute must be sorted in ascending order
+    val primaryDirection = SortUtil.getFirstSortDirection(orderKeys)
+    if (primaryDirection != Direction.ASCENDING) {
+      throw new TableException("Primary sort order of a streaming table must be ascending on time.")
+    }
+
+    // every order key after the time attribute contributes to the secondary row comparator
+    val secondaryComparator =
+      if (collations.size() > 1) {
+        Some(SortUtil.createRowComparator(
+          inputSchema.relDataType,
+          collations.asScala.tail,
+          tableEnv.execEnv.getConfig))
+      } else {
+        None
+      }
+
+    // rowtime input is routed through a RowtimeProcessFunction that keeps the input
+    // parallelism; any other time indicator passes the input through untouched
+    val orderedInput =
+      if (FlinkTypeFactory.isRowtimeIndicatorType(primaryType)) {
+        val rowtimeFunction =
+          new RowtimeProcessFunction(primaryField.getIndex, CRowTypeInfo(inputSchema.typeInfo))
+        crowInput.process(rowtimeFunction).setParallelism(crowInput.getParallelism)
+      } else {
+        crowInput
+      }
+
+    (orderedInput, secondaryComparator)
+  }
+
+  /**
+    * Keys the given stream by the PARTITION BY expressions, if there are any.
+    *
+    * Each partition key must be a plain input field reference ([[RexInputRef]]); its field
+    * index is used to build the [[RowKeySelector]]. Any other kind of expression is rejected
+    * with a [[TableException]] instead of surfacing as a bare `scala.MatchError`.
+    *
+    * @param partitionKeys the PARTITION BY expressions, possibly empty
+    * @param inputDs       the stream to partition
+    * @return the stream keyed by the partition fields, or `inputDs` itself if no keys are given
+    */
+  private def applyPartitioning(partitionKeys: util.List[RexNode], inputDs: DataStream[Row]) = {
+    if (!partitionKeys.isEmpty) {
+      val keys = partitionKeys.asScala.map {
+        case ref: RexInputRef => ref.getIndex
+        case other =>
+          throw new TableException(
+            s"Only input field references are supported as partition keys, but found: $other")
+      }.toArray
+      val keySelector = new RowKeySelector(keys, inputSchema.projectedTypeInfo(keys))
+      inputDs.keyBy(keySelector)
+    } else {
+      inputDs
+    }
+  }
+
+  override def translateToPlan(
+    tableEnv: StreamTableEnvironment,
+    queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+    val config = tableEnv.config
+    val inputTypeInfo = inputSchema.typeInfo
+
+    val crowInput: DataStream[CRow] = getInput
+      .asInstanceOf[DataStreamRel]
+      .translateToPlan(tableEnv, queryConfig)
+
+    val orderKeys = logicalMatch.getOrderKeys.getFieldCollations
+    val (timestampedInput, rowComparator) = translateOrder(tableEnv,
+      crowInput,
+      logicalMatch.getOrderKeys)
+
+    val cepPattern = logicalMatch.getPattern
+      .accept(new PatternVisitor(config, inputTypeInfo, logicalMatch))
+
+    //TODO remove this once it is supported in CEP library
+    if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
+      throw new TableException(
+        "Patterns that can produce empty matches are not supported. There must be at least one " +
+          "non-optional state.")
+    }
+
+    //TODO remove this once it is supported in CEP library
+    if (cepPattern.getQuantifier.hasProperty(QuantifierProperty.GREEDY)) {
+      throw new TableException(
+        "Greedy quantifiers are not allowed as the last element of a Pattern yet. Finish your " +
+          "pattern with either a simple variable or reluctant quantifier.")
+    }
+
+    if (logicalMatch.getInterval != null) {
+      throw new TableException(
+        "WITHIN clause is not part of the SQL Standard, thus it is not supported.")
 
 Review comment:
   I think we should discuss whether to support the WITHIN clause. IMO, we should support it, as many use cases require a time constraint. Without it, the supported use cases will be very limited.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services