Posted to issues@flink.apache.org by sunjincheng121 <gi...@git.apache.org> on 2017/02/23 08:24:05 UTC

[GitHub] flink pull request #3397: FLINK-5803-Add [partitioned] processing time OVER ...

GitHub user sunjincheng121 opened a pull request:

    https://github.com/apache/flink/pull/3397

    FLINK-5803-Add [partitioned] processing time OVER RANGE BETWEEN UNBOU…

    …NDED PRECEDING aggregation to SQL
    
    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sunjincheng121/flink FLINK-5803-PR

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3397.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3397
    
----
commit 3464cc69cad5ec563878c8a6ea24ee449a0ce4f0
Author: 金竹 <ji...@alibaba-inc.com>
Date:   2017-02-15T10:16:06Z

    FLINK-5803-Add [partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104446109
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -93,6 +94,41 @@ object AggregateUtil {
       }
     
       /**
    +    * Create an [[RichProcessFunction]] to evaluate final aggregate value.
    +    *
    +    * @param namedAggregates List of calls to aggregate functions and their output field names
    +    * @param inputType Input row type
    +    * @param outputType Output row type
    +    * @param forwardedFields All the forwarded fields
    +    * @return [[UnboundedProcessingOverProcessFunction]]
    +    */
    +  private[flink] def CreateUnboundedProcessingOverProcessFunction(
    +    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +    inputType: RelDataType,
    +    outputType: RelDataType,
    +    forwardedFields: Array[Int]): UnboundedProcessingOverProcessFunction = {
    +
    +    val (aggFields, aggregates) =
    +      transformToAggregateFunctions(
    +        namedAggregates.map(_.getKey),
    +        inputType,
    +        forwardedFields.length)
    +
    +    val rowTypeInfo = new RowTypeInfo(outputType.getFieldList
    +      .map(field => FlinkTypeFactory.toTypeInfo(field.getType)): _*)
    +
    +    val intermediateRowType: RowTypeInfo =
    +      createDataSetAggregateBufferDataType(forwardedFields, aggregates, inputType)
    --- End diff --
    
    This will actually cause a serialization error because the state type in `UnboundedProcessingOverProcessFunction` does not match the type info. The reason why the tests pass is that the default state backend keeps all data on the heap and does not serialize. We need to extend the tests to use the RocksDBStateBackend to catch these cases. This is the first operator (besides the built-in windows, which are tested in the DataStream API) that uses state.
    
    I suggest the following. We add a `StreamingWithStateTestBase` to the Table API utils which is defined as:
    ```
    class StreamingWithStateTestBase extends StreamingMultipleProgramsTestBase {
    
      val _tempFolder = new TemporaryFolder
    
      @Rule
      def tempFolder: TemporaryFolder = _tempFolder
    
      def getStateBackend: RocksDBStateBackend = {
        val dbPath = tempFolder.newFolder().getAbsolutePath
        val checkpointPath = tempFolder.newFolder().toURI.toString
        val backend = new RocksDBStateBackend(checkpointPath)
        backend.setDbStoragePath(dbPath)
        backend
      }
    }
    ```
    
    In tests that require state, we extend the ITCase class from `StreamingWithStateTestBase` instead of `StreamingMultipleProgramsTestBase` and set the state backend in each relevant method with `env.setStateBackend(getStateBackend)`. The test will then use the RocksDBStateBackend and check for proper serialization.
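    For illustration, a minimal sketch of what such an ITCase could look like (the class and test names are hypothetical; the query mirrors the tests in this PR):
    ```
    class OverWindowITCase extends StreamingWithStateTestBase {

      @Test
      def testUnboundedProcTimeOverWindowWithState(): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // use RocksDB instead of the default heap backend so that state is
        // actually serialized and type mismatches surface in the test
        env.setStateBackend(getStateBackend)
        val tEnv = TableEnvironment.getTableEnvironment(env)
        StreamITCase.testResults = mutable.MutableList()

        val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
        tEnv.registerTable("T1", t1)
        tEnv.registerFunction("ProcTime", ProcTime)

        val sqlQuery = "SELECT c, COUNT(a) OVER (PARTITION BY c ORDER BY ProcTime() " +
          "RANGE UNBOUNDED PRECEDING) AS cnt FROM T1"

        val result = tEnv.sql(sqlQuery).toDataStream[Row]
        result.addSink(new StreamITCase.StringSink)
        env.execute()
      }
    }
    ```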



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103507771
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.java.typeutils.runtime.RowSerializer
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val projectionsMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val  intermediateRowType: RowTypeInfo,
    +  @transient private val returnType: TypeInformation[Row])
    --- End diff --
    
    indentation



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103581600
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.java.typeutils.runtime.RowSerializer
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val projectionsMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val  intermediateRowType: RowTypeInfo,
    +  @transient private val returnType: TypeInformation[Row])
    +  extends RichProcessFunction[Row, Row]{
    +
    +  protected var stateSerializer: TypeSerializer[Row] = _
    +  protected var stateDescriptor: ValueStateDescriptor[Row] = _
    +
    +  private var output: Row = _
    +  private var state: ValueState[Row] = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(projectionsMapping)
    +    Preconditions.checkNotNull(aggregateMapping)
    +    Preconditions.checkArgument(aggregates.length == aggregateMapping.length)
    +
    +    val finalRowLength: Int = projectionsMapping.length + aggregateMapping.length
    +    output = new Row(finalRowLength)
    +    stateSerializer = intermediateRowType.createSerializer(getRuntimeContext.getExecutionConfig)
    +    stateDescriptor = new ValueStateDescriptor[Row]("overState", stateSerializer)
    --- End diff --
    
    Yes, I wanted to do that, but I found this suggestion in the JavaDoc:
    ```
    @deprecated Use {@link #ValueStateDescriptor(String, TypeSerializer)} instead and manually
    manage the default value by checking whether the contents of the state is {@code null}.
    ```
    Should we still use this constructor?
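    For reference, a minimal sketch of the pattern that JavaDoc recommends, using the fields from the diff above: construct the descriptor without a default value and initialize the accumulator row lazily when `state.value()` returns `null`:
    ```
    override def open(config: Configuration) {
      val stateSerializer: TypeSerializer[Row] =
        intermediateRowType.createSerializer(getRuntimeContext.getExecutionConfig)
      // non-deprecated constructor: no default value is registered
      state = getRuntimeContext.getState(
        new ValueStateDescriptor[Row]("overState", stateSerializer))
    }

    override def processElement(input: Row, ctx: Context, out: Collector[Row]): Unit = {
      var accumulators = state.value()
      if (accumulators == null) {
        // first element for this key: create and initialize the accumulator row
        accumulators = new Row(aggregates.length)
      }
      // ... update the accumulators and emit the output row ...
      state.update(accumulators)
    }
    ```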



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104426368
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFields: Array[Int],
    +    private val forwardedFieldCount: Int,
    +    private val intermediateRowType: RowTypeInfo,
    --- End diff --
    
    `aggregationStateType`



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104424825
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq())
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +      .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    +      .item("overWindow", overWindow)
    +      .item(
    +        "select", aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq()))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    if (logicWindow.groups.size > 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be computed on the same window.")
    +    }
    +
    +    val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
    +
    +    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    if (overWindow.orderKeys.getFieldCollations.size() != 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be ordered on a time mode column.")
    +    }
    +
    +    val timeType = inputType
    +      .getFieldList
    +      .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
    +      .getValue
    +
    +    timeType match {
    +      case _: ProcTimeType =>
    +        // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
    +        if (overWindow.lowerBound.isUnbounded &&
    +          overWindow.upperBound.isCurrentRow) {
    +          createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
    +        } else {
    +          throw new TableException(
    +              "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
    +              "condition.")
    +        }
    +      case _: RowTimeType =>
    +        throw new TableException("OVER Window of the EventTime type is not currently supported.")
    +      case _ =>
    +        throw new TableException(s"Unsupported time type {$timeType}")
    +    }
    +
    +  }
    +
    +  def createUnboundedAndCurrentRowProcessingTimeOverWindow(
    +    inputDS: DataStream[Row]): DataStream[Row]  = {
    +
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    val inputIndices = (0 until inputType.getFieldCount).toArray
    +
    +    // get the output types
    +    val fieldTypes: Array[TypeInformation[_]] = getRowType
    +      .getFieldList.asScala
    +      .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
    +
    +    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
    +
    +    val aggString = aggregationToString(
    +      inputType,
    +      inputIndices,
    +      getRowType,
    +      namedAggregates,
    +      Seq())
    +
    +    val keyedAggOpName = s"partitionBy: (${groupingToString(inputType, partitionKeys)}), " +
    +      s"overWindow: ($overWindow), " +
    +      s"select: ($aggString)"
    +
    +    val result: DataStream[Row] =
    +        // partitioned aggregation
    +        if (partitionKeys.length > 0) {
    --- End diff --
    
    `partitionKeys.nonEmpty`



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104456607
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq())
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    --- End diff --
    
    `explainTerms` is used in our tests which are based on `TableTestBase`. The formatting should be aligned with the other operators. I suggest the following format:
    
    ```
    DataStreamOverAggregate(partitionBy=[c], orderBy=[b], range=[BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c AS a, COUNT(a) AS c, $SUM0(a) AS $2])
    ```
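    For illustration, a sketch of an `explainTerms` implementation that would produce this format (`orderingToString` and `windowRangeToString` are hypothetical helpers that render the order keys and the window bounds):
    ```
    override def explainTerms(pw: RelWriter): RelWriter = {
      val (
        overWindow: Group,
        partitionKeys: Array[Int],
        namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
        ) = genPartitionKeysAndNamedAggregates

      super.explainTerms(pw)
        .itemIf("partitionBy", groupingToString(inputType, partitionKeys), partitionKeys.nonEmpty)
        .item("orderBy", orderingToString(inputType, overWindow.orderKeys.getFieldCollations))
        .item("range", windowRangeToString(logicWindow, overWindow))
        .item("select", aggregationToString(
          inputType, partitionKeys, getRowType, namedAggregates, Seq()))
    }
    ```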



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3397



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104448367
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq())
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +      .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    +      .item("overWindow", overWindow)
    +      .item(
    +        "select", aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq()))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    if (logicWindow.groups.size > 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be computed on the same window.")
    +    }
    +
    +    val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
    +
    +    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    if (overWindow.orderKeys.getFieldCollations.size() != 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be ordered on a time mode column.")
    +    }
    +
    +    val timeType = inputType
    +      .getFieldList
    +      .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
    +      .getValue
    +
    +    timeType match {
    +      case _: ProcTimeType =>
    +        // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
    +        if (overWindow.lowerBound.isUnbounded &&
    +          overWindow.upperBound.isCurrentRow) {
    +          createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
    +        } else {
    +          throw new TableException(
    +              "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
    +              "condition.")
    +        }
    +      case _: RowTimeType =>
    +        throw new TableException("OVER Window of the EventTime type is not currently supported.")
    +      case _ =>
    +        throw new TableException(s"Unsupported time type {$timeType}")
    +    }
    +
    +  }
    +
    +  def createUnboundedAndCurrentRowProcessingTimeOverWindow(
    +    inputDS: DataStream[Row]): DataStream[Row]  = {
    +
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    val inputIndices = (0 until inputType.getFieldCount).toArray
    +
    +    // get the output types
    +    val fieldTypes: Array[TypeInformation[_]] = getRowType
    +      .getFieldList.asScala
    +      .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
    +
    +    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
    +
    +    val aggString = aggregationToString(
    +      inputType,
    +      inputIndices,
    +      getRowType,
    +      namedAggregates,
    +      Seq())
    +
    +    val keyedAggOpName = s"partitionBy: (${groupingToString(inputType, partitionKeys)}), " +
    --- End diff --
    
    This string contains duplicate information (PartitionBy and the Aggregates).
    How about a naming pattern like this:
    `"over: (PARTITION BY: x,y,z, ORDER BY: t, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), select: (a, b, c, COUNT(a) AS w0$o0))"`?



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102988435
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.volcano.RelSubset
    +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.convert.ConverterRule
    +import org.apache.calcite.rel.logical.LogicalWindow
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Rule to convert a LogicalWindow into a DataStreamOverAggregate.
    +  */
    +class DataStreamOverAggregateRule
    +  extends ConverterRule(
    +    classOf[LogicalWindow],
    +    Convention.NONE,
    +    DataStreamConvention.INSTANCE,
    +    "DataStreamOverAggregateRule") {
    +
    +  override def convert(rel: RelNode): RelNode = {
    +    val logicWindow: LogicalWindow = rel.asInstanceOf[LogicalWindow]
    +    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
    +    val convertInput: RelNode =
    +      RelOptRule.convert(logicWindow.getInput, DataStreamConvention.INSTANCE)
    +
    +    val inputRowType = convertInput.asInstanceOf[RelSubset].getOriginal.getRowType
    +
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      Seq[NamedWindowProperty](),
    --- End diff --
    
    Maybe we don't need this parameter, because if we do, we can carry it via `logicWindow` or in the projection. Right?



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102952622
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---
    @@ -171,4 +183,79 @@ class SqlITCase extends StreamingMultipleProgramsTestBase {
         val expected = mutable.MutableList("Hello", "Hello world")
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
    +
    +  @Test
    +  def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    //TODO ORDER BY will be fixed, once FLINK-5710 is solved
    +    tEnv.registerFunction("ProcTime", ProcTime)
    +    val sqlQuery = "SELECT c, count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE " +
    --- End diff --
    
    This query returns the same result (see ORDER BY):
    ```
    "SELECT " +
    "  c, " +
    "  COUNT(a) OVER (PARTITION BY c ORDER BY b RANGE UNBOUNDED PRECEDING) AS cnt1, " +
    "  COUNT(a) OVER (PARTITION BY c ORDER BY b RANGE UNBOUNDED PRECEDING) AS cnt2 " +
    "FROM T1"
    ```



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102937926
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---
    @@ -171,4 +183,79 @@ class SqlITCase extends StreamingMultipleProgramsTestBase {
         val expected = mutable.MutableList("Hello", "Hello world")
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
    +
    +  @Test
    +  def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    //TODO ORDER BY will be fixed, once FLINK-5710 is solved
    +    tEnv.registerFunction("ProcTime", ProcTime)
    +    val sqlQuery = "SELECT c, count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE " +
    +      "UNBOUNDED " +
    +      "preceding) as firstCount, count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED" +
    +      " preceding) as secondCount from T1"
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "Hello World,1,1", "Hello World,2,2", "Hello World,3,3",
    +      "Hello,1,1", "Hello,2,2", "Hello,3,3", "Hello,4,4", "Hello,5,5", "Hello,6,6")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testUnboundPartitionedProcessingWindowWithRow(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    val sqlQuery = "SELECT c, count(a) OVER (PARTITION BY c ROWS BETWEEN UNBOUNDED preceding " +
    +      "AND CURRENT ROW) as firstCount from T1"
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "Hello World,1", "Hello World,2", "Hello World,3",
    +      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test(expected = classOf[UnsupportedOperationException])
    --- End diff --
    
    Please add a comment explaining why this test is expected to fail.



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104424978
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq())
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +      .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    --- End diff --
    
    `partitionKeys.nonEmpty`



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104586950
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq())
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    --- End diff --
    
    1. We never need to check `overWindow.lowerBound != null` and `overWindow.upperBound != null`, right?
    2. You mean `DataStreamOverAggregate(partitionBy=[c], orderBy=[PROCTIME], range=[RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, PROCTIME, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1])`? Do we need to show the "PROCTIME" field?



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104572313
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFields: Array[Int],
    +    private val forwardedFieldCount: Int,
    +    private val intermediateRowType: RowTypeInfo,
    +    private val returnType: TypeInformation[Row])
    +  extends RichProcessFunction[Row, Row]{
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +
    +  private var output: Row = _
    +  private var state: ValueState[Row] = _
    +  private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
    +
    +  override def open(config: Configuration) {
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    val stateSerializer: TypeSerializer[Row] =
    +      intermediateRowType.createSerializer(getRuntimeContext.getExecutionConfig)
    +    val stateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("overState", stateSerializer)
    --- End diff --
    
    Can you explain why we do not use `stateSerializer`? I am a bit confused about that.
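
    (For illustration, a minimal sketch of the alternative under discussion, assuming the `ValueStateDescriptor` constructor that takes a `TypeInformation` directly; the descriptor then derives the serializer lazily, so the eager `createSerializer` call is not needed.)
    
    ```
    // Hypothetical alternative: pass the TypeInformation and let the
    // descriptor create the serializer on demand.
    val stateDescriptor: ValueStateDescriptor[Row] =
      new ValueStateDescriptor[Row]("overState", intermediateRowType)
    ```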


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103508003
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.java.typeutils.runtime.RowSerializer
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val projectionsMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val  intermediateRowType: RowTypeInfo,
    +  @transient private val returnType: TypeInformation[Row])
    +  extends RichProcessFunction[Row, Row]{
    +
    +  protected var stateSerializer: TypeSerializer[Row] = _
    +  protected var stateDescriptor: ValueStateDescriptor[Row] = _
    --- End diff --
    
    `stateSerializer` and `stateDescriptor` do not need to be member variables. They are only used in `open()`.
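
    A minimal sketch of the suggested change, keeping both values local to `open()` (essentially what the revised function quoted earlier in this thread does):
    
    ```
    override def open(config: Configuration) {
      // ... other initialization ...
      // only needed to build the state handle, so keep them local to open():
      val stateSerializer: TypeSerializer[Row] =
        intermediateRowType.createSerializer(getRuntimeContext.getExecutionConfig)
      val stateDescriptor: ValueStateDescriptor[Row] =
        new ValueStateDescriptor[Row]("overState", stateSerializer)
      state = getRuntimeContext.getState(stateDescriptor)
    }
    ```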


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103502677
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTime, RowTimeType, TimeModeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq())
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +      .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    +      .item("overWindow", overWindow)
    +      .item(
    +        "select", aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq()))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    if (logicWindow.groups.size > 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be computed on the same window.")
    +    }
    +
    +    val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
    +
    +    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    if (overWindow.orderKeys.getFieldCollations.size() !=1){
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be ordered on a time mode column.")
    +    }
    +
    +    val timeMode = inputType
    +      .getFieldList
    +      .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
    +      .getValue
    +
    +    timeMode match {
    +      case _: ProcTimeType =>
    +        // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
    +        if (overWindow.lowerBound.isUnbounded &&
    +          overWindow.upperBound.isCurrentRow) {
    +          createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
    +        } else {
    +          throw new TableException(
    +            "Unsupported use of OVER windows." +
    --- End diff --
    
    Improve error message.


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103510912
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.java.typeutils.runtime.RowSerializer
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val projectionsMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val  intermediateRowType: RowTypeInfo,
    +  @transient private val returnType: TypeInformation[Row])
    +  extends RichProcessFunction[Row, Row]{
    +
    +  protected var stateSerializer: TypeSerializer[Row] = _
    +  protected var stateDescriptor: ValueStateDescriptor[Row] = _
    +
    +  private var output: Row = _
    +  private var state: ValueState[Row] = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(projectionsMapping)
    +    Preconditions.checkNotNull(aggregateMapping)
    +    Preconditions.checkArgument(aggregates.length == aggregateMapping.length)
    +
    +    val finalRowLength: Int = projectionsMapping.length + aggregateMapping.length
    +    output = new Row(finalRowLength)
    +    stateSerializer = intermediateRowType.createSerializer(getRuntimeContext.getExecutionConfig)
    +    stateDescriptor = new ValueStateDescriptor[Row]("overState", stateSerializer)
    +    state = getRuntimeContext.getState(stateDescriptor)
    +  }
    +
    +  override def processElement(
    +    value2: Row,
    +    ctx: Context,
    +    out: Collector[Row]): Unit = {
    +    val value1 = state.value()
    +    val accumulatorRow = new Row(intermediateRowType.getArity)
    --- End diff --
    
    Do we need `accumulatorRow`?
    Can't we merge it into `value1`?
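
    For illustration, a rough sketch of that idea against the `AggregateFunction`-based version quoted earlier in this thread (field layout simplified, the emit step elided):
    
    ```
    override def processElement(input: Row, ctx: Context, out: Collector[Row]): Unit = {
      var accumulators = state.value()
      var i = 0
      if (null == accumulators) {
        // first record for this key: create the accumulator row once
        accumulators = new Row(aggregates.length)
        while (i < aggregates.length) {
          accumulators.setField(i, aggregates(i).createAccumulator())
          i += 1
        }
      }
      // accumulate directly into the state row instead of a fresh accumulatorRow
      i = 0
      while (i < aggregates.length) {
        val acc = accumulators.getField(i).asInstanceOf[Accumulator]
        aggregates(i).accumulate(acc, input.getField(aggFields(i)))
        i += 1
      }
      state.update(accumulators)
      // ... set forwarded fields and aggregate results on output, then collect
    }
    ```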


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104449558
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    --- End diff --
    
    The string contains duplicate information, and the formatting of `overWindow.toString()` is not aligned with the toString formats of the other operators.
    
    What do you think of this format:
    `OverAggregate(over: (PARTITION BY: x,y,z, ORDER BY: t, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), select: (a, b, c, COUNT(a) AS w0$o0)))`?


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104611978
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq())
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    --- End diff --
    
    Cool, I suggest using `PROCTIME`, because `PROCTIME` is better than `$2`.


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103502749
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTime, RowTimeType, TimeModeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq())
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +      .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    +      .item("overWindow", overWindow)
    +      .item(
    +        "select", aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq()))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    if (logicWindow.groups.size > 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be computed on the same window.")
    +    }
    +
    +    val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
    +
    +    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    if (overWindow.orderKeys.getFieldCollations.size() !=1){
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be ordered on a time mode column.")
    +    }
    +
    +    val timeMode = inputType
    +      .getFieldList
    +      .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
    +      .getValue
    +
    +    timeMode match {
    +      case _: ProcTimeType =>
    +        // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
    +        if (overWindow.lowerBound.isUnbounded &&
    +          overWindow.upperBound.isCurrentRow) {
    +          createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
    +        } else {
    +          throw new TableException(
    +            "Unsupported use of OVER windows." +
    +              "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
    +              "condition.")
    +        }
    +      case _: RowTimeType =>
    +        throw new TableException("Row-Time OVER window not supported.")
    +      case _ =>
    +        throw new TableException("Unsupported TimeMode.")
    --- End diff --
    
    Improve error message


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104611482
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq())
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    --- End diff --
    
    That would be good as well. Logically, `procTime` is assigned in a Calc preceding the Window operator and has an alias assigned, which is used in the ORDER BY clause of the window. So we could also show the alias instead of `PROCTIME`, but both are fine with me.


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104628710
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -206,5 +164,36 @@ class DataStreamOverAggregate(
           yield new CalcitePair[AggregateCall, String](aggregateCalls.get(i), "w0$o" + i)
         (overWindow, partitionKeys, namedAggregates)
       }
    +
    +  private def aggOpName = {
    +    val (
    +      overWindow: Group,
    +      partition: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"over: (${
    +      if (!partition.isEmpty) {
    +        s"PARTITION BY: ${partitionToString(inputType, partition)}, "
    +      } else {
    +        ""
    +      }
    +    }ORDER BY: ${orderingToString(inputType, overWindow.orderKeys.getFieldCollations)}, " +
    +      s"${windowRange(overWindow)}, " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          getRowType,
    +          namedAggregates)
    +      }))"
    +  }
    +
    +  private def windowRange(overWindow: Group): String = {
    --- End diff --
    
    Move to `OverAggregate`?


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103631287
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.java.typeutils.runtime.RowSerializer
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val projectionsMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val  intermediateRowType: RowTypeInfo,
    +  @transient private val returnType: TypeInformation[Row])
    +  extends RichProcessFunction[Row, Row]{
    +
    +  protected var stateSerializer: TypeSerializer[Row] = _
    +  protected var stateDescriptor: ValueStateDescriptor[Row] = _
    +
    +  private var output: Row = _
    +  private var state: ValueState[Row] = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(projectionsMapping)
    +    Preconditions.checkNotNull(aggregateMapping)
    +    Preconditions.checkArgument(aggregates.length == aggregateMapping.length)
    +
    +    val finalRowLength: Int = projectionsMapping.length + aggregateMapping.length
    +    output = new Row(finalRowLength)
    +    stateSerializer = intermediateRowType.createSerializer(getRuntimeContext.getExecutionConfig)
    +    stateDescriptor = new ValueStateDescriptor[Row]("overState", stateSerializer)
    --- End diff --
    
    Oh, I see. Wasn't aware of this change. Then let's keep it as it is.


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102942616
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala ---
    @@ -27,7 +27,7 @@ import org.apache.flink.util.Preconditions
     class AggregateMapFunction[IN, OUT](
         private val aggregates: Array[Aggregate[_]],
         private val aggFields: Array[Int],
    -    private val groupingKeys: Array[Int],
    +    private val projectionsExceptAggregates: Array[Int],
    --- End diff --
    
    Rename to `forwardedFields`?


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104424488
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq())
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +      .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    +      .item("overWindow", overWindow)
    +      .item(
    +        "select", aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq()))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    if (logicWindow.groups.size > 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be computed on the same window.")
    +    }
    +
    +    val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
    +
    +    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    if (overWindow.orderKeys.getFieldCollations.size() != 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be ordered on a time mode column.")
    +    }
    +
    +    val timeType = inputType
    +      .getFieldList
    +      .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
    +      .getValue
    +
    +    timeType match {
    +      case _: ProcTimeType =>
    +        // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
    +        if (overWindow.lowerBound.isUnbounded &&
    +          overWindow.upperBound.isCurrentRow) {
    +          createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
    +        } else {
    +          throw new TableException(
    +              "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
    +              "condition.")
    +        }
    +      case _: RowTimeType =>
    +        throw new TableException("OVER Window of the EventTime type is not currently supported.")
    +      case _ =>
    +        throw new TableException(s"Unsupported time type {$timeType}")
    +    }
    +
    +  }
    +
    +  def createUnboundedAndCurrentRowProcessingTimeOverWindow(
    +    inputDS: DataStream[Row]): DataStream[Row]  = {
    +
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    val inputIndices = (0 until inputType.getFieldCount).toArray
    +
    +    // get the output types
    +    val fieldTypes: Array[TypeInformation[_]] = getRowType
    --- End diff --
    
    `rowTypeInfo` can be created as:
    
    ```
    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
    ```


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104642876
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,199 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.OverAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with OverAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    s"OverAggregate(${aggOpName})"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partition: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +      .itemIf("partitionBy", partitionToString(inputType, partition), partition.nonEmpty)
    +        .item("orderBy",orderingToString(inputType, overWindow.orderKeys.getFieldCollations))
    +      .item("range", windowRange(overWindow))
    +      .item(
    +        "select", aggregationToString(
    +          inputType,
    +          getRowType,
    +          namedAggregates))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +    if (logicWindow.groups.size > 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be computed on the same window.")
    +    }
    +
    +    val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
    +
    +    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    if (overWindow.orderKeys.getFieldCollations.size() != 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. The window may only be ordered by a single time column.")
    +    }
    +
    +    val timeType = inputType
    +      .getFieldList
    +      .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
    +      .getValue
    +
    +    timeType match {
    +      case _: ProcTimeType =>
    +        // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
    +        if (overWindow.lowerBound.isUnbounded &&
    +          overWindow.upperBound.isCurrentRow) {
    +          createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
    +        } else {
    +          throw new TableException(
    +              "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
    +              "condition.")
    +        }
    +      case _: RowTimeType =>
    +        throw new TableException("OVER Window of the EventTime type is not currently supported.")
    +      case _ =>
    +        throw new TableException(s"Unsupported time type {$timeType}")
    +    }
    +
    +  }
    +
    +  def createUnboundedAndCurrentRowProcessingTimeOverWindow(
    +    inputDS: DataStream[Row]): DataStream[Row]  = {
    +
    +    val (_,
    +      partition: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +
    +    // get the output types
    +    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
    +
    +    val result: DataStream[Row] =
    +        // partitioned aggregation
    +        if (partition.nonEmpty) {
    +          val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
    +            namedAggregates,
    +            inputType)
    +
    +          inputDS
    +          .keyBy(partition: _*)
    +          .process(processFunction)
    +          .returns(rowTypeInfo)
    +          .name(aggOpName)
    +          .asInstanceOf[DataStream[Row]]
    +        }
    +        // global non-partitioned aggregation
    +        else {
    +          throw TableException("non-partitioned over Aggregation is currently not supported...")
    --- End diff --
    
    "Non-partitioned processing time OVER aggregation is not supported yet."


---

[GitHub] flink issue #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] processing t...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3397
  
    Hi @fhueske, I have rebased the code on #3423's commit and updated the PR. I would appreciate it if you could have a look at this PR again.
    Thanks,
    SunJincheng


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103006641
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.volcano.RelSubset
    +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.convert.ConverterRule
    +import org.apache.calcite.rel.logical.LogicalWindow
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Rule to convert a LogicalWindow into a DataStreamOverAggregate.
    +  */
    +class DataStreamOverAggregateRule
    +  extends ConverterRule(
    +    classOf[LogicalWindow],
    +    Convention.NONE,
    +    DataStreamConvention.INSTANCE,
    +    "DataStreamOverAggregateRule") {
    +
    +  override def convert(rel: RelNode): RelNode = {
    --- End diff --
    
    When I rebased the PR on the latest master and tried to use `ProcTime()/RowTime()`, I got an exception:
    `org.apache.flink.table.codegen.CodeGenException: Unsupported call: PROCTIME
    If you think this function should be supported, you can create an issue and start a discussion for it.`
    at `org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1116)`
    The reason for the exception is that `EventTimeExtractor` / `ProcTimeExtractor` is not handled in `FunctionGenerator#getCallGenerator`. Should we open a new JIRA for this, or fix it in the current PR? What do you think?



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104457495
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---
    @@ -32,6 +33,17 @@ import scala.collection.mutable
     
     class SqlITCase extends StreamingMultipleProgramsTestBase {
    --- End diff --
    
    In addition to an integration test we should also add tests based on `TableTestBase` which check the correct translation into `DataSetRelNode`s. See `WindowAggregateTest` as an example.
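
    A minimal sketch of such a plan test, assuming the `TableTestBase` utilities (`streamTestUtil`, `unaryNode`, `term`, `streamTableNode`, `verifySql`); the expected plan terms below are illustrative only and would need to match the actual output of `DataStreamOverAggregate#explainTerms`:

    ```
    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.utils.TableTestBase
    import org.apache.flink.table.utils.TableTestUtil._
    import org.junit.Test

    class OverWindowTest extends TableTestBase {

      @Test
      def testUnboundedPartitionedProcessingOver(): Unit = {
        val util = streamTestUtil()
        util.addTable[(Int, Long, String)]("T1", 'a, 'b, 'c)

        val sql =
          "SELECT " +
          "  c, " +
          "  COUNT(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED PRECEDING) AS cnt " +
          "FROM T1"

        // illustrative plan terms; align them with the real explainTerms output
        val expected = unaryNode(
          "DataStreamOverAggregate",
          streamTableNode(0),
          term("partitionBy", "c"),
          term("select", "c, COUNT(a) AS cnt")
        )

        util.verifySql(sql, expected)
      }
    }
    ```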



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102942024
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---
    @@ -171,4 +183,79 @@ class SqlITCase extends StreamingMultipleProgramsTestBase {
         val expected = mutable.MutableList("Hello", "Hello world")
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
    +
    +  @Test
    +  def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    //TODO ORDER BY will be fixed, once FLINK-5710 is solved
    +    tEnv.registerFunction("ProcTime", ProcTime)
    +    val sqlQuery = "SELECT c, count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE " +
    --- End diff --
    
    Please reformat the queries to make them easier to read, for example like this:
    ```
    "SELECT " +
    "  c, " +
    "  COUNT(a) OVER (PARTITION BY c ORDER BY proctime() RANGE UNBOUNDED PRECEDING) AS cnt1, " +
    "  COUNT(a) OVER (PARTITION BY c ORDER BY proctime() RANGE UNBOUNDED PRECEDING) AS cnt2 " +
    "FROM T1"
    ```



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102948678
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +
    +import java.util.{List => JList}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    namedProperties: Seq[NamedWindowProperty],
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      namedProperties,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          namedProperties)
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +    .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    +    .item("overWindow", overWindow)
    +    .item(
    +      "select", aggregationToString(
    +        inputType,
    +        partitionKeys,
    +        getRowType,
    +        namedAggregates,
    +        namedProperties))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    if (logicWindow.groups.size > 1) {
    +      throw new UnsupportedOperationException(
    +        "Unsupported different over window in the same projection")
    --- End diff --
    
    Change to "Unsupported use of OVER windows. All aggregates must be computed on the same window."?



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102984086
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---
    @@ -171,4 +183,79 @@ class SqlITCase extends StreamingMultipleProgramsTestBase {
         val expected = mutable.MutableList("Hello", "Hello world")
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
    +
    +  @Test
    +  def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    //TODO ORDER BY will be fixed, once FLINK-5710 is solved
    +    tEnv.registerFunction("ProcTime", ProcTime)
    +    val sqlQuery = "SELECT c, count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE " +
    --- End diff --
    
    Yes, using `ORDER BY b` in the query returns the same result. We only use `b` in the SQL verification stage, right?



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104422579
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq())
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +      .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    +      .item("overWindow", overWindow)
    +      .item(
    +        "select", aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq()))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    if (logicWindow.groups.size > 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be computed on the same window.")
    +    }
    +
    +    val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
    +
    +    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    if (overWindow.orderKeys.getFieldCollations.size() != 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be ordered on a time mode column.")
    --- End diff --
    
    "All aggregates must be ordered on a time mode column." -> "The window may only be ordered by a single time column." 



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103512544
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.java.typeutils.runtime.RowSerializer
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val projectionsMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val  intermediateRowType: RowTypeInfo,
    +  @transient private val returnType: TypeInformation[Row])
    +  extends RichProcessFunction[Row, Row]{
    +
    +  protected var stateSerializer: TypeSerializer[Row] = _
    +  protected var stateDescriptor: ValueStateDescriptor[Row] = _
    +
    +  private var output: Row = _
    +  private var state: ValueState[Row] = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(projectionsMapping)
    +    Preconditions.checkNotNull(aggregateMapping)
    +    Preconditions.checkArgument(aggregates.length == aggregateMapping.length)
    +
    +    val finalRowLength: Int = projectionsMapping.length + aggregateMapping.length
    +    output = new Row(finalRowLength)
    +    stateSerializer = intermediateRowType.createSerializer(getRuntimeContext.getExecutionConfig)
    +    stateDescriptor = new ValueStateDescriptor[Row]("overState", stateSerializer)
    +    state = getRuntimeContext.getState(stateDescriptor)
    +  }
    +
    +  override def processElement(
    +    value2: Row,
    +    ctx: Context,
    +    out: Collector[Row]): Unit = {
    +    val value1 = state.value()
    +    val accumulatorRow = new Row(intermediateRowType.getArity)
    +
    +    if (null != value1) {
    +      // copy all fields of value1 into accumulatorRow
    +      (0 until intermediateRowType.getArity)
    +      .foreach(i => accumulatorRow.setField(i, value1.getField(i)))
    +      // merge value2 to accumulatorRow
    +      aggregates.foreach(_.merge(value2, accumulatorRow))
    +      // Set projections value to final output.
    +      projectionsMapping.foreach {
    --- End diff --
    
    Why do we set these fields into `accumulatorRow`? They do not need to be included in the state.
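
    For illustration, a minimal sketch of the `processElement` body with only the accumulators kept in state; the forwarded fields are copied from the current input row when the output is emitted. The `initiate`/`evaluate` hooks and the (output index, input index) order of the mapping tuples are assumptions, not verified signatures:

    ```
    override def processElement(input: Row, ctx: Context, out: Collector[Row]): Unit = {
      // the state row holds only the accumulators, not the forwarded fields
      var accumulators = state.value()
      if (accumulators == null) {
        accumulators = new Row(aggregates.length)
        aggregates.foreach(_.initiate(accumulators)) // assumed init hook
      }
      aggregates.foreach(_.merge(input, accumulators))
      state.update(accumulators)

      // forwarded fields come straight from the current input row
      projectionsMapping.foreach { case (outIdx, inIdx) =>
        output.setField(outIdx, input.getField(inIdx))
      }
      // assumed hook that reads the final aggregate value from the accumulators
      aggregateMapping.foreach { case (outIdx, aggIdx) =>
        output.setField(outIdx, aggregates(aggIdx).evaluate(accumulators))
      }
      out.collect(output)
    }
    ```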



[GitHub] flink issue #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] processing t...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3397
  
    @fhueske Thank you very much for your extra work on the latest update of this PR!
    Best wishes,
    SunJincheng



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104586138
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    --- End diff --
    
    Does rendering `SELECT c, count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, sum(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 from T1` as
    `OverAggregate(over: (PARTITION BY: c, ORDER BY: PROCTIME, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, select: (a, c, PROCTIME, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1)))` make sense?



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102951067
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +
    +import java.util.{List => JList}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    namedProperties: Seq[NamedWindowProperty],
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      namedProperties,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          namedProperties)
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +    .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    +    .item("overWindow", overWindow)
    +    .item(
    +      "select", aggregationToString(
    +        inputType,
    +        partitionKeys,
    +        getRowType,
    +        namedAggregates,
    +        namedProperties))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    if (logicWindow.groups.size > 1) {
    +      throw new UnsupportedOperationException(
    +        "Unsupported different over window in the same projection")
    +    }
    +
    +    val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
    +
    +    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    // TODO timeDomain will be fixed, once FLINK-5884 is solved
    +    // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
    +    if (/*logicWindow.timeDomain.equals(
    +        TimeDomain.PROCESSING_TIME && */
    +      overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
    +       createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
    +    } else {
    +      throw new UnsupportedOperationException(
    +        "Over window only support UNBOUNDED PRECEDING and CURRENT ROW condition.")
    +    }
    +
    +  }
    +
    +  def createUnboundedAndCurrentRowProcessingTimeOverWindow(
    +    inputDS: DataStream[Row]): DataStream[Row]  = {
    +
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    val allInputProjections = (for (i <- 0 until inputType.getFieldCount) yield i).toArray
    --- End diff --
    
    `(0 until inputType.getFieldCount).toArray`



[GitHub] flink issue #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] processing t...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3397
  
    Hi @sunjincheng121, I opened PR #3425, which adds custom data types for `proctime()` and `rowtime()` and replaces the functions during translation with constants.
    
    You can rebase your commit on this PR and continue working on it.
    Thanks, Fabian



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103502715
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTime, RowTimeType, TimeModeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq())
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +      .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    +      .item("overWindow", overWindow)
    +      .item(
    +        "select", aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq()))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    if (logicWindow.groups.size > 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be computed on the same window.")
    +    }
    +
    +    val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
    +
    +    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    if (overWindow.orderKeys.getFieldCollations.size() !=1){
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be ordered on a time mode column.")
    +    }
    +
    +    val timeMode = inputType
    +      .getFieldList
    +      .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
    +      .getValue
    +
    +    timeMode match {
    +      case _: ProcTimeType =>
    +        // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
    +        if (overWindow.lowerBound.isUnbounded &&
    +          overWindow.upperBound.isCurrentRow) {
    +          createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
    +        } else {
    +          throw new TableException(
    +            "Unsupported use of OVER windows." +
    +              "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
    +              "condition.")
    +        }
    +      case _: RowTimeType =>
    +        throw new TableException("Row-Time OVER window not supported.")
    --- End diff --
    
    improve error message



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104456748
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq())
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    --- End diff --
    
    This snippet might help:
    ```
    def getWindowRange: String = {
      if (overWindow.lowerBound != null) {
        if (overWindow.upperBound != null) {
          s"BETWEEN ${overWindow.lowerBound} AND ${overWindow.upperBound}"
        } else {
          s"${overWindow.lowerBound} PRECEDING AND CURRENT ROW"
        }
      } else if (overWindow.upperBound != null) {
        s"${overWindow.upperBound} FOLLOWING"
      } else {
        // default when neither bound is given
        "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"
      }
    }
    ```



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104610828
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    --- End diff --
    
    That looks very good, IMO!



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103508272
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.java.typeutils.runtime.RowSerializer
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val projectionsMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val  intermediateRowType: RowTypeInfo,
    +  @transient private val returnType: TypeInformation[Row])
    +  extends RichProcessFunction[Row, Row]{
    +
    +  protected var stateSerializer: TypeSerializer[Row] = _
    +  protected var stateDescriptor: ValueStateDescriptor[Row] = _
    +
    +  private var output: Row = _
    +  private var state: ValueState[Row] = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    --- End diff --
    
    checks can be moved into the constructor
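
    For example, a minimal sketch with the checks moved from `open()` into the class body, so they run once at construction time:

    ```
    class UnboundedProcessingOverProcessFunction(
        private val aggregates: Array[Aggregate[_]],
        private val projectionsMapping: Array[(Int, Int)],
        private val aggregateMapping: Array[(Int, Int)],
        private val intermediateRowType: RowTypeInfo,
        @transient private val returnType: TypeInformation[Row])
      extends RichProcessFunction[Row, Row] {

      // fail fast at construction instead of in open()
      Preconditions.checkNotNull(aggregates)
      Preconditions.checkNotNull(projectionsMapping)
      Preconditions.checkNotNull(aggregateMapping)
      Preconditions.checkArgument(aggregates.length == aggregateMapping.length)

      // ... rest of the function unchanged
    }
    ```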



[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103501952
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTime, RowTimeType, TimeModeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq())
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +      .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    +      .item("overWindow", overWindow)
    +      .item(
    +        "select", aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq()))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    if (logicWindow.groups.size > 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be computed on the same window.")
    +    }
    +
    +    val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
    +
    +    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    if (overWindow.orderKeys.getFieldCollations.size() !=1){
    --- End diff --
    
    whitespace after `!=`


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102946820
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.volcano.RelSubset
    +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.convert.ConverterRule
    +import org.apache.calcite.rel.logical.LogicalWindow
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Rule to convert a LogicalWindow into a DataStreamOverAggregate.
    +  */
    +class DataStreamOverAggregateRule
    +  extends ConverterRule(
    +    classOf[LogicalWindow],
    +    Convention.NONE,
    +    DataStreamConvention.INSTANCE,
    +    "DataStreamOverAggregateRule") {
    +
    +  override def convert(rel: RelNode): RelNode = {
    --- End diff --
    
    We need to check that ORDER BY is on `proctime()`.
    Currently any attribute can be used in the ORDER BY clause and the query is executed without issues.
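    
    A hedged sketch of such a check, assuming the order key's type is exposed as `ProcTimeType` (names taken from the later revision in this thread):
    
    ```scala
    // reject OVER windows that are not ordered on proctime()
    val orderKeyIndex = overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex
    val timeType = inputType.getFieldList.get(orderKeyIndex).getValue
    timeType match {
      case _: ProcTimeType => // supported: processing-time OVER window
      case _ =>
        throw new TableException("OVER windows must be ordered on proctime().")
    }
    ```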


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104617448
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          Seq())
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    --- End diff --
    
    +1


---

[GitHub] flink issue #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] processing t...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3397
  
    @fhueske thanks for the PR (#3425). I have rebased the code and added a timeMode check. I would appreciate it if you could take a look at this PR.


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103512951
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---
    @@ -171,4 +183,82 @@ class SqlITCase extends StreamingMultipleProgramsTestBase {
         val expected = mutable.MutableList("Hello", "Hello world")
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
    +
    +  @Test
    +  def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    // for sum aggregation ensure that every time the order of each element is consistent
    +    env.setParallelism(1)
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    val sqlQuery = "SELECT ProcTime()," +
    +      "c, " +
    +      "count(a) OVER (PARTITION BY c ORDER BY ProcTime()  RANGE UNBOUNDED preceding) as cnt1, " +
    +      "sum(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
    +      "from T1"
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
    +      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testUnboundPartitionedProcessingWindowWithRow(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    val sqlQuery = "SELECT " +
    --- End diff --
    
    This query should not work: `ORDER BY` is missing, which is required for RANGE and ROWS because otherwise the result is not completely defined.
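    
    For illustration, a variant with the required `ORDER BY` (a sketch reusing the `ProcTime()` function from the other tests in this file):
    
    ```scala
    val sqlQuery = "SELECT c, " +
      "count(a) OVER (PARTITION BY c ORDER BY ProcTime() ROWS UNBOUNDED preceding) as cnt1 " +
      "from T1"
    ```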


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102948984
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +
    +import java.util.{List => JList}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    namedProperties: Seq[NamedWindowProperty],
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      namedProperties,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          namedProperties)
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +    .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    +    .item("overWindow", overWindow)
    +    .item(
    +      "select", aggregationToString(
    +        inputType,
    +        partitionKeys,
    +        getRowType,
    +        namedAggregates,
    +        namedProperties))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    if (logicWindow.groups.size > 1) {
    +      throw new UnsupportedOperationException(
    +        "Unsupported different over window in the same projection")
    +    }
    +
    +    val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
    +
    +    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    // TODO timeDomain will be fixed, once FLINK-5884 is solved
    +    // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
    +    if (/*logicWindow.timeDomain.equals(
    +        TimeDomain.PROCESSING_TIME && */
    +      overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
    +       createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
    +    } else {
    +      throw new UnsupportedOperationException(
    --- End diff --
    
    `TableException`


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104642668
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,199 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.OverAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +import java.util.{List => JList}
    +
    +import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with OverAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    s"OverAggregate(${aggOpName})"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partition: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +      .itemIf("partitionBy", partitionToString(inputType, partition), partition.nonEmpty)
    +        .item("orderBy",orderingToString(inputType, overWindow.orderKeys.getFieldCollations))
    +      .item("range", windowRange(overWindow))
    +      .item(
    +        "select", aggregationToString(
    +          inputType,
    +          getRowType,
    +          namedAggregates))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +    if (logicWindow.groups.size > 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. All aggregates must be computed on the same window.")
    +    }
    +
    +    val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
    +
    +    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    if (overWindow.orderKeys.getFieldCollations.size() != 1) {
    +      throw new TableException(
    +        "Unsupported use of OVER windows. The window may only be ordered by a single time column.")
    +    }
    +
    +    val timeType = inputType
    +      .getFieldList
    +      .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
    +      .getValue
    +
    +    timeType match {
    +      case _: ProcTimeType =>
    +        // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
    +        if (overWindow.lowerBound.isUnbounded &&
    +          overWindow.upperBound.isCurrentRow) {
    +          createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
    +        } else {
    +          throw new TableException(
    +              "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
    +              "condition.")
    +        }
    +      case _: RowTimeType =>
    +        throw new TableException("OVER Window of the EventTime type is not currently supported.")
    +      case _ =>
    +        throw new TableException(s"Unsupported time type {$timeType}")
    +    }
    +
    +  }
    +
    +  def createUnboundedAndCurrentRowProcessingTimeOverWindow(
    +    inputDS: DataStream[Row]): DataStream[Row]  = {
    +
    +    val (_,
    +      partition: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    --- End diff --
    
    Change `genPartitionKeysAndNamedAggregates()` to only generate the named aggregates and rename it accordingly.
    Functions that do only one thing are easier to understand, and the partition keys can be generated in one line: `overWindow.keys.toArray`.
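    
    A minimal sketch of the suggested split (the method name and output field names are illustrative):
    
    ```scala
    // generate only the named aggregates; keys come straight from the window group
    private def genNamedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]] = {
      val overWindow = logicWindow.groups.get(0)
      val aggregateCalls = overWindow.getAggregateCalls(logicWindow)
      for (i <- 0 until aggregateCalls.size())
        yield new CalcitePair(aggregateCalls.get(i), "w0$o" + i)
    }
    
    // partition keys in one line:
    val partitionKeys: Array[Int] = overWindow.keys.toArray
    ```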


---

[GitHub] flink issue #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] processing t...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3397
  
    Hi @fhueske,
    Thank you for your attention to this JIRA!
    
    I agree that we need a few more iterations on this JIRA: make progress in each iteration and gradually improve it. I will remove the merge commit (the second one) and rebase the PR on the latest master after you merge PR #3370 (and I see you have already done so).
    
    I think it would be great to use a special data type to identify the time mode, something like `TimeCharacteristic.ProcessingTime` and `TimeCharacteristic.EventTime`. I like this approach.
    
    Best,
    SunJincheng



---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103509126
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.java.typeutils.runtime.RowSerializer
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val projectionsMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val  intermediateRowType: RowTypeInfo,
    +  @transient private val returnType: TypeInformation[Row])
    +  extends RichProcessFunction[Row, Row]{
    +
    +  protected var stateSerializer: TypeSerializer[Row] = _
    +  protected var stateDescriptor: ValueStateDescriptor[Row] = _
    +
    +  private var output: Row = _
    +  private var state: ValueState[Row] = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(projectionsMapping)
    +    Preconditions.checkNotNull(aggregateMapping)
    +    Preconditions.checkArgument(aggregates.length == aggregateMapping.length)
    +
    +    val finalRowLength: Int = projectionsMapping.length + aggregateMapping.length
    +    output = new Row(finalRowLength)
    +    stateSerializer = intermediateRowType.createSerializer(getRuntimeContext.getExecutionConfig)
    +    stateDescriptor = new ValueStateDescriptor[Row]("overState", stateSerializer)
    --- End diff --
    
    We can configure the state with a default value, which is returned instead of `null` on the first access.
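    
    A minimal sketch, assuming the `ValueStateDescriptor` constructor that accepts a default value (the initial row is illustrative):
    
    ```scala
    // the default row is handed out instead of null on the first access per key
    val initialRow = new Row(intermediateRowType.getArity)
    stateDescriptor = new ValueStateDescriptor[Row]("overState", stateSerializer, initialRow)
    state = getRuntimeContext.getState(stateDescriptor)
    ```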


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104628040
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -63,50 +62,28 @@ class DataStreamOverAggregate(
       }
     
       override def toString: String = {
    -    val (
    -      overWindow: Group,
    -      partitionKeys: Array[Int],
    -      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    -      ) = genPartitionKeysAndNamedAggregates
    -
    -    s"Aggregate(${
    -      if (!partitionKeys.isEmpty) {
    -        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    -      } else {
    -        ""
    -      }
    -    }window: ($overWindow), " +
    -      s"select: (${
    -        aggregationToString(
    -          inputType,
    -          partitionKeys,
    -          getRowType,
    -          namedAggregates,
    -          Seq())
    -      }))"
    +    s"OverAggregate(${aggOpName})"
       }
     
       override def explainTerms(pw: RelWriter): RelWriter = {
         val (
           overWindow: Group,
    -      partitionKeys: Array[Int],
    +      partition: Array[Int],
           namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
           ) = genPartitionKeysAndNamedAggregates
     
         super.explainTerms(pw)
    -      .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    -      .item("overWindow", overWindow)
    +      .itemIf("partitionBy", partitionToString(inputType, partition), partition.nonEmpty)
    +        .item("orderBy",orderingToString(inputType, overWindow.orderKeys.getFieldCollations))
    +      .item("range", windowRange(overWindow))
    --- End diff --
    
    ```
    .itemIf("rows", windowRange(overWindow), overWindow.isRows)
    .itemIf("range", windowRange(overWindow), !overWindow.isRows)
    ```



---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104426850
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFields: Array[Int],
    +    private val forwardedFieldCount: Int,
    +    private val intermediateRowType: RowTypeInfo,
    +    private val returnType: TypeInformation[Row])
    +  extends RichProcessFunction[Row, Row]{
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +
    +  private var output: Row = _
    +  private var state: ValueState[Row] = _
    +  private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
    +
    +  override def open(config: Configuration) {
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    val stateSerializer: TypeSerializer[Row] =
    +      intermediateRowType.createSerializer(getRuntimeContext.getExecutionConfig)
    +    val stateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("overState", stateSerializer)
    --- End diff --
    
    We can define the `ValueStateDescriptor` with a `TypeInformation` instead of a `TypeSerializer`.
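    
    For example (a sketch of the suggested change):
    
    ```scala
    // pass the TypeInformation; the serializer is derived from the execution config
    val stateDescriptor: ValueStateDescriptor[Row] =
      new ValueStateDescriptor[Row]("overState", intermediateRowType)
    state = getRuntimeContext.getState(stateDescriptor)
    ```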


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104431561
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -93,6 +94,41 @@ object AggregateUtil {
       }
     
       /**
    +    * Create an [[RichProcessFunction]] to evaluate final aggregate value.
    +    *
    +    * @param namedAggregates List of calls to aggregate functions and their output field names
    +    * @param inputType Input row type
    +    * @param outputType Output row type
    +    * @param forwardedFields All the forwarded fields
    +    * @return [[UnboundedProcessingOverProcessFunction]]
    +    */
    +  private[flink] def CreateUnboundedProcessingOverProcessFunction(
    +    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +    inputType: RelDataType,
    +    outputType: RelDataType,
    +    forwardedFields: Array[Int]): UnboundedProcessingOverProcessFunction = {
    +
    +    val (aggFields, aggregates) =
    +      transformToAggregateFunctions(
    +        namedAggregates.map(_.getKey),
    +        inputType,
    +        forwardedFields.length)
    --- End diff --
    
    The `groupKeysCount` parameter (here `forwardedFields.length`) can be removed from `AggregateUtil.transformToAggregateFunctions()`.
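    
    A hedged sketch of the trimmed call site (the two-argument signature is an assumption):
    
    ```scala
    // aggregate functions and field indices no longer depend on the forwarded fields
    val (aggFields, aggregates) =
      transformToAggregateFunctions(
        namedAggregates.map(_.getKey),
        inputType)
    ```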


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102946239
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.volcano.RelSubset
    +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.convert.ConverterRule
    +import org.apache.calcite.rel.logical.LogicalWindow
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Rule to convert a LogicalWindow into a DataStreamOverAggregate.
    +  */
    +class DataStreamOverAggregateRule
    +  extends ConverterRule(
    +    classOf[LogicalWindow],
    +    Convention.NONE,
    +    DataStreamConvention.INSTANCE,
    +    "DataStreamOverAggregateRule") {
    +
    +  override def convert(rel: RelNode): RelNode = {
    +    val logicWindow: LogicalWindow = rel.asInstanceOf[LogicalWindow]
    +    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
    +    val convertInput: RelNode =
    +      RelOptRule.convert(logicWindow.getInput, DataStreamConvention.INSTANCE)
    +
    +    val inputRowType = convertInput.asInstanceOf[RelSubset].getOriginal.getRowType
    +
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      Seq[NamedWindowProperty](),
    --- End diff --
    
    Do we need window properties for `OVER` windows?


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103509239
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.java.typeutils.runtime.RowSerializer
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val projectionsMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val  intermediateRowType: RowTypeInfo,
    +  @transient private val returnType: TypeInformation[Row])
    +  extends RichProcessFunction[Row, Row]{
    +
    +  protected var stateSerializer: TypeSerializer[Row] = _
    +  protected var stateDescriptor: ValueStateDescriptor[Row] = _
    +
    +  private var output: Row = _
    +  private var state: ValueState[Row] = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(projectionsMapping)
    +    Preconditions.checkNotNull(aggregateMapping)
    +    Preconditions.checkArgument(aggregates.length == aggregateMapping.length)
    +
    +    val finalRowLength: Int = projectionsMapping.length + aggregateMapping.length
    +    output = new Row(finalRowLength)
    +    stateSerializer = intermediateRowType.createSerializer(getRuntimeContext.getExecutionConfig)
    +    stateDescriptor = new ValueStateDescriptor[Row]("overState", stateSerializer)
    +    state = getRuntimeContext.getState(stateDescriptor)
    +  }
    +
    +  override def processElement(
    +    value2: Row,
    +    ctx: Context,
    +    out: Collector[Row]): Unit = {
    +    val value1 = state.value()
    +    val accumulatorRow = new Row(intermediateRowType.getArity)
    +
    +    if (null != value1) {
    --- End diff --
    
    If we set a default value for the state, we do not need this distinction. 
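    
    With a default value configured, `processElement` could read the state directly (sketch):
    
    ```scala
    // no null check needed: state.value() returns the configured default on first access
    val accumulatorRow: Row = state.value()
    // ... accumulate the input row into accumulatorRow, then write it back
    state.update(accumulatorRow)
    ```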


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102985650
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---
    @@ -171,4 +183,79 @@ class SqlITCase extends StreamingMultipleProgramsTestBase {
         val expected = mutable.MutableList("Hello", "Hello world")
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
    +
    +  @Test
    +  def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    //TODO ORDER BY will be fixed, once FLINK-5710 is solved
    +    tEnv.registerFunction("ProcTime", ProcTime)
    +    val sqlQuery = "SELECT c, count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE " +
    --- End diff --
    
    Since it is a valid SQL query, it is accepted. We should make sure, though, that only `ORDER BY proctime()` is allowed. That is a bit difficult right now because the information is split across Calc and Window. With a `ProcTimeType` we can check the field type of the input of the Window.


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103154817
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.volcano.RelSubset
    +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.convert.ConverterRule
    +import org.apache.calcite.rel.logical.LogicalWindow
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Rule to convert a LogicalWindow into a DataStreamOverAggregate.
    +  */
    +class DataStreamOverAggregateRule
    +  extends ConverterRule(
    +    classOf[LogicalWindow],
    +    Convention.NONE,
    +    DataStreamConvention.INSTANCE,
    +    "DataStreamOverAggregateRule") {
    +
    +  override def convert(rel: RelNode): RelNode = {
    --- End diff --
    
    I'm currently preparing a PR to make the handling of `proctime()` easier. Initially, the function will just set a constant value and add a little bit of overhead. But later we will remove it before code generation.


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102948733
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +
    +import java.util.{List => JList}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    namedProperties: Seq[NamedWindowProperty],
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      namedProperties,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          namedProperties)
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +    .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    +    .item("overWindow", overWindow)
    +    .item(
    +      "select", aggregationToString(
    +        inputType,
    +        partitionKeys,
    +        getRowType,
    +        namedAggregates,
    +        namedProperties))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    if (logicWindow.groups.size > 1) {
    +      throw new UnsupportedOperationException(
    --- End diff --
    
    `TableException`


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104430248
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -93,6 +94,41 @@ object AggregateUtil {
       }
     
       /**
    +    * Create an [[RichProcessFunction]] to evaluate final aggregate value.
    +    *
    +    * @param namedAggregates List of calls to aggregate functions and their output field names
    +    * @param inputType Input row type
    +    * @param outputType Output row type
    +    * @param forwardedFields All the forwarded fields
    +    * @return [[UnboundedProcessingOverProcessFunction]]
    +    */
    +  private[flink] def CreateUnboundedProcessingOverProcessFunction(
    +    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +    inputType: RelDataType,
    +    outputType: RelDataType,
    +    forwardedFields: Array[Int]): UnboundedProcessingOverProcessFunction = {
    +
    +    val (aggFields, aggregates) =
    +      transformToAggregateFunctions(
    +        namedAggregates.map(_.getKey),
    +        inputType,
    +        forwardedFields.length)
    +
    +    val rowTypeInfo = new RowTypeInfo(outputType.getFieldList
    +      .map(field => FlinkTypeFactory.toTypeInfo(field.getType)): _*)
    +
    +    val intermediateRowType: RowTypeInfo =
    +      createDataSetAggregateBufferDataType(forwardedFields, aggregates, inputType)
    --- End diff --
    
    The `intermediateRowType` (or `aggregationStateType`) should not depend on the forwarded fields but just on the aggregates, right?
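
    A sketch of what that could look like, assuming each aggregate's accumulator
    `TypeInformation` is available (the helper and its input are hypothetical):

    ```scala
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.RowTypeInfo

    // Build the aggregation state type from the accumulator types only,
    // independent of any forwarded input fields.
    def createAccumulatorRowType(accTypes: Seq[TypeInformation[_]]): RowTypeInfo =
      new RowTypeInfo(accTypes: _*)
    ```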


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104427571
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFields: Array[Int],
    +    private val forwardedFieldCount: Int,
    +    private val intermediateRowType: RowTypeInfo,
    +    private val returnType: TypeInformation[Row])
    +  extends RichProcessFunction[Row, Row]{
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +
    +  private var output: Row = _
    +  private var state: ValueState[Row] = _
    +  private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
    +
    +  override def open(config: Configuration) {
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    val stateSerializer: TypeSerializer[Row] =
    +      intermediateRowType.createSerializer(getRuntimeContext.getExecutionConfig)
    +    val stateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("overState", stateSerializer)
    +    state = getRuntimeContext.getState(stateDescriptor)
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: Context,
    +    out: Collector[Row]): Unit = {
    +
    +    var accumulators = state.value()
    +
    +    if (null == accumulators) {
    +      accumulators = new Row(aggregates.length)
    +      aggregateWithIndex.foreach { case (agg, i) =>
    +        accumulators.setField(i, agg.createAccumulator())
    +      }
    +    }
    +
    +    for (i <- 0 until forwardedFieldCount) {
    +      output.setField(i, input.getField(i))
    +    }
    +
    +    for (i <- 0 until aggregates.length) {
    +      val index = forwardedFieldCount + i
    +      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator];
    --- End diff --
    
    remove `;`


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102968906
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.java.typeutils.runtime.RowSerializer
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val projectionsMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val  intermediateRowType: RowTypeInfo,
    +  @transient private val returnType: TypeInformation[Row])
    +  extends RichProcessFunction[Row, Row]{
    +
    +  protected var stateSerializer: TypeSerializer[Row] = _
    +  protected var stateDescriptor: ValueStateDescriptor[Row] = _
    +
    +  private var output: Row = _
    +  private var state: ValueState[Row] = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(projectionsMapping)
    +    Preconditions.checkNotNull(aggregateMapping)
    +    Preconditions.checkArgument(aggregates.length == aggregateMapping.length)
    +
    +    val finalRowLength: Int = projectionsMapping.length + aggregateMapping.length
    +    output = new Row(finalRowLength)
    +    stateSerializer = intermediateRowType.createSerializer(getRuntimeContext.getExecutionConfig)
    +    stateDescriptor = new ValueStateDescriptor[Row]("overState", stateSerializer)
    +
    +  }
    +
    +  override def processElement(
    +    value2: Row,
    +    ctx: Context,
    +    out: Collector[Row]): Unit = {
    +    state = getRuntimeContext.getState(stateDescriptor)
    --- End diff --
    
    We can do the `getState()` in the `open()` method and store the state in a
    `transient` member variable.
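
    A minimal sketch of that structure (using the `ValueStateDescriptor`
    constructor that takes a `TypeInformation`, so no explicit serializer is
    needed):

    ```scala
    private var state: ValueState[Row] = _

    override def open(config: Configuration): Unit = {
      // Obtain the state handle once; it is a runtime object that is only
      // assigned here and never serialized with the function.
      val stateDescriptor =
        new ValueStateDescriptor[Row]("overState", intermediateRowType)
      state = getRuntimeContext.getState(stateDescriptor)
    }
    ```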


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102951599
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +
    +import java.util.{List => JList}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    namedProperties: Seq[NamedWindowProperty],
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      namedProperties,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          namedProperties)
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +    .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    +    .item("overWindow", overWindow)
    +    .item(
    +      "select", aggregationToString(
    +        inputType,
    +        partitionKeys,
    +        getRowType,
    +        namedAggregates,
    +        namedProperties))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    if (logicWindow.groups.size > 1) {
    +      throw new UnsupportedOperationException(
    +        "Unsupported different over window in the same projection")
    +    }
    +
    +    val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
    +
    +    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    // TODO timeDomain will be fixed, once FLINK-5884 is solved
    +    // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
    +    if (/*logicWindow.timeDomain.equals(
    +        TimeDomain.PROCESSING_TIME && */
    +      overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
    +       createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
    +    } else {
    +      throw new UnsupportedOperationException(
    +        "Over window only support UNBOUNDED PRECEDING and CURRENT ROW condition.")
    +    }
    +
    +  }
    +
    +  def createUnboundedAndCurrentRowProcessingTimeOverWindow(
    +    inputDS: DataStream[Row]): DataStream[Row]  = {
    +
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    val allInputProjections = (for (i <- 0 until inputType.getFieldCount) yield i).toArray
    --- End diff --
    
    rename to `inputIndices`?


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104428328
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFields: Array[Int],
    +    private val forwardedFieldCount: Int,
    +    private val intermediateRowType: RowTypeInfo,
    +    private val returnType: TypeInformation[Row])
    +  extends RichProcessFunction[Row, Row]{
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +
    +  private var output: Row = _
    +  private var state: ValueState[Row] = _
    +  private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
    +
    +  override def open(config: Configuration) {
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    val stateSerializer: TypeSerializer[Row] =
    +      intermediateRowType.createSerializer(getRuntimeContext.getExecutionConfig)
    +    val stateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("overState", stateSerializer)
    +    state = getRuntimeContext.getState(stateDescriptor)
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: Context,
    +    out: Collector[Row]): Unit = {
    +
    +    var accumulators = state.value()
    +
    +    if (null == accumulators) {
    +      accumulators = new Row(aggregates.length)
    +      aggregateWithIndex.foreach { case (agg, i) =>
    +        accumulators.setField(i, agg.createAccumulator())
    +      }
    +    }
    +
    +    for (i <- 0 until forwardedFieldCount) {
    +      output.setField(i, input.getField(i))
    +    }
    +
    +    for (i <- 0 until aggregates.length) {
    +      val index = forwardedFieldCount + i
    +      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator];
    +      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)))
    +      output.setField(index, aggregates(i).getValue(accumulator))
    +      accumulators.setField(i, accumulator)
    --- End diff --
    
    I think this is not necessary. `accumulators` still holds a reference to
    `accumulator`, so the in-place mutation is already visible.
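
    In other words, a sketch of the loop body from the diff above without the
    redundant call (same variables as in the PR):

    ```scala
    val acc = accumulators.getField(i).asInstanceOf[Accumulator]
    aggregates(i).accumulate(acc, input.getField(aggFields(i)))
    output.setField(forwardedFieldCount + i, aggregates(i).getValue(acc))
    // `acc` is the same object the `accumulators` row references, so the
    // in-place mutation is already visible; no setField(i, acc) is needed.
    ```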


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102949757
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.calcite.rel.core.Window
    +import org.apache.calcite.rel.core.Window.Group
    +
    +import java.util.{List => JList}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.immutable.IndexedSeq
    +
    +class DataStreamOverAggregate(
    +    logicWindow: Window,
    +    namedProperties: Seq[NamedWindowProperty],
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType)
    +  extends SingleRel(cluster, traitSet, inputNode)
    +  with CommonAggregate
    +  with DataStreamRel {
    +
    +  override def deriveRowType(): RelDataType = rowRelDataType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
    +    new DataStreamOverAggregate(
    +      logicWindow,
    +      namedProperties,
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      getRowType,
    +      inputType)
    +  }
    +
    +  override def toString: String = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    s"Aggregate(${
    +      if (!partitionKeys.isEmpty) {
    +        s"partitionBy: (${groupingToString(inputType, partitionKeys)}), "
    +      } else {
    +        ""
    +      }
    +    }window: ($overWindow), " +
    +      s"select: (${
    +        aggregationToString(
    +          inputType,
    +          partitionKeys,
    +          getRowType,
    +          namedAggregates,
    +          namedProperties)
    +      }))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    val (
    +      overWindow: Group,
    +      partitionKeys: Array[Int],
    +      namedAggregates: IndexedSeq[CalcitePair[AggregateCall, String]]
    +      ) = genPartitionKeysAndNamedAggregates
    +
    +    super.explainTerms(pw)
    +    .itemIf("partitionBy", groupingToString(inputType, partitionKeys), !partitionKeys.isEmpty)
    +    .item("overWindow", overWindow)
    +    .item(
    +      "select", aggregationToString(
    +        inputType,
    +        partitionKeys,
    +        getRowType,
    +        namedAggregates,
    +        namedProperties))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    if (logicWindow.groups.size > 1) {
    +      throw new UnsupportedOperationException(
    +        "Unsupported different over window in the same projection")
    +    }
    +
    +    val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
    +
    +    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    // TODO timeDomain will be fixed, once FLINK-5884 is solved
    --- End diff --
    
    We can also think about encoding the time semantics as special data types:
    proctime() would produce such a special data type, against which we can
    check at the plan level. When translating the plan, we would ignore the
    proctime call and the resulting attribute.
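
    As a rough sketch of such a plan-level check (hypothetical names, not the
    actual implementation):

    ```scala
    import org.apache.calcite.rex.{RexCall, RexNode}

    // Recognize the proctime() marker in an expression so the translation
    // can drop it instead of code-generating it.
    def isProcTimeCall(expr: RexNode): Boolean = expr match {
      case call: RexCall => call.getOperator.getName.equalsIgnoreCase("PROCTIME")
      case _ => false
    }
    ```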


---

[GitHub] flink issue #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] processing t...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3397
  
    @fhueske I have updated the PR.


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102946986
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---
    @@ -171,4 +183,79 @@ class SqlITCase extends StreamingMultipleProgramsTestBase {
         val expected = mutable.MutableList("Hello", "Hello world")
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
    +
    +  @Test
    +  def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    //TODO ORDER BY will be fixed, once FLINK-5710 is solved
    +    tEnv.registerFunction("ProcTime", ProcTime)
    +    val sqlQuery = "SELECT c, count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE " +
    --- End diff --
    
    Use two different aggregation functions.
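
    For example (an illustrative query in the style of the test above):

    ```scala
    val sqlQuery =
      "SELECT c, COUNT(a) OVER (PARTITION BY c ORDER BY ProcTime() " +
      "RANGE UNBOUNDED PRECEDING), SUM(a) OVER (PARTITION BY c ORDER BY ProcTime() " +
      "RANGE UNBOUNDED PRECEDING) FROM T1"
    ```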


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r103123784
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.volcano.RelSubset
    +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.convert.ConverterRule
    +import org.apache.calcite.rel.logical.LogicalWindow
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Rule to convert a LogicalWindow into a DataStreamOverAggregate.
    +  */
    +class DataStreamOverAggregateRule
    +  extends ConverterRule(
    +    classOf[LogicalWindow],
    +    Convention.NONE,
    +    DataStreamConvention.INSTANCE,
    +    "DataStreamOverAggregateRule") {
    +
    +  override def convert(rel: RelNode): RelNode = {
    --- End diff --
    
    If I understand it correctly, maybe we do not need to fix the exception,
    because as you said: `proctime() is just a marker and cannot be
    code-generated but is expected to be removed.` `I'll try to change that and
    add a special data type that can be used to identify the time mode.`
    If so, has a JIRA already been opened for that work, or is there a design
    document?


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104466819
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -93,6 +94,41 @@ object AggregateUtil {
       }
     
       /**
    +    * Create an [[RichProcessFunction]] to evaluate final aggregate value.
    +    *
    +    * @param namedAggregates List of calls to aggregate functions and their output field names
    +    * @param inputType Input row type
    +    * @param outputType Output row type
    +    * @param forwardedFields All the forwarded fields
    +    * @return [[UnboundedProcessingOverProcessFunction]]
    +    */
    +  private[flink] def CreateUnboundedProcessingOverProcessFunction(
    +    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +    inputType: RelDataType,
    +    outputType: RelDataType,
    +    forwardedFields: Array[Int]): UnboundedProcessingOverProcessFunction = {
    +
    +    val (aggFields, aggregates) =
    +      transformToAggregateFunctions(
    +        namedAggregates.map(_.getKey),
    +        inputType,
    +        forwardedFields.length)
    +
    +    val rowTypeInfo = new RowTypeInfo(outputType.getFieldList
    +      .map(field => FlinkTypeFactory.toTypeInfo(field.getType)): _*)
    +
    +    val intermediateRowType: RowTypeInfo =
    +      createDataSetAggregateBufferDataType(forwardedFields, aggregates, inputType)
    --- End diff --
    
    We would need to add this dependency to the flink-table pom.xml
    ```
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
      <version>${project.version}</version>
      <scope>test</scope>
    </dependency>
    ```


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104427473
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFields: Array[Int],
    +    private val forwardedFieldCount: Int,
    +    private val intermediateRowType: RowTypeInfo,
    +    private val returnType: TypeInformation[Row])
    +  extends RichProcessFunction[Row, Row]{
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +
    +  private var output: Row = _
    +  private var state: ValueState[Row] = _
    +  private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
    +
    +  override def open(config: Configuration) {
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    val stateSerializer: TypeSerializer[Row] =
    +      intermediateRowType.createSerializer(getRuntimeContext.getExecutionConfig)
    +    val stateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("overState", stateSerializer)
    +    state = getRuntimeContext.getState(stateDescriptor)
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: Context,
    +    out: Collector[Row]): Unit = {
    +
    +    var accumulators = state.value()
    +
    +    if (null == accumulators) {
    +      accumulators = new Row(aggregates.length)
    +      aggregateWithIndex.foreach { case (agg, i) =>
    +        accumulators.setField(i, agg.createAccumulator())
    +      }
    +    }
    +
    +    for (i <- 0 until forwardedFieldCount) {
    +      output.setField(i, input.getField(i))
    +    }
    +
    +    for (i <- 0 until aggregates.length) {
    --- End diff --
    
    `for (i <- aggregates.indices) {`


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r104426925
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
    +import org.apache.flink.types.Row
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +
    +class UnboundedProcessingOverProcessFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFields: Array[Int],
    +    private val forwardedFieldCount: Int,
    +    private val intermediateRowType: RowTypeInfo,
    +    private val returnType: TypeInformation[Row])
    +  extends RichProcessFunction[Row, Row]{
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +
    +  private var output: Row = _
    +  private var state: ValueState[Row] = _
    +  private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
    +
    +  override def open(config: Configuration) {
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    val stateSerializer: TypeSerializer[Row] =
    --- End diff --
    
    The explicit serializer is not required; the `ValueStateDescriptor` can be
    created directly from the `TypeInformation`.


---

[GitHub] flink pull request #3397: [FLINK-5803][TableAPI&SQL] Add [partitioned] proce...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3397#discussion_r102942696
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -54,41 +55,80 @@ object AggregateUtil {
         * organized by the following format:
         *
         * {{{
    -    *                   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
    -    *                             |                          |
    -    *                             v                          v
    -    *        +---------+---------+--------+--------+--------+--------+
    -    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
    -    *        +---------+---------+--------+--------+--------+--------+
    -    *                                              ^
    -    *                                              |
    -    *                               sum(y) aggOffsetInRow = 4
    +    *                      avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
    +    *                                   |                          |
    +    *                                   v                          v
    +    *        +----------+------------+--------+--------+--------+--------+
    +    *        |projection1|projection2|  sum1  | count1 |  sum2  | count2 |
    +    *        +----------+------------+--------+--------+--------+--------+
    +    *                                                      ^
    +    *                                                      |
    +    *                                           sum(y) aggOffsetInRow = 4
         * }}}
         *
         */
       private[flink] def createPrepareMapFunction(
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      groupings: Array[Int],
    -      inputType: RelDataType)
    -    : MapFunction[Row, Row] = {
    +    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +    projectionsExceptAggregates: Array[Int],
    --- End diff --
    
    rename to `forwardedFields`?


---