Posted to issues@flink.apache.org by rtudoran <gi...@git.apache.org> on 2017/04/07 13:50:57 UTC

[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

GitHub user rtudoran opened a pull request:

    https://github.com/apache/flink/pull/3700

    Backbone implementation for supporting sort. Implementation includes

    util functions, sort aggregations (without retraction), the sort rule, and
    sort case identification
    
    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/huawei-flink/flink FLINK-6081

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3700.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3700
    
----

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110476516
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.`type`._
    +import org.apache.calcite.rel.logical.LogicalSort
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.functions.AggregateFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.table.api.TableException
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.flink.table.functions.Accumulator
    +import java.util.{ List => JList, ArrayList }
    +import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, TypeInformation }
    +import org.apache.flink.table.functions.aggfunctions.RowSortAggFunction
    +import java.sql.Timestamp
    +import org.apache.calcite.rel.RelFieldCollation
    +import org.apache.calcite.rel.RelFieldCollation.Direction
    +
    +
    +/**
    + * Collection of helper methods for building the sort logic.
    + * It also encapsulates the ordering implementations and their generic interfaces.
    + */
    +
    +object SortUtil {
    +
    +  /**
    +   * Creates a [[org.apache.flink.streaming.api.functions.ProcessFunction]] for sorting
    +   * elements based on proctime and potentially other fields.
    +   * @param calcSort the logical sort object
    +   * @param inputType the input row type
    +   * @return a [[org.apache.flink.streaming.api.functions.ProcessFunction]] implementing the sort
    +   */
    +  private[flink] def createProcTimeSortFunction(
    +    calcSort: LogicalSort,
    +    inputType: RelDataType): ProcessFunction[Row, Row] = {
    +
    +    val keySortFields = getSortFieldIndexList(calcSort)
    +    val keySortDirections = getSortFieldDirectionList(calcSort)
    +    val sortAggregates = createSortAggregation(inputType, keySortFields, keySortDirections, false)
    +
    +    val aggType = createSingleAccumulatorRowType(sortAggregates)
    +
    +    new ProcTimeUnboundedSortProcessFunction(
    +      sortAggregates,
    +      inputType.getFieldCount,
    +      aggType)
    +
    +  }
    +
    +  
    +  /**
    +   * Creates a sorting aggregation object for elements ordered on proctime
    +   * and potentially other fields.
    +   * @param inputType the input row type
    +   * @param keyIndex the indexes of the fields on which the sorting is done;
    +   * the first is expected to be the time field
    +   * @param keySortDirections the sort direction for each field
    +   * @param retraction whether retraction is needed
    +   * @return a [[MultiOutputAggregateFunction]] implementing the sort aggregation
    +   */
    +  private def createSortAggregation(
    +    inputType: RelDataType,
    +    keyIndex: Array[Int],
    +    keySortDirections: Array[Direction],
    +    retraction: Boolean): MultiOutputAggregateFunction[_] = {
    +
    +    val orderings = createOrderingComparison(inputType, keyIndex)
    +
    +    val sortKeyType = toKeySortInternalRowTypeInfo(inputType, keyIndex).asInstanceOf[RowTypeInfo]
    +
    +    // get the output types
    +    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
    +
    +    val sortAggFunc = new RowSortAggFunction(keyIndex,
    +        keySortDirections, orderings, rowTypeInfo, sortKeyType)
    +
    +    sortAggFunc
    +
    +  }
    +  
    +  /**
    +   * Creates type-based comparison objects.
    +   * @param inputType the input row type
    +   * @param keyIndex the indexes of the fields on which the sorting is done;
    +   * the first is expected to be the time field
    +   * @return an array of ordering objects
    +   */
    +  
    +  def createOrderingComparison(inputType: RelDataType,
    +    keyIndex: Array[Int]): Array[UntypedOrdering] = {
    +
    +    var i = 0
    +    val orderings = new Array[UntypedOrdering](keyIndex.size)
    +
    +    while (i < keyIndex.size) {
    +      val sqlTypeName = inputType.getFieldList.get(keyIndex(i)).getType.getSqlTypeName
    +
    +      orderings(i) = sqlTypeName match {
    +        case TINYINT =>
    +          new ByteOrdering()
    +        case SMALLINT =>
    +          new SmallOrdering()
    +        case INTEGER =>
    +          new IntOrdering()
    +        case BIGINT =>
    +          new LongOrdering()
    +        case FLOAT =>
    +          new FloatOrdering()
    +        case DOUBLE =>
    +          new DoubleOrdering()
    +        case DECIMAL =>
    +          new DecimalOrdering()
    +        case VARCHAR | CHAR =>
    +          new StringOrdering()
    +        // should be updated based on the time types once these are merged into the master branch
    +        case TIMESTAMP =>
    +          new TimestampOrdering()
    +        case sqlType: SqlTypeName =>
    +          throw new TableException("Sort aggregate does no support type:" + sqlType)
    +      }
    +      i += 1
    +    }
    +    orderings
    +  }
    +  
    +  /**
    +   * Creates the accumulator row type for the sort aggregation.
    +   * @param sortAggregate the sort aggregation function
    +   * @return the [[RowTypeInfo]] of the single accumulator
    +   */
    +  private def createSingleAccumulatorRowType(
    +      sortAggregate: MultiOutputAggregateFunction[_]): RowTypeInfo = {
    +    
    +    val accType = sortAggregate.getAccumulatorType
    +    new RowTypeInfo(accType)
    +  }
    +
    +  /**
    +   * Extracts and converts a Calcite logical record into Flink type information,
    +   * selecting only a subset of the fields.
    +   */
    +  def toKeySortInternalRowTypeInfo(logicalRowType: RelDataType,
    +    keyIndexes: Array[Int]): TypeInformation[Row] = {
    +    var i = 0
    +    val fieldList = logicalRowType.getFieldList
    +    val logicalFieldTypes = new Array[TypeInformation[_]](keyIndexes.size)
    +    val logicalFieldNames = new Array[String](keyIndexes.size)
    +
    +    while (i < keyIndexes.size) {
    +      logicalFieldTypes(i) = (FlinkTypeFactory.toTypeInfo(
    +        logicalRowType.getFieldList.get(i).getType))
    +      logicalFieldNames(i) = (logicalRowType.getFieldNames.get(i))
    +      i += 1
    +    }
    +
    +    new RowTypeInfo(logicalFieldTypes, logicalFieldNames)
    +
    +  }
    +
    +  /**
    +   * Returns the array of indexes of the fields on which the sort is done.
    +   * @param calcSort the LogicalSort object
    +   * @return the array of sort field indexes
    +   */
    +  def getSortFieldIndexList(calcSort: LogicalSort): Array[Int] = {
    +    val keyFields = calcSort.collation.getFieldCollations
    +    var i = 0
    +    val keySort = new Array[Int](keyFields.size())
    +    while (i < keyFields.size()) {
    +      keySort(i) = keyFields.get(i).getFieldIndex
    +      i += 1
    +    }
    +    keySort
    +  }
    +  
    +  /**
    +   * Returns the array of sort directions for the sort fields.
    +   * @param calcSort the LogicalSort object
    +   * @return the array of sort directions
    +   */
    +  def getSortFieldDirectionList(calcSort: LogicalSort): Array[Direction] = {
    +    val keyFields = calcSort.collation.getFieldCollations
    +    var i = 0
    +    val keySortDirection = new Array[Direction](keyFields.size())
    +    while (i < keyFields.size()) {
    +      keySortDirection(i) = getDirection(calcSort,i)
    +      i += 1
    +    }
    +    keySortDirection
    +  }
    +
    +  /**
    +   * Returns the time type in the order clause. If a time attribute exists, it is
    +   * expected to be the primary sort field.
    +   * @param calcSort the LogicalSort object
    +   * @param rowType the data type of the input
    +   * @return the RelDataType of the time field
    +   */
    +  def getTimeType(calcSort: LogicalSort, rowType: RelDataType): RelDataType = {
    +
    +    // need to identify the time attribute among the other order fields
    +    val ind = calcSort.getCollationList.get(0).getFieldCollations.get(0).getFieldIndex
    +    rowType.getFieldList.get(ind).getValue
    +  }
    +
    +  /**
    +   * Returns the sort direction of the time field in the order clause.
    +   * @param calcSort the LogicalSort object
    +   * @return the Direction of the time field
    +   */
    +  def getTimeDirection(calcSort: LogicalSort): Direction = {
    +    calcSort.getCollationList.get(0).getFieldCollations.get(0).direction
    +  }
    +  
    +  /**
    +   * Returns the sort direction of a given field in the order clause.
    +   * @param calcSort the LogicalSort object
    +   * @param sortField the index of the sort field in the collation
    +   * @return the Direction of the field
    +   */
    +  def getDirection(calcSort: LogicalSort, sortField: Int): Direction = {
    +    
    +    calcSort.getCollationList.get(0).getFieldCollations.get(sortField).direction match {
    +      case Direction.ASCENDING => Direction.ASCENDING
    +      case Direction.DESCENDING => Direction.DESCENDING
    +      case _ =>  throw new TableException("SQL/Table does not support such sorting")
    +    }
    +    
    +  }
    +  
    +}
    +
    +
    +/**
    + * Untyped interface defining a comparison method that can be overridden by typed
    + * implementations. Each typed implementation casts the generic arguments to the
    + * type of the implicit ordering it uses.
    + */
    +
    +trait UntypedOrdering extends Serializable {
    +  def compare(x: Any, y: Any): Int
    +
    +}
    +
    +class LongOrdering(implicit ord: Ordering[Long]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xL = x.asInstanceOf[Long]
    +    val yL = y.asInstanceOf[Long]
    +    ord.compare(xL, yL)
    +  }
    +}
    +
    +class IntOrdering(implicit ord: Ordering[Int]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xI = x.asInstanceOf[Int]
    +    val yI = y.asInstanceOf[Int]
    +    ord.compare(xI, yI)
    +  }
    +}
    +
    +class FloatOrdering(implicit ord: Ordering[Float]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xF = x.asInstanceOf[Float]
    +    val yF = y.asInstanceOf[Float]
    +    ord.compare(xF, yF)
    +  }
    +}
    +
    +class DoubleOrdering(implicit ord: Ordering[Double]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xD = x.asInstanceOf[Double]
    +    val yD = y.asInstanceOf[Double]
    +    ord.compare(xD, yD)
    +  }
    +}
    +
    +class DecimalOrdering(implicit ord: Ordering[BigDecimal]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xBD = x.asInstanceOf[BigDecimal]
    +    val yBD = y.asInstanceOf[BigDecimal]
    +    ord.compare(xBD, yBD)
    +  }
    +}
    +
    +class ByteOrdering(implicit ord: Ordering[Byte]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xB = x.asInstanceOf[Byte]
    +    val yB = y.asInstanceOf[Byte]
    +    ord.compare(xB, yB)
    +  }
    +}
    +
    +class SmallOrdering(implicit ord: Ordering[Short]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xS = x.asInstanceOf[Short]
    +    val yS = y.asInstanceOf[Short]
    +    ord.compare(xS, yS)
    +  }
    +}
    +
    +class StringOrdering(implicit ord: Ordering[String]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xS = x.asInstanceOf[String]
    +    val yS = y.asInstanceOf[String]
    +    ord.compare(xS, yS)
    +  }
    +}
    +
    +/**
    + * Ordering object for timestamps. As there is no implicit Ordering for [[java.sql.Timestamp]],
    + * we compare based on the Long value of the timestamp.
    + */
    +class TimestampOrdering(implicit ord: Ordering[Long]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xTs = x.asInstanceOf[Timestamp]
    +    val yTs = y.asInstanceOf[Timestamp]
    +    ord.compare(xTs.getTime, yTs.getTime)
    +  }
    +}
    +
    +
    +
    +/**
    + * Called every time a multi-output aggregation result should be materialized.
    + * The returned values could be either an early and incomplete result
    + * (periodically emitted as data arrive) or the final result of the
    + * aggregation.
    + *
    + * @param accumulator the accumulator which contains the current
    + *                    aggregated results
    + * @return the aggregation result
    + */
    +abstract class MultiOutputAggregateFunction[T] extends AggregateFunction[T] {
    --- End diff --
    
    An AggregateFunction should only return a single value.
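
    To make the contract mismatch concrete, here is a toy sketch (simplified stand-ins, not the actual Flink interfaces): a regular aggregate materializes exactly one value per accumulator, while the sort use case needs to emit a list of values.

        // Toy traits for illustration only; the real AggregateFunction also
        // defines createAccumulator(), accumulate(), etc.
        trait ToyAggregateFunction[T] {
          // contract: exactly one result per accumulator
          def getValue(accumulator: Any): T
        }
        trait ToyMultiOutputAggregateFunction[T] extends ToyAggregateFunction[T] {
          // a sorted run of rows is many results, which breaks the contract above
          def getValues(accumulator: Any): java.util.List[T]
        }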



[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110479361
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala ---
    @@ -0,0 +1,167 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
    +import org.apache.flink.api.java.tuple.Tuple
    +import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
    +import org.apache.flink.streaming.api.windowing.assigners._
    +import org.apache.flink.streaming.api.windowing.time.Time
    +import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
    +import org.apache.flink.table.api.StreamTableEnvironment
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.plan.logical._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil._
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
    +import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.logical.LogicalSort
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
    +import org.apache.flink.table.api.TableException
    +import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
    +import org.apache.flink.api.common.functions.RichMapFunction
    +import org.apache.flink.api.common.functions.RichFlatMapFunction
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.table.functions.ProcTimeType
    +import org.apache.flink.table.functions.RowTimeType
    +import org.apache.calcite.rel.core.Sort
    +import org.apache.flink.api.java.functions.NullByteKeySelector
    +import org.apache.calcite.rel.RelFieldCollation.Direction
    +
    +
    +/**
    +  * Flink RelNode which is matched along with the sort rule.
    +  *
    +  */
    +class DataStreamSort(
    +  calc: LogicalSort,
    +  cluster: RelOptCluster,
    +  traitSet: RelTraitSet,
    +  inputNode: RelNode,
    +  rowRelDataType: RelDataType,
    +  inputType: RelDataType,
    +  description: String)
    +    extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataStreamSort(
    +      calc,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      rowRelDataType,
    +      inputType,
    +      description + calc.getId())
    +  }
    +
    +  override def toString: String = {
    +    s"Sort($calc)"+
    +      s"on fields: (${calc.collation.getFieldCollations})"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw)
    +      .item("aggregate", calc)
    +      .item("sort fields",calc.collation.getFieldCollations)
    +      .itemIf("offset", calc.offset, calc.offset!=null)
    +      .itemIf("fetch", calc.fetch, calc.fetch!=null)
    +      .item("input", inputNode)
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +    
    +    // need to identify the time attribute among the other order fields;
    +    // time needs to be the first sort element
    +    val timeType = SortUtil.getTimeType(calc, inputType)
    +
    +    // time ordering needs to be ascending
    +    if (SortUtil.getTimeDirection(calc) != Direction.ASCENDING) {
    +      throw new TableException("SQL/Table supports only ascending time ordering")
    +    }
    +
    +    val (offset, fetch) = (calc.offset, calc.fetch)
    +
    +    // extensible to other types of aggregates that will not be implemented in a window
    +    timeType match {
    +      case _: ProcTimeType =>
    +        (offset, fetch) match {
    +          case (o: Any, f: Any) => null           // offset and fetch need retraction
    +          case (_, f: Any) => null                // fetch needs retraction
    +          case (o: Any, _) => null                // offset needs retraction
    +          case _ => createSortProcTime(inputDS)   // sort can be done with/without retraction
    +        }
    +      case _: RowTimeType =>
    +        throw new TableException("SQL/Table does not support sort on row time")
    +      case _ =>
    +        throw new TableException("SQL/Table needs to have sort on time as the first sort element")
    +    }
    +    
    +  }
    +
    +  /**
    +   * Create Sort logic based on processing time
    +   */
    +  def createSortProcTime(
    +    inputDS: DataStream[Row]): DataStream[Row] = {
    +
    +    /*
    +     * [TODO] consider the partition case - in this implementation, even if the stream is
    +     * pre-partitioned, all elements will be reshuffled and sorted
    +     */
    +
    +    // get the output types. Sort does not project, hence it also outputs the
    +    // ordering proctime field. [TODO] Do we need to drop the ordering fields?
    +    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
    +
    +    val processFunction = SortUtil.createProcTimeSortFunction(calc, inputType)
    +
    +    val result: DataStream[Row] = inputDS
    +      .keyBy(new NullByteKeySelector[Row])
    --- End diff --
    
    I think in order to sort the stream we have to bring it to a single task. Whether we use a `NullByteKeySelector` or an operator with parallelism 1 doesn't make a big difference performance-wise.
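
    For reference, a rough sketch of the two options (assuming `inputDS: DataStream[Row]` and `processFunction` as defined in this PR; note that keyed state such as `ValueState` is only available with the `keyBy` variant):

        // option 1: route all records to a single key group, as the PR does
        val sortedViaKey: DataStream[Row] = inputDS
          .keyBy(new NullByteKeySelector[Row])
          .process(processFunction)

        // option 2: a single-task operator (assuming this Flink version allows a
        // ProcessFunction on a non-keyed stream; otherwise the same logic can be
        // wrapped in a RichFlatMapFunction)
        val sortedViaParallelism: DataStream[Row] = inputDS
          .process(processFunction)
          .setParallelism(1)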



[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110481685
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala ---
    @@ -0,0 +1,167 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
    +import org.apache.flink.api.java.tuple.Tuple
    +import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
    +import org.apache.flink.streaming.api.windowing.assigners._
    +import org.apache.flink.streaming.api.windowing.time.Time
    +import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
    +import org.apache.flink.table.api.StreamTableEnvironment
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.plan.logical._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil._
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
    +import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.logical.LogicalSort
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
    +import org.apache.flink.table.api.TableException
    +import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
    +import org.apache.flink.api.common.functions.RichMapFunction
    +import org.apache.flink.api.common.functions.RichFlatMapFunction
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.table.functions.ProcTimeType
    +import org.apache.flink.table.functions.RowTimeType
    +import org.apache.calcite.rel.core.Sort
    +import org.apache.flink.api.java.functions.NullByteKeySelector
    +import org.apache.calcite.rel.RelFieldCollation.Direction
    +
    +
    +/**
    +  * Flink RelNode which is matched along with the sort rule.
    +  *
    +  */
    +class DataStreamSort(
    +  calc: LogicalSort,
    +  cluster: RelOptCluster,
    +  traitSet: RelTraitSet,
    +  inputNode: RelNode,
    +  rowRelDataType: RelDataType,
    +  inputType: RelDataType,
    +  description: String)
    +    extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataStreamSort(
    +      calc,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      rowRelDataType,
    +      inputType,
    +      description + calc.getId())
    +  }
    +
    +  override def toString: String = {
    +    s"Sort($calc)"+
    +      s"on fields: (${calc.collation.getFieldCollations})"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw)
    +      .item("aggregate", calc)
    +      .item("sort fields",calc.collation.getFieldCollations)
    +      .itemIf("offset", calc.offset, calc.offset!=null)
    +      .itemIf("fetch", calc.fetch, calc.fetch!=null)
    +      .item("input", inputNode)
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +    
    +    // need to identify the time attribute among the other order fields;
    +    // time needs to be the first sort element
    +    val timeType = SortUtil.getTimeType(calc, inputType)
    +
    +    // time ordering needs to be ascending
    +    if (SortUtil.getTimeDirection(calc) != Direction.ASCENDING) {
    +      throw new TableException("SQL/Table supports only ascending time ordering")
    +    }
    +
    +    val (offset, fetch) = (calc.offset, calc.fetch)
    +
    +    // extensible to other types of aggregates that will not be implemented in a window
    +    timeType match {
    +      case _: ProcTimeType =>
    +        (offset, fetch) match {
    +          case (o: Any, f: Any) => null           // offset and fetch need retraction
    +          case (_, f: Any) => null                // fetch needs retraction
    +          case (o: Any, _) => null                // offset needs retraction
    +          case _ => createSortProcTime(inputDS)   // sort can be done with/without retraction
    --- End diff --
    
    If there is no secondary field, we can use a simple forwarding MapFunction with parallelism 1.
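
    A minimal sketch of that shortcut (assuming `inputDS: DataStream[Row]` as in the surrounding code): when proctime alone determines the order, a single-task identity map preserves the arrival order.

        import org.apache.flink.api.common.functions.MapFunction

        // identity forwarding; parallelism 1 keeps the proc-time arrival order
        val forwarded: DataStream[Row] = inputDS
          .map(new MapFunction[Row, Row] {
            override def map(row: Row): Row = row
          })
          .setParallelism(1)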



[GitHub] flink issue #3700: Backbone implementation for supporting sort. Implementati...

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/3700
  
    @fhueske This PR can be closed. I have addressed all remarks and reimplemented everything in #3714



[GitHub] flink issue #3700: Backbone implementation for supporting sort. Implementati...

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/3700
  
    @fhueske  I will close the PR



[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r111073880
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedSortProcessFunction.scala ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{ Collector, Preconditions }
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import scala.util.control.Breaks._
    +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{ ArrayList, LinkedList, List => JList }
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.runtime.aggregate.MultiOutputAggregateFunction
    +
    +/**
    + * ProcessFunction used to sort elements on proctime in unbounded
    + * [[org.apache.flink.streaming.api.datastream.DataStream]]s
    + *
    + * @param aggregates the [[org.apache.flink.table.functions.aggfunctions.SortAggFunction]]
    + *                   used for this sort aggregation
    + * @param fieldCount the number of fields in the current element to forward
    + * @param aggType the row type of the aggregate accumulator
    + */
    +class ProcTimeUnboundedSortProcessFunction(
    +  private val aggregates: MultiOutputAggregateFunction[_],
    --- End diff --
    
    @fhueske regarding the type comparators... they will still need to rely on the previous implementation that casts the data to specific types. Otherwise we cannot cast directly from the object we get when calling `row.getField` to the specific type needed by a `TypeComparator`.
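
    A small self-contained example of the casting issue (toy values): `Row.getField` returns a plain `Object`/`AnyRef`, so an explicit cast is unavoidable before any typed comparison.

        import org.apache.flink.types.Row

        val row = new Row(1)
        row.setField(0, 1)
        val field: AnyRef = row.getField(0)           // statically just an Object
        val typed = field.asInstanceOf[Int]           // explicit cast is required
        val result = Ordering.Int.compare(typed, 42)  // before a typed compare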



[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110477811
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala ---
    @@ -0,0 +1,167 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
    +import org.apache.flink.api.java.tuple.Tuple
    +import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
    +import org.apache.flink.streaming.api.windowing.assigners._
    +import org.apache.flink.streaming.api.windowing.time.Time
    +import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
    +import org.apache.flink.table.api.StreamTableEnvironment
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.plan.logical._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil._
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
    +import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.logical.LogicalSort
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
    +import org.apache.flink.table.api.TableException
    +import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
    +import org.apache.flink.api.common.functions.RichMapFunction
    +import org.apache.flink.api.common.functions.RichFlatMapFunction
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.table.functions.ProcTimeType
    +import org.apache.flink.table.functions.RowTimeType
    +import org.apache.calcite.rel.core.Sort
    +import org.apache.flink.api.java.functions.NullByteKeySelector
    +import org.apache.calcite.rel.RelFieldCollation.Direction
    +
    +
    +/**
    +  * Flink RelNode which is matched along with the sort rule.
    +  *
    +  */
    +class DataStreamSort(
    +  calc: LogicalSort,
    --- End diff --
    
    `calc` -> `sort`?



[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110901541
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala ---
    @@ -0,0 +1,167 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
    +import org.apache.flink.api.java.tuple.Tuple
    +import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
    +import org.apache.flink.streaming.api.windowing.assigners._
    +import org.apache.flink.streaming.api.windowing.time.Time
    +import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
    +import org.apache.flink.table.api.StreamTableEnvironment
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.plan.logical._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil._
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
    +import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.logical.LogicalSort
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
    +import org.apache.flink.table.api.TableException
    +import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
    +import org.apache.flink.api.common.functions.RichMapFunction
    +import org.apache.flink.api.common.functions.RichFlatMapFunction
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.table.functions.ProcTimeType
    +import org.apache.flink.table.functions.RowTimeType
    +import org.apache.calcite.rel.core.Sort
    +import org.apache.flink.api.java.functions.NullByteKeySelector
    +import org.apache.calcite.rel.RelFieldCollation.Direction
    +
    +
    +/**
    +  * Flink RelNode which is matched along with the sort rule.
    +  *
    +  */
    +class DataStreamSort(
    +  calc: LogicalSort,
    +  cluster: RelOptCluster,
    +  traitSet: RelTraitSet,
    +  inputNode: RelNode,
    +  rowRelDataType: RelDataType,
    +  inputType: RelDataType,
    +  description: String)
    +    extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataStreamSort(
    +      calc,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      rowRelDataType,
    +      inputType,
    +      description + calc.getId())
    +  }
    +
    +  override def toString: String = {
    +    s"Sort($calc)"+
    +      s"on fields: (${calc.collation.getFieldCollations})"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw)
    +      .item("aggregate", calc)
    +      .item("sort fields",calc.collation.getFieldCollations)
    +      .itemIf("offset", calc.offset, calc.offset!=null)
    +      .itemIf("fetch", calc.fetch, calc.fetch!=null)
    +      .item("input", inputNode)
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +    
    +    // need to identify the time attribute among the other order fields;
    +    // time needs to be the first sort element
    +    val timeType = SortUtil.getTimeType(calc, inputType)
    +
    +    // time ordering needs to be ascending
    +    if (SortUtil.getTimeDirection(calc) != Direction.ASCENDING) {
    +      throw new TableException("SQL/Table supports only ascending time ordering")
    +    }
    +
    +    val (offset, fetch) = (calc.offset, calc.fetch)
    +
    +    // extensible to other types of aggregates that will not be implemented in a window
    +    timeType match {
    +      case _: ProcTimeType =>
    +        (offset, fetch) match {
    +          case (o: Any, f: Any) => null           // offset and fetch need retraction
    +          case (_, f: Any) => null                // fetch needs retraction
    +          case (o: Any, _) => null                // offset needs retraction
    +          case _ => createSortProcTime(inputDS)   // sort can be done with/without retraction
    --- End diff --
    
    You mean specifically for ordering on processing time only? Indeed, I will check this case. Anyway, for the first commit I want to target the case where there is actually some sorting, just so the ProcessFunction implementation is available as a model.



[GitHub] flink issue #3700: Backbone implementation for supporting sort. Implementati...

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/3700
  
    @fhueske @shijinkui @hongyuhong @stefanobortoli @sunjincheng121 @twalthr 
    
    I have created a first implementation for supporting logical sort. As per the discussion on the JIRA issue, the intention is to merge into master the support for sorting based on proctime and event time (potentially with additional fields) in append-only mode. The other cases (with offset, fetch, ..) will be merged into the retraction branch. The backbone for the two is the same, and this implementation contains it.
    Please review it and let me know if you have comments, mainly about the structure and the approach.
    For reference, the key point is to support this via aggregates. Therefore we have a sort aggregate in which we accumulate rows that are internally maintained in a sorted list, ordered by the keys (see the sketch below). This requires a single extension for retrieving the results, in the form of a getValues instead of getValue.
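
    A minimal sketch of the accumulate step described above (toy helper, not the actual PR code): the buffer is kept sorted on insert via binary search.

        import java.util.{ArrayList, Collections, Comparator}

        // inserts elem so that buffer stays sorted according to cmp
        def insertSorted[T](buffer: ArrayList[T], elem: T, cmp: Comparator[T]): Unit = {
          val pos = Collections.binarySearch(buffer, elem, cmp)
          // binarySearch returns -(insertionPoint) - 1 when the key is absent
          val insertAt = if (pos >= 0) pos + 1 else -(pos + 1)
          buffer.add(insertAt, elem)
        }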
    
    A second remark is about the usage of the ProcessFunction (with the advantage that values are accumulated/sorted as they come), which by default requires partitioning. However, if the stream is pre-partitioned beforehand, this would reshuffle all elements. If this case is possible, I will drop the partitionBy before the function (I hope this does not break the usage of ValueState); I will add a specific comment inline.
    
    For the tests, please ignore them: instead of integration tests I will use the harness framework.



[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110481364
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedSortProcessFunction.scala ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{ Collector, Preconditions }
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import scala.util.control.Breaks._
    +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{ ArrayList, LinkedList, List => JList }
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.runtime.aggregate.MultiOutputAggregateFunction
    +
    +/**
    + * ProcessFunction used to sort elements on proctime in unbounded
    + * [[org.apache.flink.streaming.api.datastream.DataStream]]s
    + *
    + * @param aggregates the [[org.apache.flink.table.functions.aggfunctions.SortAggFunction]]
    + *                   used for this sort aggregation
    + * @param fieldCount the number of fields in the current element to forward
    + * @param aggType the row type of the aggregate accumulator
    + */
    +class ProcTimeUnboundedSortProcessFunction(
    +  private val aggregates: MultiOutputAggregateFunction[_],
    --- End diff --
    
    I don't think the sorting should be done with an aggregation function. An aggregation function returns a single value and is IMO not the right tool to sort data. 
    
    I would make the design similar to the one you have, but instead of using an accumulator, you can use a `ListState` to append new records. When the timer fires, you collect all data for the millisecond, and sort with Java's regular collections sort with a `Comparator<Row>` that wraps `TypeComparators` for the secondary sort fields.
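
    A rough sketch of this suggestion (names and wiring hypothetical; a production version would wrap Flink `TypeComparator`s instead of relying on `Comparable`):

        import java.util.{Collections, Comparator, List => JList}
        import org.apache.flink.types.Row

        // compares rows on the secondary sort fields, in order
        class RowComparator(fields: Array[Int]) extends Comparator[Row] with Serializable {
          override def compare(a: Row, b: Row): Int = {
            var i = 0
            while (i < fields.length) {
              val left = a.getField(fields(i)).asInstanceOf[Comparable[Any]]
              val c = left.compareTo(b.getField(fields(i)))
              if (c != 0) return c
              i += 1
            }
            0
          }
        }

        // inside onTimer(): rows buffered in ListState for the firing millisecond
        // val rows: JList[Row] = ...
        // Collections.sort(rows, new RowComparator(secondarySortFields))
        // then emit the rows in their new order via out.collect(...)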



[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110900641
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SortAggFunction.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.math.BigDecimal
    +import java.util.{ArrayList, List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import scala.collection.mutable.ArrayBuffer
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.runtime.aggregate.UntypedOrdering
    +import org.apache.flink.table.runtime.aggregate.MultiOutputAggregateFunction
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import org.apache.calcite.rel.RelFieldCollation.Direction
    +
    +/** The initial accumulator for Sort aggregate function */
    +class SortAccumulator extends ArrayBuffer[JTuple2[Row, Row]] with Accumulator with Serializable
    +
    +/**
    +  * Base class for the built-in Sort aggregate function
    +  *
    +  * @tparam K the type of the sort key
    +  * @tparam T the type of the aggregation result
    +  */
    +abstract class SortAggFunction[K,T](
    --- End diff --
    
    @fhueske 
    Both are fine - I do not think one is particularly simpler than the other. The reason I chose the aggregate-based implementation was the initial discussion about treating this as an aggregation as well. However, I can make another version in which I move this into the ProcessFunction.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110396436
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala ---
    @@ -0,0 +1,167 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
    +import org.apache.flink.api.java.tuple.Tuple
    +import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
    +import org.apache.flink.streaming.api.windowing.assigners._
    +import org.apache.flink.streaming.api.windowing.time.Time
    +import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
    +import org.apache.flink.table.api.StreamTableEnvironment
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.plan.logical._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil._
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
    +import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.logical.LogicalSort
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
    +import org.apache.flink.table.api.TableException
    +import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
    +import org.apache.flink.api.common.functions.RichMapFunction
    +import org.apache.flink.api.common.functions.RichFlatMapFunction
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.table.functions.ProcTimeType
    +import org.apache.flink.table.functions.RowTimeType
    +import org.apache.calcite.rel.core.Sort
    +import org.apache.flink.api.java.functions.NullByteKeySelector
    +import org.apache.calcite.rel.RelFieldCollation.Direction
    +
    +
    +/**
    +  * Flink RelNode which matches along with Sort Rule.
    +  *
    +  */
    +class DataStreamSort(
    +  calc: LogicalSort,
    +  cluster: RelOptCluster,
    +  traitSet: RelTraitSet,
    +  inputNode: RelNode,
    +  rowRelDataType: RelDataType,
    +  inputType: RelDataType,
    +  description: String)
    +    extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataStreamSort(
    +      calc,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      rowRelDataType,
    +      inputType,
    +      description + calc.getId())
    +  }
    +
    +  override def toString: String = {
    +    s"Sort($calc)"+
    +      s"on fields: (${calc.collation.getFieldCollations})"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw)
    +      .item("aggregate", calc)
    +      .item("sort fields",calc.collation.getFieldCollations)
    +      .itemIf("offset", calc.offset, calc.offset!=null)
    +      .itemIf("fetch", calc.fetch, calc.fetch!=null)
    +      .item("input", inputNode)
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +    
    +    //need to identify time between others order fields. Time needs to be first sort element
    +    val timeType = SortUtil.getTimeType(calc,inputType)
    +    
    +    //time ordering needs to be ascending
    +    if(SortUtil.getTimeDirection(calc)!=Direction.ASCENDING) {
    +      throw new TableException("SQL/Table supports only ascending time ordering")
    +    }
    +      
    +     
    +    val (offset,fetch) = (calc.offset,calc.fetch)
    +    
    +    //enable to extend for other types of aggregates that will not be implemented in a window
    +    timeType match {
    +        case _: ProcTimeType =>
    +            (offset,fetch) match {
    +              case (o:Any,f:Any)  => null             // offset and fetch needs retraction
    +              case (_,f:Any) => null                  // offset needs retraction
    +              case (o:Any,_) => null                  // fetch needs retraction
    +              case _ => createSortProcTime(inputDS)   //sort can be done with/without retraction
    +            }
    +        case _: RowTimeType =>
    +          throw new TableException("SQL/Table does not support sort on row time")
    +        case _ =>
    +          throw new TableException("SQL/Table needs to have sort on time as first sort element")
    +    }
    +    
    +  }
    +
    +  /**
    +   * Create Sort logic based on processing time
    +   */
    +  def createSortProcTime(
    +    inputDS: DataStream[Row]): DataStream[Row] = {
    +
    +    /*
    +     * [TODO] consider the partition case - In this implementation even if the stream is
    +     * pre-partitioned all elements will be reshufled and sorted
    +     */
    +
    +        
    +     // get the output types
    +    //Sort does not do project.= Hence it will output also the ordering proctime field
    +    //[TODO]Do we need to drop the ordering fields?
    +    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
    +    
    +    val processFunction = SortUtil.createProcTimeSortFunction(calc,inputType)
    +      
    +     val result: DataStream[Row] = inputDS
    +            .keyBy(new NullByteKeySelector[Row])
    --- End diff --
    
    @fhueske 
    Using a null-byte key for the process function might break any previous partitioning and reshuffle all elements into a single function. Perhaps we need to drop this - what do you think?
    Also, we need to check that we can still use `ValueState` even without a partitioned function (I think it is possible, as I believe only `MapState` had the prerequisite of a partitioned function) - is that so?
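
    For reference, a minimal sketch of how the key selector and keyed state relate (as far as I know, `ValueState`, `ListState`, and `MapState` are all keyed state and require a `keyBy`, which is why the null-byte key is used here for a global sort; `sortProcessFunction` is an assumed name):

        val result: DataStream[Row] = inputDS
          .keyBy(new NullByteKeySelector[Row]) // one logical key: a global, non-parallel sort
          .process(sortProcessFunction)        // keyed state is accessible inside the function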


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110902190
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedSortProcessFunction.scala ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{ Collector, Preconditions }
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import scala.util.control.Breaks._
    +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{ ArrayList, LinkedList, List => JList }
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.runtime.aggregate.MultiOutputAggregateFunction
    +
    +/**
    + * Process Function used for the aggregate in bounded proc-time OVER window
    + * [[org.apache.flink.streaming.api.datastream.DataStream]]
    + *
    + * @param aggregates the [[org.apache.flink.table.functions.aggfunctions.SortAggFunction]]
    + *                   used for this sort aggregation
    + * @param fieldCount Is used to indicate fields in the current element to forward
    + * @param aggType It is used to mark the Aggregate type
    + */
    +class ProcTimeUnboundedSortProcessFunction(
    +  private val aggregates: MultiOutputAggregateFunction[_],
    --- End diff --
    
    @fhueske OK, I will move the implementation into a `ProcessFunction`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110482967
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedSortProcessFunction.scala ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{ Collector, Preconditions }
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import scala.util.control.Breaks._
    +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{ ArrayList, LinkedList, List => JList }
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.runtime.aggregate.MultiOutputAggregateFunction
    +
    +/**
    + * Process Function used for the aggregate in bounded proc-time OVER window
    + * [[org.apache.flink.streaming.api.datastream.DataStream]]
    + *
    + * @param aggregates the [[org.apache.flink.table.functions.aggfunctions.SortAggFunction]]
    + *                   used for this sort aggregation
    + * @param fieldCount Is used to indicate fields in the current element to forward
    + * @param aggType It is used to mark the Aggregate type
    + */
    +class ProcTimeUnboundedSortProcessFunction(
    +  private val aggregates: MultiOutputAggregateFunction[_],
    --- End diff --
    
    You can even construct a RowComparator which compares rows based on field comparators. Since we are comparing object references, we do not need the serializers here.
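
    A sketch of such a field-wise comparator, assuming the key indexes and per-field `TypeComparator`s are created up front (the sort direction is baked into each `TypeComparator` when it is created):

        import java.util.Comparator
        import org.apache.flink.api.common.typeutils.TypeComparator
        import org.apache.flink.types.Row

        class RowFieldComparator(
            keyIndexes: Array[Int],
            fieldComparators: Array[TypeComparator[AnyRef]])
          extends Comparator[Row] with Serializable {

          override def compare(r1: Row, r2: Row): Int = {
            var i = 0
            while (i < keyIndexes.length) {
              // compare the field objects directly; no serializers involved
              val cmp = fieldComparators(i).compare(
                r1.getField(keyIndexes(i)).asInstanceOf[AnyRef],
                r2.getField(keyIndexes(i)).asInstanceOf[AnyRef])
              if (cmp != 0) {
                return cmp // the first differing key field decides the order
              }
              i += 1
            }
            0 // all key fields are equal
          }
        }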


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110480674
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedSortProcessFunction.scala ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{ Collector, Preconditions }
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import scala.util.control.Breaks._
    +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{ ArrayList, LinkedList, List => JList }
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.runtime.aggregate.MultiOutputAggregateFunction
    +
    +/**
    + * Process Function used for the aggregate in bounded proc-time OVER window
    + * [[org.apache.flink.streaming.api.datastream.DataStream]]
    + *
    + * @param aggregates the [[org.apache.flink.table.functions.aggfunctions.SortAggFunction]]
    + *                   used for this sort aggregation
    + * @param fieldCount Is used to indicate fields in the current element to forward
    + * @param aggType It is used to mark the Aggregate type
    + */
    +class ProcTimeUnboundedSortProcessFunction(
    +  private val aggregates: MultiOutputAggregateFunction[_],
    +  private val fieldCount: Int,
    +  private val aggType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +
    +  private var output: Row = _
    +  private var accumulatorState: ValueState[Row] = _
    +
    +  override def open(config: Configuration) {
    +    output = new Row(fieldCount)
    +
    +    val stateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("sortState", aggType)
    +    accumulatorState = getRuntimeContext.getState(stateDescriptor)
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    //buffer the event incoming event
    +  
    +    //initialize the accumulators 
    +    var accumulators = accumulatorState.value()
    +    if (null == accumulators) {
    +      accumulators = new Row(1)
    +      accumulators.setField(0, aggregates.createAccumulator)
    +    }
    +    
    +    //we aggregate(sort) the events as they arrive. However, this works only
    +    //if the onTimer is called before the processElement which should be the case
    +    val accumulator = accumulators.getField(0).asInstanceOf[Accumulator]
    +    aggregates.accumulate(accumulator, input)    
    +    accumulatorState.update(accumulators)
    +    
    +    //deduplication of multiple registered timers is done automatically
    +    ctx.timerService.registerProcessingTimeTimer(currentTime + 1)  
    +    
    +  }
    +  
    +  override def onTimer(
    +    timestamp: Long,
    +    ctx: ProcessFunction[Row, Row]#OnTimerContext,
    +    out: Collector[Row]): Unit = {
    +    
    +    var i = 0
    +
    +    //initialize the accumulators 
    +    var accumulators = accumulatorState.value()
    +    if (null == accumulators) {
    +      accumulators = new Row(1)
    +      accumulators.setField(0, aggregates.createAccumulator)
    +    }
    +
    +    val accumulator = accumulators.getField(0).asInstanceOf[Accumulator]
    +    
    +    //no retraction now
    +    //...
    +        
    +    //get the list of elements of current proctime
    +    var sortedValues = aggregates.getValues(accumulator)
    +    
    +    
    +    //we need to build the output and emit for every event received at this proctime
    +    var iElemenets = 0
    +    while (iElemenets < sortedValues.size) {
    +      val input = sortedValues.get(iElemenets).asInstanceOf[Row]
    +      
    +      //set the fields of the last event to carry on with the aggregates
    +      i = 0
    +      while (i < fieldCount) {
    --- End diff --
    
    This is a 1-to-1 copy, right? We can simply emit the `input` record if that's the case.
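
    For instance, a sketch of the suggested simplification, reusing the variables from the quoted snippet:

        while (iElemenets < sortedValues.size) {
          val input = sortedValues.get(iElemenets).asInstanceOf[Row]
          out.collect(input) // forward the row as-is instead of copying field by field
          iElemenets += 1
        }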


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110477127
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.volcano.RelSubset
    +import org.apache.calcite.plan.{ Convention, RelOptRule, RelOptRuleCall, RelTraitSet }
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.convert.ConverterRule
    +import org.apache.calcite.rel.logical.{ LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan }
    +import org.apache.calcite.rex.RexNode
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
    +import org.apache.calcite.rel.logical.LogicalSort
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamSort
    +
    +/**
    + * Rule to convert a LogicalSort into a DataStreamSort.
    + */
    +class DataStreamSortRule
    +    extends ConverterRule(
    +      classOf[LogicalSort],
    +      Convention.NONE,
    +      DataStreamConvention.INSTANCE,
    +      "DataStreamSortRule") {
    +
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    super.matches(call)
    +  }
    +
    +  override def convert(rel: RelNode): RelNode = {
    +    val calc: LogicalSort = rel.asInstanceOf[LogicalSort]
    --- End diff --
    
    `calc` -> `sort`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110476920
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SortAggFunction.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.math.BigDecimal
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import scala.collection.mutable.ArrayBuffer
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.runtime.aggregate.UntypedOrdering
    +import org.apache.flink.table.runtime.aggregate.MultiOutputAggregateFunction
    +import java.util.{ List => JList,ArrayList }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import org.apache.calcite.rel.RelFieldCollation.Direction
    +
    +/** The initial accumulator for Sort aggregate function */
    +class SortAccumulator extends ArrayBuffer[JTuple2[Row,Row]] with Accumulator with Serializable
    +
    +/**
    +  * Base class for built-in Min aggregate function
    +  *
    +  * @tparam K the type for the key sort type
    +  * @tparam T the type for the aggregation result
    +  */
    +abstract class SortAggFunction[K,T](
    --- End diff --
    
    I don't think we should implement sorting with an aggregation function. IMO, it would be much easier to do this directly in a `ProcessFunction`. The sort could be done with Java's collection sort and a `Comparator<Row>` that wraps `TypeComparator`s for the secondary sorting fields obtained from `TypeInformation`.
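
    A short usage sketch of that approach, assuming `rows` holds the buffered records and `rowComparator` is a field-wise `Comparator[Row]` as described:

        val rows = new java.util.ArrayList[Row]()
        // ... append the buffered records for the current key/timestamp ...
        java.util.Collections.sort(rows, rowComparator) // stable sort on the secondary fields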


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3700: Backbone implementation for supporting sort. Implementati...

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/3700
  
    I just realized that the commit does not have the proper name/number



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110478103
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala ---
    @@ -0,0 +1,167 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{ RelNode, RelWriter, SingleRel }
    +import org.apache.flink.api.java.tuple.Tuple
    +import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
    +import org.apache.flink.streaming.api.windowing.assigners._
    +import org.apache.flink.streaming.api.windowing.time.Time
    +import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
    +import org.apache.flink.table.api.StreamTableEnvironment
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.plan.logical._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil._
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
    +import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.logical.LogicalSort
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
    +import org.apache.flink.table.api.TableException
    +import org.apache.calcite.sql.fun.SqlSingleValueAggFunction
    +import org.apache.flink.api.common.functions.RichMapFunction
    +import org.apache.flink.api.common.functions.RichFlatMapFunction
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.table.functions.ProcTimeType
    +import org.apache.flink.table.functions.RowTimeType
    +import org.apache.calcite.rel.core.Sort
    +import org.apache.flink.api.java.functions.NullByteKeySelector
    +import org.apache.calcite.rel.RelFieldCollation.Direction
    +
    +
    +/**
    +  * Flink RelNode which matches along with Sort Rule.
    +  *
    +  */
    +class DataStreamSort(
    +  calc: LogicalSort,
    +  cluster: RelOptCluster,
    +  traitSet: RelTraitSet,
    +  inputNode: RelNode,
    +  rowRelDataType: RelDataType,
    +  inputType: RelDataType,
    +  description: String)
    +    extends SingleRel(cluster, traitSet, inputNode) with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataStreamSort(
    +      calc,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      rowRelDataType,
    +      inputType,
    +      description + calc.getId())
    +  }
    +
    +  override def toString: String = {
    +    s"Sort($calc)"+
    +      s"on fields: (${calc.collation.getFieldCollations})"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw)
    +      .item("aggregate", calc)
    +      .item("sort fields",calc.collation.getFieldCollations)
    +      .itemIf("offset", calc.offset, calc.offset!=null)
    +      .itemIf("fetch", calc.fetch, calc.fetch!=null)
    +      .item("input", inputNode)
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +    
    +    //need to identify time between others order fields. Time needs to be first sort element
    +    val timeType = SortUtil.getTimeType(calc,inputType)
    +    
    +    //time ordering needs to be ascending
    +    if(SortUtil.getTimeDirection(calc)!=Direction.ASCENDING) {
    --- End diff --
    
    +spaces (add spaces after `if` and around `!=`)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110480378
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedSortProcessFunction.scala ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{ Collector, Preconditions }
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import scala.util.control.Breaks._
    +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{ ArrayList, LinkedList, List => JList }
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.table.runtime.aggregate.MultiOutputAggregateFunction
    +
    +/**
    + * Process Function used for the aggregate in bounded proc-time OVER window
    + * [[org.apache.flink.streaming.api.datastream.DataStream]]
    + *
    + * @param aggregates the [[org.apache.flink.table.functions.aggfunctions.SortAggFunction]]
    + *                   used for this sort aggregation
    + * @param fieldCount Is used to indicate fields in the current element to forward
    + * @param aggType It is used to mark the Aggregate type
    + */
    +class ProcTimeUnboundedSortProcessFunction(
    +  private val aggregates: MultiOutputAggregateFunction[_],
    +  private val fieldCount: Int,
    +  private val aggType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +
    +  private var output: Row = _
    +  private var accumulatorState: ValueState[Row] = _
    +
    +  override def open(config: Configuration) {
    +    output = new Row(fieldCount)
    +
    +    val stateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("sortState", aggType)
    +    accumulatorState = getRuntimeContext.getState(stateDescriptor)
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    //buffer the event incoming event
    +  
    +    //initialize the accumulators 
    +    var accumulators = accumulatorState.value()
    +    if (null == accumulators) {
    +      accumulators = new Row(1)
    +      accumulators.setField(0, aggregates.createAccumulator)
    +    }
    +    
    +    //we aggregate(sort) the events as they arrive. However, this works only
    +    //if the onTimer is called before the processElement which should be the case
    +    val accumulator = accumulators.getField(0).asInstanceOf[Accumulator]
    +    aggregates.accumulate(accumulator, input)    
    +    accumulatorState.update(accumulators)
    +    
    +    //deduplication of multiple registered timers is done automatically
    +    ctx.timerService.registerProcessingTimeTimer(currentTime + 1)  
    +    
    +  }
    +  
    +  override def onTimer(
    +    timestamp: Long,
    +    ctx: ProcessFunction[Row, Row]#OnTimerContext,
    +    out: Collector[Row]): Unit = {
    +    
    +    var i = 0
    +
    +    //initialize the accumulators 
    +    var accumulators = accumulatorState.value()
    +    if (null == accumulators) {
    --- End diff --
    
    The accumulator should already be set: `onTimer` will not be called if no timer was registered, timers are only registered in `processElement`, and registering one implies that a record was added to the accumulator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110483259
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SortAggFunction.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.math.BigDecimal
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import scala.collection.mutable.ArrayBuffer
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.runtime.aggregate.UntypedOrdering
    +import org.apache.flink.table.runtime.aggregate.MultiOutputAggregateFunction
    +import java.util.{ List => JList,ArrayList }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import org.apache.calcite.rel.RelFieldCollation.Direction
    +
    +/** The initial accumulator for Sort aggregate function */
    +class SortAccumulator extends ArrayBuffer[JTuple2[Row,Row]] with Accumulator with Serializable
    +
    +/**
    +  * Base class for built-in Min aggregate function
    +  *
    +  * @tparam K the type for the key sort type
    +  * @tparam T the type for the aggregation result
    +  */
    +abstract class SortAggFunction[K,T](
    +    val keyIndexes: Array[Int],
    +    val keySortDirections: Array[Direction],
    +    val orderings: Array[UntypedOrdering]) extends MultiOutputAggregateFunction[T] {
    +
    +  override def createAccumulator(): Accumulator = {
    +    val acc = new SortAccumulator
    +      
    +    acc
    +  }
    +
    +  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
    +    if (value != null) {
    +      val v = value.asInstanceOf[Row]
    +      val acc = accumulator.asInstanceOf[SortAccumulator]
    +      
    +      var i = 0
    +      //create the (compose) key of the new value
    +      val keyV = new Row(keyIndexes.size)
    +      while (i<keyIndexes.size) {
    +        keyV.setField(i, v.getField(keyIndexes(i)))
    +        i += 1
    +      }
    +        
    +      var j = 0
    +      while (j<acc.size) {
    +        i = 0
    +        while (i<keyIndexes.size) {
    +          
    +          val compareResult = if(keySortDirections(i) == Direction.ASCENDING) {
    +            orderings(i).compare(acc(j).f0.getField(i),keyV.getField(i))
    +          } else {
    +            orderings(i).compare(keyV.getField(i),acc(j).f0.getField(i))
    +          }
    +          
    +          compareResult match {
    +          case 0 => i += 1 //same key and need to sort on consequent keys 
    +          case g if g > 0 => {
    +            acc.insert(j, new JTuple2(keyV,v)) //add new element in place
    --- End diff --
    
    Insertion into an `ArrayBuffer` will be expensive, because all following entries need to be shifted.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110479650
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.`type`._
    +import org.apache.calcite.rel.logical.LogicalSort
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.functions.AggregateFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.table.api.TableException
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.flink.table.functions.Accumulator
    +import java.util.{ List => JList, ArrayList }
    +import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, TypeInformation }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.table.functions.aggfunctions.RowSortAggFunction
    +import java.sql.Timestamp
    +import org.apache.calcite.rel.RelFieldCollation
    +import org.apache.calcite.rel.RelFieldCollation.Direction
    +
    +
    +/**
    + * Class represents a collection of helper methods to build the sort logic.
    + * It encapsulates as well implementation for ordering and generic interfaces
    + */
    +
    +object SortUtil {
    +
    +  /**
    +   * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting 
    +   * elements based on proctime and potentially other fields
    +   * @param calcSort Sort logical object
    +   * @param inputType input row type
    +   * @return org.apache.flink.streaming.api.functions.ProcessFunction
    +   */
    +  private[flink] def createProcTimeSortFunction(
    +    calcSort: LogicalSort,
    +    inputType: RelDataType): ProcessFunction[Row, Row] = {
    +
    +    val keySortFields = getSortFieldIndexList(calcSort)
    +    val keySortDirections = getSortFieldDirectionList(calcSort)
    +    val sortAggregates = createSortAggregation(inputType, keySortFields,keySortDirections, false)
    +
    +    val aggType = createSingleAccumulatorRowType(sortAggregates)
    +    
    +   new ProcTimeUnboundedSortProcessFunction(
    +      sortAggregates,
    +      inputType.getFieldCount,
    +      aggType)
    +
    +  }
    +
    +  
    +   /**
    +   * Function creates a sorting aggregation object 
    +   * elements based on proctime and potentially other fields
    +   * @param inputType input row type
    +   * @param keyIndex the indexes of the fields on which the sorting is done. 
    +   * @param keySortDirections the directions of the sorts for each field. 
    +   * First is expected to be the time  
    +   * @return SortAggregationFunction
    +   */
    +  private def createSortAggregation(
    +    inputType: RelDataType,
    +    keyIndex: Array[Int],
    +    keySortDirections: Array[Direction],
    +    retraction: Boolean): MultiOutputAggregateFunction[_] = {
    +
    +    val orderings = createOrderingComparison(inputType, keyIndex)
    --- End diff --
    
    We can generate a `TypeComparator` from the `TypeInformation`.
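
    For example, a sketch for atomic field types (the helper name is illustrative; the ascending flag encodes the sort direction):

        import org.apache.flink.api.common.ExecutionConfig
        import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
        import org.apache.flink.api.common.typeutils.TypeComparator

        def fieldComparator(
            fieldType: TypeInformation[_],
            ascending: Boolean): TypeComparator[_] = fieldType match {
          // all basic types (numerics, strings, SQL times, ...) are AtomicTypes
          case at: AtomicType[_] => at.createComparator(ascending, new ExecutionConfig)
          case other => throw new IllegalArgumentException(s"No comparator for type $other")
        }

    This would replace the hand-written per-type orderings in the quoted `createOrderingComparison` method.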


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110900086
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.`type`._
    +import org.apache.calcite.rel.logical.LogicalSort
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.functions.AggregateFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.table.api.TableException
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.flink.table.functions.Accumulator
    +import java.util.{ List => JList, ArrayList }
    +import org.apache.flink.api.common.typeinfo.{ SqlTimeTypeInfo, TypeInformation }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.table.functions.aggfunctions.RowSortAggFunction
    +import java.sql.Timestamp
    +import org.apache.calcite.rel.RelFieldCollation
    +import org.apache.calcite.rel.RelFieldCollation.Direction
    +
    +
    +/**
    + * Class represents a collection of helper methods to build the sort logic.
    + * It encapsulates as well implementation for ordering and generic interfaces
    + */
    +
    +object SortUtil {
    +
    +  /**
    +   * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting 
    +   * elements based on proctime and potentially other fields
    +   * @param calcSort Sort logical object
    +   * @param inputType input row type
    +   * @return org.apache.flink.streaming.api.functions.ProcessFunction
    +   */
    +  private[flink] def createProcTimeSortFunction(
    +    calcSort: LogicalSort,
    +    inputType: RelDataType): ProcessFunction[Row, Row] = {
    +
    +    val keySortFields = getSortFieldIndexList(calcSort)
    +    val keySortDirections = getSortFieldDirectionList(calcSort)
    +    val sortAggregates = createSortAggregation(inputType, keySortFields,keySortDirections, false)
    +
    +    val aggType = createSingleAccumulatorRowType(sortAggregates)
    +    
    +   new ProcTimeUnboundedSortProcessFunction(
    +      sortAggregates,
    +      inputType.getFieldCount,
    +      aggType)
    +
    +  }
    +
    +  
    +   /**
    +   * Function creates a sorting aggregation object 
    +   * elements based on proctime and potentially other fields
    +   * @param inputType input row type
    +   * @param keyIndex the indexes of the fields on which the sorting is done. 
    +   * @param keySortDirections the directions of the sorts for each field. 
    +   * First is expected to be the time  
    +   * @return SortAggregationFunction
    +   */
    +  private def createSortAggregation(
    +    inputType: RelDataType,
    +    keyIndex: Array[Int],
    +    keySortDirections: Array[Direction],
    +    retraction: Boolean): MultiOutputAggregateFunction[_] = {
    +
    +    val orderings = createOrderingComparison(inputType, keyIndex)
    +
    +    val sortKeyType = toKeySortInternalRowTypeInfo(inputType, keyIndex).asInstanceOf[RowTypeInfo]
    +
    +    // get the output types
    +    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
    +
    +    val sortAggFunc = new RowSortAggFunction(keyIndex,
    +        keySortDirections, orderings, rowTypeInfo, sortKeyType)
    +
    +    sortAggFunc
    +
    +  }
    +  
    +   /**
    +   * Function creates a typed based comparison objects 
    +   * @param inputType input row type
    +   * @param keyIndex the indexes of the fields on which the sorting is done. 
    +   * First is expected to be the time  
    +   * @return Array of ordering objects
    +   */
    +  
    +  def createOrderingComparison(inputType: RelDataType,
    +    keyIndex: Array[Int]): Array[UntypedOrdering] = {
    +
    +    var i = 0
    +    val orderings = new Array[UntypedOrdering](keyIndex.size)
    +
    +    while (i < keyIndex.size) {
    +      val sqlTypeName = inputType.getFieldList.get(keyIndex(i)).getType.getSqlTypeName
    +
    +      orderings(i) = sqlTypeName match {
    +        case TINYINT =>
    +          new ByteOrdering()
    +        case SMALLINT =>
    +          new SmallOrdering()
    +        case INTEGER =>
    +          new IntOrdering()
    +        case BIGINT =>
    +          new LongOrdering()
    +        case FLOAT =>
    +          new FloatOrdering()
    +        case DOUBLE =>
    +          new DoubleOrdering()
    +        case DECIMAL =>
    +          new DecimalOrdering()
    +        case VARCHAR | CHAR =>
    +          new StringOrdering()
    +        //should be updated when times are merged in master branch based on their types
    +        case TIMESTAMP =>
    +          new TimestampOrdering()
    +        case sqlType: SqlTypeName =>
    +          throw new TableException("Sort aggregate does no support type:" + sqlType)
    +      }
    +      i += 1
    +    }
    +    orderings
    +  }
    +  
    +  /**
    +   * Creates the row type of the accumulator used by the sort aggregation.
    +   * @param sortAggregate the sort aggregation function
    +   * @return RowTypeInfo with the single accumulator field
    +   */
    +  private def createSingleAccumulatorRowType(
    +      sortAggregate: MultiOutputAggregateFunction[_]): RowTypeInfo = {
    +    
    +    val accType = sortAggregate.getAccumulatorType
    +    new RowTypeInfo(accType)
    +  }
    +
    +  /**
    +   * Extracts and converts a Calcite logical record into a Flink type information
    +   * by selecting only a subset of the fields.
    +   */
    +  def toKeySortInternalRowTypeInfo(logicalRowType: RelDataType,
    +    keyIndexes: Array[Int]): TypeInformation[Row] = {
    +    var i = 0
    +    val fieldList = logicalRowType.getFieldList
    +    val logicalFieldTypes = new Array[TypeInformation[_]](keyIndexes.size)
    +    val logicalFieldNames = new Array[String](keyIndexes.size)
    +
    +    while (i < keyIndexes.size) {
    +      // look up the field via keyIndexes, not the loop counter
    +      logicalFieldTypes(i) = FlinkTypeFactory.toTypeInfo(fieldList.get(keyIndexes(i)).getType)
    +      logicalFieldNames(i) = fieldList.get(keyIndexes(i)).getName
    +      i += 1
    +    }
    +
    +    new RowTypeInfo(logicalFieldTypes, logicalFieldNames)
    +
    +  }
    +
    +  /**
    +   * Returns the array of field indexes on which the sort is done.
    +   * @param calcSort The LogicalSort object
    +   * @return [Array[Int]]
    +   */
    +  def getSortFieldIndexList(calcSort: LogicalSort): Array[Int] = {
    +    val keyFields = calcSort.collation.getFieldCollations
    +    var i = 0
    +    val keySort = new Array[Int](keyFields.size())
    +    while (i < keyFields.size()) {
    +      keySort(i) = keyFields.get(i).getFieldIndex
    +      i += 1
    +    }
    +    keySort
    +  }
    +  
    +  /**
    +   * Returns the array of sort directions, one for each sort field.
    +   * @param calcSort The LogicalSort object
    +   * @return [Array[Direction]]
    +   */
    +  def getSortFieldDirectionList(calcSort: LogicalSort): Array[Direction] = {
    +    val keyFields = calcSort.collation.getFieldCollations
    +    var i = 0
    +    val keySortDirection = new Array[Direction](keyFields.size())
    +    while (i < keyFields.size()) {
    +      keySortDirection(i) = getDirection(calcSort, i)
    +      i += 1
    +    }
    +    keySortDirection
    +  }
    +
    +  /**
    +   * Returns the type of the time field in the order clause. If a time field
    +   * exists, it is expected to be the primary sort field.
    +   * @param calcSort The LogicalSort object
    +   * @param rowType The data type of the input
    +   * @return [RelDataType]
    +   */
    +  def getTimeType(calcSort: LogicalSort, rowType: RelDataType): RelDataType = {
    +
    +    // the time field is expected to be the primary (first) sort field
    +    val ind = calcSort.getCollationList.get(0).getFieldCollations.get(0).getFieldIndex
    +    rowType.getFieldList.get(ind).getValue
    +  }
    +
    +  /**
    +   * Returns the sort direction of the time field in the order clause.
    +   * @param calcSort The LogicalSort object
    +   * @return [Direction]
    +   */
    +  def getTimeDirection(calcSort: LogicalSort): Direction = {
    +    calcSort.getCollationList.get(0).getFieldCollations.get(0).direction
    +  }
    +  
    +  /**
    +   * Returns the sort direction of the given field in the order clause.
    +   * @param calcSort The LogicalSort object
    +   * @param sortField The index of the sort field within the collation
    +   * @return [Direction]
    +   */
    +  def getDirection(calcSort: LogicalSort, sortField: Int): Direction = {
    +    
    +    calcSort.getCollationList.get(0).getFieldCollations.get(sortField).direction match {
    +      case Direction.ASCENDING => Direction.ASCENDING
    +      case Direction.DESCENDING => Direction.DESCENDING
    +      case _ => throw new TableException("SQL/Table does not support this sort direction")
    +    }
    +    
    +  }
    +  
    +}
    +
    +
    +/**
    + * Untyped interface defining a comparison method that can be overridden by typed
    + * implementations. Each typed implementation casts the generic arguments to the
    + * type of its implicit ordering.
    + */
    +trait UntypedOrdering extends Serializable {
    +  def compare(x: Any, y: Any): Int
    +
    +}
    +
    +class LongOrdering(implicit ord: Ordering[Long]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xL = x.asInstanceOf[Long]
    +    val yL = y.asInstanceOf[Long]
    +    ord.compare(xL, yL)
    +  }
    +}
    +
    +class IntOrdering(implicit ord: Ordering[Int]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xI = x.asInstanceOf[Int]
    +    val yI = y.asInstanceOf[Int]
    +    ord.compare(xI, yI)
    +  }
    +}
    +
    +class FloatOrdering(implicit ord: Ordering[Float]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xF = x.asInstanceOf[Float]
    +    val yF = y.asInstanceOf[Float]
    +    ord.compare(xF, yF)
    +  }
    +}
    +
    +class DoubleOrdering(implicit ord: Ordering[Double]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xD = x.asInstanceOf[Double]
    +    val yD = y.asInstanceOf[Double]
    +    ord.compare(xD, yD)
    +  }
    +}
    +
    +class DecimalOrdering(implicit ord: Ordering[BigDecimal]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xBD = x.asInstanceOf[BigDecimal]
    +    val yBD = y.asInstanceOf[BigDecimal]
    +    ord.compare(xBD, yBD)
    +  }
    +}
    +
    +class ByteOrdering(implicit ord: Ordering[Byte]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xB = x.asInstanceOf[Byte]
    +    val yB = y.asInstanceOf[Byte]
    +    ord.compare(xB, yB)
    +  }
    +}
    +
    +class SmallOrdering(implicit ord: Ordering[Short]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xS = x.asInstanceOf[Short]
    +    val yS = y.asInstanceOf[Short]
    +    ord.compare(xS, yS)
    +  }
    +}
    +
    +class StringOrdering(implicit ord: Ordering[String]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xS = x.asInstanceOf[String]
    +    val yS = y.asInstanceOf[String]
    +    ord.compare(xS, yS)
    +  }
    +}
    +
    +/**
    + * Ordering object for timestamps. As there is no implicit Ordering for
    + * [java.sql.Timestamp], we compare based on the Long value of the timestamp.
    + */
    +class TimestampOrdering(implicit ord: Ordering[Long]) extends UntypedOrdering {
    +
    +  override def compare(x: Any, y: Any): Int = {
    +    val xTs = x.asInstanceOf[Timestamp]
    +    val yTs = y.asInstanceOf[Timestamp]
    +    ord.compare(xTs.getTime, yTs.getTime)
    +  }
    +}
    +
    +
    +
    +/**
    + * Called every time a multi-output aggregation result should be materialized.
    + * The returned values could be either an early and incomplete result
    + * (periodically emitted as data arrive) or the final result of the
    + * aggregation.
    + *
    + * @param accumulator the accumulator which contains the current
    + *                    aggregated results
    + * @return the aggregation result
    + */
    +abstract class MultiOutputAggregateFunction[T] extends AggregateFunction[T] {
    --- End diff --
    
    I think this is debatable... I can also see aggregates that would simply produce multiple values (e.g., aggregating the last 10 values).
    Nevertheless, this becomes irrelevant if we move the implementation into the process function.
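    
    For illustration, a minimal sketch of such a "last N values" aggregate (not part of this PR; the getValues accessor and the getValue method are assumptions about what MultiOutputAggregateFunction and its base class expose):
    
        import java.util.{ArrayList, List => JList}
        import scala.collection.mutable.ArrayBuffer
        import org.apache.flink.table.functions.Accumulator
        import org.apache.flink.types.Row
        
        class LastNAccumulator extends ArrayBuffer[Row] with Accumulator
        
        // keeps only the n most recent rows and can emit all of them
        class LastNAggFunction(n: Int) extends MultiOutputAggregateFunction[Row] {
        
          override def createAccumulator(): Accumulator = new LastNAccumulator
        
          override def accumulate(accumulator: Accumulator, value: Any): Unit = {
            val acc = accumulator.asInstanceOf[LastNAccumulator]
            acc += value.asInstanceOf[Row]
            if (acc.size > n) {
              acc.remove(0) // evict the oldest row
            }
          }
        
          // hypothetical multi-output accessor
          def getValues(accumulator: Accumulator): JList[Row] = {
            val out = new ArrayList[Row]()
            accumulator.asInstanceOf[LastNAccumulator].foreach(r => out.add(r))
            out
          }
        
          // single-value accessor (assumed to be required by the base class):
          // returns the most recent row
          def getValue(accumulator: Accumulator): Row = {
            val acc = accumulator.asInstanceOf[LastNAccumulator]
            if (acc.isEmpty) null else acc.last
          }
        }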



[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran closed the pull request at:

    https://github.com/apache/flink/pull/3700



[GitHub] flink issue #3700: Backbone implementation for supporting sort. Implementati...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3700
  
    Hi @rtudoran, thanks for the new PR. I'll try to have a look at #3714 in the next few days.
    Could you close this PR yourself? We cannot close it directly; we would have to add its ID to the message of a commit.
    
    Thank you,
    Fabian



[GitHub] flink pull request #3700: Backbone implementation for supporting sort. Imple...

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3700#discussion_r110900961
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SortAggFunction.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.math.BigDecimal
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import scala.collection.mutable.ArrayBuffer
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.runtime.aggregate.UntypedOrdering
    +import org.apache.flink.table.runtime.aggregate.MultiOutputAggregateFunction
    +import java.util.ArrayList
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import org.apache.calcite.rel.RelFieldCollation.Direction
    +
    +/** The initial accumulator for the Sort aggregate function */
    +class SortAccumulator extends ArrayBuffer[JTuple2[Row, Row]] with Accumulator with Serializable
    +
    +/**
    +  * Base class for the built-in Sort aggregate function
    +  *
    +  * @tparam K the type of the sort key
    +  * @tparam T the type of the aggregation result
    +  */
    +abstract class SortAggFunction[K, T](
    +    val keyIndexes: Array[Int],
    +    val keySortDirections: Array[Direction],
    +    val orderings: Array[UntypedOrdering]) extends MultiOutputAggregateFunction[T] {
    +
    +  override def createAccumulator(): Accumulator = new SortAccumulator
    +
    +  override def accumulate(accumulator: Accumulator, value: Any): Unit = {
    +    if (value != null) {
    +      val v = value.asInstanceOf[Row]
    +      val acc = accumulator.asInstanceOf[SortAccumulator]
    +      
    +      var i = 0
    +      // create the (composite) key of the new value
    +      val keyV = new Row(keyIndexes.size)
    +      while (i < keyIndexes.size) {
    +        keyV.setField(i, v.getField(keyIndexes(i)))
    +        i += 1
    +      }
    +
    +      var j = 0
    +      while (j < acc.size) {
    +        i = 0
    +        while (i < keyIndexes.size) {
    +
    +          val compareResult = if (keySortDirections(i) == Direction.ASCENDING) {
    +            orderings(i).compare(acc(j).f0.getField(i), keyV.getField(i))
    +          } else {
    +            orderings(i).compare(keyV.getField(i), acc(j).f0.getField(i))
    +          }
    +
    +          compareResult match {
    +          case 0 => i += 1 // same key; continue comparing on the subsequent sort keys
    +          case g if g > 0 => {
    +            acc.insert(j, new JTuple2(keyV, v)) // add the new element in place
    --- End diff --
    
    True! I would change the data structure.
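    
    For example, a rough sketch (names hypothetical) of a java.util.TreeMap based accumulator that reuses the UntypedOrdering array from SortUtil, so inserting becomes O(log n) instead of the linear ArrayBuffer.insert above:
    
        import java.util.{ArrayList, Comparator, TreeMap, List => JList}
        import org.apache.calcite.rel.RelFieldCollation.Direction
        import org.apache.flink.table.runtime.aggregate.UntypedOrdering
        import org.apache.flink.types.Row
        
        // compares two sort keys field by field, honoring the sort directions
        class SortKeyComparator(
            directions: Array[Direction],
            orderings: Array[UntypedOrdering]) extends Comparator[Row] with Serializable {
        
          override def compare(a: Row, b: Row): Int = {
            var i = 0
            while (i < orderings.length) {
              val c = if (directions(i) == Direction.ASCENDING) {
                orderings(i).compare(a.getField(i), b.getField(i))
              } else {
                orderings(i).compare(b.getField(i), a.getField(i))
              }
              if (c != 0) return c
              i += 1
            }
            0
          }
        }
        
        object TreeMapAccSketch {
        
          // created as: new TreeMap[Row, JList[Row]](new SortKeyComparator(dirs, ords))
          // rows that share the same sort key are kept in insertion order
          def insertRow(acc: TreeMap[Row, JList[Row]], keyV: Row, v: Row): Unit = {
            var bucket = acc.get(keyV)
            if (bucket == null) {
              bucket = new ArrayList[Row]()
              acc.put(keyV, bucket)
            }
            bucket.add(v)
          }
        }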

