Posted to issues@flink.apache.org by rtudoran <gi...@git.apache.org> on 2017/07/20 15:44:13 UTC

[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

GitHub user rtudoran opened a pull request:

    https://github.com/apache/flink/pull/4380

    Time sort with offset/fetch without retraction

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [x ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/huawei-flink/flink FLINK-6075-OF-NoRetraction

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4380.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4380
    
----
commit c95bf6b55cdf3e1b48d74de584937ec6b2c36bbe
Author: rtudoran <tu...@ymail.com>
Date:   2017-07-20T15:36:45Z

    Time sort with offset/fetch without retraction

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4380: Time sort with offset/fetch without retraction

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/4380
  
    @fhueske
    I have updated the PR.
    Based on the previous discussions I have:
    1) integrated the support for offset and fetch when time is ascending, as you suggested (a counter within the process function restricts the output)
    2) added back the implementation for offset and fetch with retraction and bound it to queries that sort on time descending. Note that the way I intend to enable this behavior is that at every new time unit the offset/fetch restriction is reapplied from scratch. For example, when fetching the first 2 elements:
    t1: event 1, event 2, event 3 => output is event 1 and event 2
    t2: event 4 => output is retract (events 1 & 2), emit event 4
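
A minimal sketch of the per-time-unit behavior described in this comment, written in plain Scala without any Flink dependency. The names `Change` and `PerTimeUnitFetch` are hypothetical and are not part of the pull request; `Change` only stands in for a row that carries a retraction flag.

```scala
// Hypothetical stand-in for a row with a retraction flag (not Flink's CRow).
final case class Change(event: String, isAccumulate: Boolean)

// Re-applies offset/fetch from scratch for every time unit and retracts
// whatever was emitted for the previous time unit.
final class PerTimeUnitFetch(offset: Int, fetch: Int) {
  private var lastEmitted: Vector[String] = Vector.empty

  def onTimeUnit(events: Seq[String]): Vector[Change] = {
    // Retract everything that the previous time unit produced.
    val retractions = lastEmitted.map(e => Change(e, isAccumulate = false))
    // Skip `offset` events, then keep at most `fetch` events.
    val selected = events.slice(offset, offset + fetch).toVector
    lastEmitted = selected
    retractions ++ selected.map(e => Change(e, isAccumulate = true))
  }
}
```

With `offset = 0` and `fetch = 2`, feeding events 1 to 3 at t1 and event 4 at t2 reproduces the example above: t1 emits events 1 and 2, and t2 retracts both before emitting event 4.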
     



[GitHub] flink issue #4380: Time sort with offset/fetch without retraction

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/4380
  
    @fhueske, @wuchong
    I updated the PR. Please have a look.



[GitHub] flink issue #4380: Time sort with offset/fetch without retraction

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4380
  
    OK @rtudoran, that sounds good to me.
    
    I haven't had a detailed look at the retraction code you submitted before. If it has the right semantics for `ORDER BY *time DESC FETCH x`, please open a new PR and I'll be happy to review it.
    
    Regarding `ORDER BY *time ASC OFFSET / FETCH`, I'd modify this PR according to the comments.
    
    I didn't understand the issue with `LIMIT` yet. Why is `LIMIT 10` not equivalent to `FETCH FIRST 10 ROWS ONLY`? Both should return 10 rows, right? Aside from this question, I think that if Calcite parses and translates `LIMIT` as `FETCH`, the right way to change the behavior would be to fix it in Calcite.



[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4380#discussion_r129833309
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{ Collector, Preconditions }
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import scala.util.control.Breaks._
    +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.Comparator
    +import java.util.ArrayList
    +import java.util.Collections
    +import org.apache.flink.api.common.typeutils.TypeComparator
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +
    +/**
    + * Process Function used for the sort based solely on proctime with offset/fetch
    + * [[org.apache.flink.streaming.api.datastream.DataStream]]
    + *
    + * @param offset Is used to indicate the number of elements to be skipped in the current context
    + * (0 offset allows to execute only fetch)
    + * @param fetch Is used to indicate the number of elements to be outputted in the current context
    + * @param inputType It is used to mark the type of the incoming data
    + */
    +class ProcTimeIdentitySortProcessFunctionOffsetFetch(
    +  private val offset: Int,
    +  private val fetch: Int,
    +  private val inputRowType: CRowTypeInfo)
    +    extends ProcessFunction[CRow, CRow] {
    +
    +  private var stateEventsBuffer: ListState[Row] = _
    +  
    +  private var outputC: CRow = _
    +  private val adjustedFetchLimit = offset + fetch
    +  
    +  override def open(config: Configuration) {
    +    val sortDescriptor = new ListStateDescriptor[Row]("sortState",
    +        inputRowType.asInstanceOf[CRowTypeInfo].rowType)
    +    stateEventsBuffer = getRuntimeContext.getListState(sortDescriptor)
    +
    +    val arity:Integer = inputRowType.getArity
    +    if (outputC == null) {
    +      outputC = new CRow(Row.of(arity), true)
    +    }
    +    
    +  }
    +
    +  override def processElement(
    +    inputC: CRow,
    +    ctx: ProcessFunction[CRow, CRow]#Context,
    +    out: Collector[CRow]): Unit = {
    +
    +    val input = inputC.row
    +    
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    //buffer the event incoming event
    +    stateEventsBuffer.add(input)
    +    
    +    //deduplication of multiple registered timers is done automatically
    +    ctx.timerService.registerProcessingTimeTimer(currentTime + 1)  
    +    
    +  }
    +  
    +  override def onTimer(
    --- End diff --
    
    We don't need timers for this operator.



[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4380#discussion_r129837121
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{ Collector, Preconditions }
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import scala.util.control.Breaks._
    +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.Comparator
    +import java.util.ArrayList
    +import java.util.Collections
    +import org.apache.flink.api.common.typeutils.TypeComparator
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +
    +/**
    + * Process Function used for the sort based solely on proctime with offset/fetch
    + * [[org.apache.flink.streaming.api.datastream.DataStream]]
    + *
    + * @param offset Is used to indicate the number of elements to be skipped in the current context
    + * (0 offset allows to execute only fetch)
    + * @param fetch Is used to indicate the number of elements to be outputted in the current context
    + * @param inputType It is used to mark the type of the incoming data
    + */
    +class ProcTimeIdentitySortProcessFunctionOffsetFetch(
    +  private val offset: Int,
    +  private val fetch: Int,
    +  private val inputRowType: CRowTypeInfo)
    +    extends ProcessFunction[CRow, CRow] {
    +
    +  private var stateEventsBuffer: ListState[Row] = _
    +  
    +  private var outputC: CRow = _
    +  private val adjustedFetchLimit = offset + fetch
    +  
    +  override def open(config: Configuration) {
    +    val sortDescriptor = new ListStateDescriptor[Row]("sortState",
    +        inputRowType.asInstanceOf[CRowTypeInfo].rowType)
    +    stateEventsBuffer = getRuntimeContext.getListState(sortDescriptor)
    +
    +    val arity:Integer = inputRowType.getArity
    +    if (outputC == null) {
    +      outputC = new CRow(Row.of(arity), true)
    +    }
    +    
    +  }
    +
    +  override def processElement(
    --- End diff --
    
    This operator should be implemented as follows:
    
    ```
    val fetchCnt = fetchCount.value
    if (fetch == -1 || fetchCnt < fetch) {
      // we haven't fetched enough rows yet
      val offsetCnt = offsetCount.value
      if (offsetCnt < offset) {
        // we haven't skipped enough rows yet:
        // increment the counter and skip the row
        offsetCount.update(offsetCnt + 1)
      } else {
        // forward the row
        out.collect(inputC)
        if (fetch != -1) {
          // only count forwarded rows when a fetch limit is set
          fetchCount.update(fetchCnt + 1)
        }
      }
    } else {
      // we have fetched enough rows: drop the row and return
    }
    ```
    
    As you'll notice, `ORDER BY proctime ASC FETCH FIRST x ROWS ONLY` is quite pointless because it will only emit x rows and then nothing more. However, that's the correct semantics here. `OFFSET` is similar because it won't emit the first x rows, which is not really meaningful in a streaming context either.
    
    The other combinations are basically the same. The only difference is that they do a bit more sorting to identify the rows that have to be dropped. The sorting operators have to do the sorting as before in `onTimer()`, but each record has to pass the offset and fetch check before being emitted. Once the fetch count is reached, we also don't need to put rows into state anymore and can simply drop them.
    The reason why the counters for offset and fetch have to be kept in state is that this information must not get lost in case of a failure. Otherwise a job might emit rows after a failure even though it emitted enough rows before the failure.
    
    Please adjust all operators and tests to the correct semantics. Thank you.
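
A minimal sketch of the offset/fetch check described in this comment, as plain Scala that runs without Flink. The class name `OffsetFetchFilter` is hypothetical, and the two `var` fields only stand in for the `offsetCount` and `fetchCount` state that a real operator would keep; a fetch of `-1` is assumed to mean "no fetch limit", as in the snippet above.

```scala
// Plain-Scala simulation of the check; not the operator from the pull request.
final class OffsetFetchFilter[T](offset: Long, fetch: Long) {
  // Stand-ins for the two state counters (assumption: both start at zero).
  private var offsetCount = 0L
  private var fetchCount = 0L

  /** Returns Some(row) if the row is forwarded, None if it is skipped or dropped. */
  def process(row: T): Option[T] = {
    if (fetch == -1 || fetchCount < fetch) {
      if (offsetCount < offset) {
        // Not enough rows skipped yet: count the row and skip it.
        offsetCount += 1
        None
      } else {
        // Forward the row and count it only when a fetch limit is set.
        if (fetch != -1) fetchCount += 1
        Some(row)
      }
    } else {
      // Fetch limit reached: drop the row.
      None
    }
  }
}
```

For the rows 1 to 5 with `offset = 2` and `fetch = 2`, only rows 3 and 4 are forwarded; with `fetch = -1` every row after the skipped ones passes.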



[GitHub] flink issue #4380: Time sort with offset/fetch without retraction

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/4380
  
    @fhueske
    Also, I do not understand: why would you need to keep the counters for offset/fetch as state?
    
    Assume the buffer state holds the events for proctime T with values (1, 2, 3, 4, 5).
    You want to emit them with offset 2 and fetch 2 (hence values 3 and 4).
    
    So the onTimer function fires when proctime has moved on, and you can trigger the computation for time T (i.e., at T+1).
    
    The basic logic after sorting is to go through the 5 elements and count first the offset and then the fetch:
    
    var offsetCounter = 0
    var fetchCounter = 0
    for (i <- 0 until inputs.size) {
      offsetCounter += 1
      if (offsetCounter > offset && fetchCounter < fetch) {
        out.collect(inputs(i))
        fetchCounter += 1
      }
    }
    
    ...you would then update the state at the end.
    What is the point of remembering the fetchCounter and offsetCounter here?
    If a failure happens in the meantime, you would restore the whole list of 5 elements anyway and restart the logic (i.e., from the beginning of the function).
    It is not as if you update the state at every iteration so that, after a failure, you could resume from, say, line 5 with the counter at a certain value.
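
The two positions in this exchange differ in scope: the question above assumes the restriction is applied within one time unit, while the review comment assumes it applies to the stream as a whole. A minimal plain-Scala sketch of the second reading follows. The class `FetchLimiter` and its `counterInState` flag are hypothetical; the flag only models whether the counter would survive a restore, and the sketch assumes a checkpoint was taken after the last timer firing.

```scala
// Hypothetical model of a stream-wide fetch limit across timer firings.
final class FetchLimiter(fetch: Int, counterInState: Boolean) {
  private var emittedSoFar = 0

  /** Emits rows of one timer firing until `fetch` rows were emitted in total. */
  def onTimer(sortedRows: Seq[Int]): Seq[Int] = {
    val emitted = sortedRows.take(math.max(0, fetch - emittedSoFar))
    emittedSoFar += emitted.size
    emitted
  }

  /** Simulated recovery: a counter kept in state is restored, a plain field restarts at zero. */
  def restore(): Unit = if (!counterInState) emittedSoFar = 0
}
```

Under these assumptions, a limiter with `counterInState = false` that has already emitted its 2 rows emits 2 more after `restore()`, whereas one with `counterInState = true` emits nothing further.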



[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4380#discussion_r129833609
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{ Collector, Preconditions }
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import scala.util.control.Breaks._
    +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.Comparator
    +import java.util.ArrayList
    +import java.util.Collections
    +import org.apache.flink.api.common.typeutils.TypeComparator
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +
    +/**
    + * Process Function used for the sort based solely on proctime with offset/fetch
    + * [[org.apache.flink.streaming.api.datastream.DataStream]]
    + *
    + * @param offset Is used to indicate the number of elements to be skipped in the current context
    + * (0 offset allows to execute only fetch)
    + * @param fetch Is used to indicate the number of elements to be outputted in the current context
    + * @param inputType It is used to mark the type of the incoming data
    + */
    +class ProcTimeIdentitySortProcessFunctionOffsetFetch(
    +  private val offset: Int,
    +  private val fetch: Int,
    +  private val inputRowType: CRowTypeInfo)
    +    extends ProcessFunction[CRow, CRow] {
    +
    +  private var stateEventsBuffer: ListState[Row] = _
    +  
    +  private var outputC: CRow = _
    +  private val adjustedFetchLimit = offset + fetch
    +  
    +  override def open(config: Configuration) {
    +    val sortDescriptor = new ListStateDescriptor[Row]("sortState",
    --- End diff --
    
    Instead we need two `ValueState[Long]` for `offsetCount` and `fetchCount`.



[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4380#discussion_r129835046
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{ Collector, Preconditions }
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import scala.util.control.Breaks._
    +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.Comparator
    +import java.util.ArrayList
    +import java.util.Collections
    +import org.apache.flink.api.common.typeutils.TypeComparator
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +
    +/**
    + * Process Function used for the sort based solely on proctime with offset/fetch
    + * [[org.apache.flink.streaming.api.datastream.DataStream]]
    + *
    + * @param offset Is used to indicate the number of elements to be skipped in the current context
    + * (0 offset allows to execute only fetch)
    + * @param fetch Is used to indicate the number of elements to be outputted in the current context
    + * @param inputType It is used to mark the type of the incoming data
    + */
    +class ProcTimeIdentitySortProcessFunctionOffsetFetch(
    +  private val offset: Int,
    +  private val fetch: Int,
    +  private val inputRowType: CRowTypeInfo)
    +    extends ProcessFunction[CRow, CRow] {
    +
    +  private var stateEventsBuffer: ListState[Row] = _
    +  
    +  private var outputC: CRow = _
    --- End diff --
    
    We don't need `outputC`. We can simply forward the input `CRow`.



[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4380#discussion_r129832039
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{ Collector, Preconditions }
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import scala.util.control.Breaks._
    +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.Comparator
    +import java.util.ArrayList
    +import java.util.Collections
    +import org.apache.flink.api.common.typeutils.TypeComparator
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +
    +/**
    + * Process Function used for the sort based solely on proctime with offset/fetch
    + * [[org.apache.flink.streaming.api.datastream.DataStream]]
    + *
    + * @param offset Is used to indicate the number of elements to be skipped in the current context
    + * (0 offset allows to execute only fetch)
    + * @param fetch Is used to indicate the number of elements to be outputted in the current context
    + * @param inputType It is used to mark the type of the incoming data
    + */
    +class ProcTimeIdentitySortProcessFunctionOffsetFetch(
    +  private val offset: Int,
    +  private val fetch: Int,
    +  private val inputRowType: CRowTypeInfo)
    +    extends ProcessFunction[CRow, CRow] {
    +
    +  private var stateEventsBuffer: ListState[Row] = _
    +  
    +  private var outputC: CRow = _
    +  private val adjustedFetchLimit = offset + fetch
    +  
    +  override def open(config: Configuration) {
    +    val sortDescriptor = new ListStateDescriptor[Row]("sortState",
    +        inputRowType.asInstanceOf[CRowTypeInfo].rowType)
    +    stateEventsBuffer = getRuntimeContext.getListState(sortDescriptor)
    +
    +    val arity:Integer = inputRowType.getArity
    --- End diff --
    
    Please add a space: `arity: Integer`



[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4380#discussion_r129827640
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala ---
    @@ -75,12 +77,105 @@ object SortUtil {
         val inputCRowType = CRowTypeInfo(inputTypeInfo)
      
         new RowTimeSortProcessFunction(
    +      0,
    +      -1,
           inputCRowType,
           collectionRowComparator)
     
       }
       
       /**
    +   * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting   
    +   * with offset elements based on rowtime and potentially other fields with
    +   * @param collationSort The Sort collation list
    +   * @param sortOffset The offset indicator
    +   * @param inputType input row type
    +   * @param inputTypeInfo input type information
    +   * @param execCfg table environment execution configuration
    +   * @return org.apache.flink.streaming.api.functions.ProcessFunction
    +   */
    +  private[flink] def createRowTimeSortFunctionOffset(
    --- End diff --
    
    I think we can consolidate all sort-related methods in `SortUtil` into three methods:
    
    * `createProcTimeNoSortFunction(..., sortOffset: Option[RexNode], sortFetch: Option[RexNode])`
    * `createProcTimeSortFunction(..., sortOffset: Option[RexNode], sortFetch: Option[RexNode])`
    * `createRowTimeSortFunction(..., sortOffset: Option[RexNode], sortFetch: Option[RexNode])`
    
    Each method handles all combinations of `offset` and `fetch` with two simple conditions to set the parameter to `-1`, `0`, or the actual value.
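
A minimal sketch of the two conditions mentioned here, in plain Scala. The object `OffsetFetchParams` is hypothetical, and `Option[Long]` is used as a stand-in for `Option[RexNode]` so that the example runs without Calcite; extracting the literal value from a `RexNode` is left out. The defaults follow the diff above, which passes `0` and `-1` when no offset and no fetch are given.

```scala
// Hypothetical helper; a real implementation would read the values
// from Option[RexNode] literals rather than Option[Long].
object OffsetFetchParams {
  val NoOffset: Long = 0L // skip nothing
  val NoFetch: Long = -1L // no fetch limit

  /** Maps the optional clauses to the (offset, fetch) pair passed to the operator. */
  def resolve(sortOffset: Option[Long], sortFetch: Option[Long]): (Long, Long) = {
    val offset = sortOffset.getOrElse(NoOffset)
    val fetch = sortFetch.getOrElse(NoFetch)
    (offset, fetch)
  }
}
```

With such a mapping, each of the three factory methods can pass the resolved pair to its process function instead of needing a separate method per offset/fetch combination.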



[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4380#discussion_r129833188
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{ Collector, Preconditions }
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import scala.util.control.Breaks._
    +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.Comparator
    +import java.util.ArrayList
    +import java.util.Collections
    +import org.apache.flink.api.common.typeutils.TypeComparator
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +
    +/**
    + * Process Function used for the sort based solely on proctime with offset/fetch
    + * [[org.apache.flink.streaming.api.datastream.DataStream]]
    + *
    + * @param offset Is used to indicate the number of elements to be skipped in the current context
    + * (0 offset allows to execute only fetch)
    + * @param fetch Is used to indicate the number of elements to be outputted in the current context
    + * @param inputType It is used to mark the type of the incoming data
    + */
    +class ProcTimeIdentitySortProcessFunctionOffsetFetch(
    +  private val offset: Int,
    +  private val fetch: Int,
    +  private val inputRowType: CRowTypeInfo)
    +    extends ProcessFunction[CRow, CRow] {
    +
    +  private var stateEventsBuffer: ListState[Row] = _
    +  
    +  private var outputC: CRow = _
    +  private val adjustedFetchLimit = offset + fetch
    +  
    +  override def open(config: Configuration) {
    +    val sortDescriptor = new ListStateDescriptor[Row]("sortState",
    --- End diff --
    
    We don't need state to collect records for this operator.



[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4380#discussion_r129838980
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala ---
    @@ -108,28 +108,25 @@ class DataStreamSort(
             case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType)  =>
    --- End diff --
    
    There is a lot of code duplication in this class. All `createSort` methods look basically the same and mostly differ in the `SortUtil` methods they call. I think we don't need these methods and can do everything with a few if conditions directly in the `translateToPlan()` method.
    
    Basically:
    ```
    val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
    
    val pFunc = if (FlinkTypeFactory.isProctimeIndicatorType(timeType) && sortCollation.getFieldCollations.size() == 1) {
      SortUtil.createProcTimeNoSortFunction(..., sortOffset, sortFetch)
    } else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
      SortUtil.createProcTimeSortFunction(..., sortOffset, sortFetch)
    } else if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
      SortUtil.createRowTimeSortFunction(..., sortOffset, sortFetch)
    } else {
      // error
    }
    
    inputDS.keyBy(new NullByteKeySelector[CRow])
      .process(pFunc).setParallelism(1).setMaxParallelism(1)
      .returns(returnTypeInfo)
      .asInstanceOf[DataStream[CRow]]
    ```
    
    We would have to change the `IdentityCRowMap` to a ProcessFunction but that's fine. `ORDER BY proctime` is a corner case that does not add functionality and is only supported for syntactical completeness. IMO it is not worth the added code complexity.



[GitHub] flink issue #4380: Time sort with offset/fetch without retraction

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/4380
  
    @fhueske
    
    Hi, I saw your remark: "If a query specifies ORDER BY ... FETCH x ROWS ONLY, then the query must emit exactly x rows (given that the result has at least x rows) and not x rows for the same sort key".
    
    To some extent we had this discussion when we started this topic. Yes, offset/fetch refer to global/absolute values: with offset 1 and fetch 5, exactly 5 elements must be emitted overall. However, as you say, this is not particularly useful on a stream: the query takes 5 elements and then never emits anything again. This is why the initial discussions on this topic concluded that ordering by time should be mandatory, so that the offset/fetch restrictions are re-applied for each timestamp. It is also why we retracted the previously emitted results and sent the updated ones.
    Hence, if you want to write a SQL query over the stream that takes the top 5 results based on some sort order, you would write
    
    SELECT * FROM stream1 ORDER BY rowtime, field1 LIMIT 5
    ...this would ensure that at moment 1 you get the top 5 results
    ...then at moment 2 you get an update that these 5 results are no longer relevant (as time has advanced), and you get the top 5 results relevant at that moment.
    
    Please reconsider this. I think this is the behavior we need (and the one we discussed as being compatible with SQL semantics, and also relevant). I do not see the relevance of a function that selects x results at some point and then does nothing (or skips y results and then does nothing else for the rest of its existence). Whether with this syntax or some similar syntax, we would still need to support such functionality via SQL.
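
To make the interpretation described in this comment concrete, here is a small sketch in which offset/fetch are re-applied to each timestamp separately. It illustrates only the behavior argued for here and is not code from the PR. The `Event` type and `limitPerTimestamp` are hypothetical, the field names follow the example query, and an in-memory `Seq` stands in for the stream.

```scala
object PerTimestampLimit {
  // Hypothetical row type with the two fields used in the example query.
  final case class Event(rowtime: Long, field1: Int)

  /** Applies offset/fetch separately to each rowtime group, ordered by field1. */
  def limitPerTimestamp(events: Seq[Event], offset: Int, fetch: Int): Map[Long, Seq[Event]] =
    events.groupBy(_.rowtime).map { case (ts, group) =>
      // Within one timestamp: sort, skip `offset` rows, keep at most `fetch` rows.
      ts -> group.sortBy(_.field1).slice(offset, offset + fetch)
    }
}
```

Under this reading every timestamp yields up to `fetch` rows of its own, so the total number of emitted rows grows with the stream instead of stopping at `fetch`.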
    




[GitHub] flink issue #4380: Time sort with offset/fetch without retraction

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4380
  
    Hi @rtudoran, thanks for updating the PR.
    
    I had a brief look at it and as I said before, I don't think we need additional ProcessFunctions for any of the `ORDER BY *time ASC OFFSET x FETCH y` cases. All cases (proctime and rowtime) can be implemented by extending the current functions for `ORDER BY`. 
    
    All we have to do is to
    - add a counter for `OFFSET` and not emit the first `x` rows (counter should be stored as state)
    - add a counter for `FETCH` and not emit more than `y` rows (counter should be stored as state).
    
    If `OFFSET` or `FETCH` are not required, we can set `x` and/or `y` to `-1` and ignore the counters.
    
    Once we have extended the current ProcessFunctions, we can also remove the additional methods in `DataStreamSort` and `SortUtil`, because we always translate to the same `ProcessFunction`, just with different values for `offset` and `fetch`. So we only have to extend the current methods in `DataStreamSort` and `SortUtil` to pass the correct values for `offset` and `fetch` (either the values from the query or `-1` if not used) to the updated ProcessFunctions.
    
    That should simplify the PR and require far fewer changes and much less code.
    
    Let me know what you think,
    Fabian
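
The two counters described in this comment can be sketched roughly as follows. This is a plain Scala stand-in, not the PR's code: `OffsetFetchCounter` is a hypothetical name, and the counters are ordinary fields here, whereas the comment asks for them to be kept in Flink state. A non-positive `offset` or a negative `fetch` disables the corresponding check, matching the `-1` convention above.

```scala
// Hypothetical illustration of the OFFSET/FETCH counters. In a real operator
// both counters would be stored in Flink state so they survive recovery.
final class OffsetFetchCounter(offset: Long, fetch: Long) {
  private var skipped = 0L
  private var emitted = 0L

  /** Call once per row, in sort order; returns true if the row should be emitted. */
  def shouldEmit(): Boolean =
    if (offset > 0 && skipped < offset) {
      skipped += 1 // still inside the OFFSET prefix: drop the row
      false
    } else if (fetch >= 0 && emitted >= fetch) {
      false // FETCH limit already reached: drop the row
    } else {
      emitted += 1
      true
    }
}
```

For example, with `offset = 2` and `fetch = 3`, rows 1 and 2 are dropped, rows 3 to 5 are emitted, and every later row is dropped.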



[GitHub] flink issue #4380: Time sort with offset/fetch without retraction

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4380
  
    Hi @rtudoran, 
    
    I had the impression that your main motivation was to fill the syntactic gaps of SQL on streams and not so much to support certain kinds of use cases. 
    
    The semantics of a SQL query are given by the SQL specification and not up for discussion. If you want to support a certain behavior, we need to see how this can be expressed in standard-compliant SQL and then think about the implementation. The semantics of `ORDER BY rowtime ASC FETCH 10 ROWS ONLY` are given and will return the ten *first* (first because they have the lowest timestamp) result rows.
    
    However, I think what you are trying to achieve is represented by `ORDER BY rowtime DESC FETCH 10 ROWS ONLY`. This query returns the ten *last* (last because they have the highest timestamp) rows of the result. Obviously, we need retraction to handle this case, because the ten last rows will change over time. `ORDER BY rowtime DESC OFFSET 10` is also not very useful, because it holds back the 10 last result rows. However, we could support it to fill the gap in the syntax and execute the query instead of saying "Sorry, can't do that".
    
    So the question now is, how do we proceed with this PR? Do we want to add the `ORDER BY rowtime ASC OFFSET / FETCH` functionality as simple additions to the existing operators (as I said I think it can be done with very few lines if we do some refactoring) to fill the syntactic gap or not?
    
    Regardless of whether we add this or not, we should work on the `ORDER BY rowtime DESC OFFSET / FETCH` case.
    
    What do you think?
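
The contrast between the ASC and DESC cases in this comment can be illustrated with a small sketch over plain timestamps. `FetchSemantics`, `firstN`, and `lastN` are hypothetical names, and the sketch assumes rows arrive with non-decreasing timestamps, as they would after watermark-based sorting.

```scala
object FetchSemantics {
  /** ORDER BY rowtime ASC FETCH n: the n lowest timestamps seen so far. */
  def firstN(timestamps: Seq[Long], n: Int): Seq[Long] =
    timestamps.sorted.take(n)

  /** ORDER BY rowtime DESC FETCH n: the n highest timestamps seen so far. */
  def lastN(timestamps: Seq[Long], n: Int): Seq[Long] =
    timestamps.sorted(Ordering[Long].reverse).take(n)
}
```

With timestamps 1, 2, 3 and `n = 2`, `firstN` gives 1, 2 and `lastN` gives 3, 2. Once 4 and 5 arrive, `firstN` is unchanged, while `lastN` becomes 5, 4, so the previously emitted rows 3 and 2 would have to be retracted. That is why only the DESC case needs retraction.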



[GitHub] flink issue #4380: Time sort with offset/fetch without retraction

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/4380
  
    @fhueske, @wuchong
    As mentioned in PR #4263, I re-implemented the offset/fetch support for *time without retraction. You can find the changes in this PR. In principle they should be easy to follow, as they are similar to what we already have.
    Please let me know what you think.



[GitHub] flink issue #4380: Time sort with offset/fetch without retraction

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/4380
  
    @fhueske
    
    Thanks for the remarks/clarification. 
    
    I am fine with taking an approach that tackles both scenarios (the simple add-ons you suggest as well as the actually useful code).
    Hence I would propose that we use the code developed with retraction support for `ORDER BY rowtime ASC FETCH/OFFSET` for the `ORDER BY rowtime DESC OFFSET/FETCH` case. Do you agree with this?
    
    I can then add, in another PR/update, the code you intend to use for the ASC case with the simple modifications.
    
    Putting this aside, there is another issue with LIMIT x. The intended behavior of LIMIT is to limit the number of results considered at a given point. Calcite translates it in the same way as FETCH. Basically, at the moment of conversion you do not see any difference between
    
    ORDER BY rowtime ASC FETCH 10 ROWS ONLY and ORDER BY rowtime LIMIT 10.
    How should we handle this situation?



[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4380#discussion_r129821546
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala ---
    @@ -108,28 +108,25 @@ class DataStreamSort(
             case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType)  =>
                 (sortOffset, sortFetch) match {
    --- End diff --
    
    change `sortOffset` and `sortFetch` member fields to `Option[RexNode]` to avoid `null`.
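
As a rough illustration of why `Option` helps here: wrapping a possibly-null reference with `Option(...)` turns `null` into `None`, so the `(sortOffset, sortFetch)` match can enumerate the four cases without null checks. In this sketch `NullSafeClauses` and `describe` are hypothetical, and `String` stands in for the nullable `RexNode` fields.

```scala
object NullSafeClauses {
  /** Reports which clauses are present; Option(x) maps null to None. */
  def describe(rawOffset: String, rawFetch: String): String =
    (Option(rawOffset), Option(rawFetch)) match {
      case (Some(o), Some(f)) => s"offset $o, fetch $f"
      case (Some(o), None)    => s"offset $o only"
      case (None, Some(f))    => s"fetch $f only"
      case (None, None)       => "plain sort"
    }
}
```

Because the match covers all four combinations, the compiler can check exhaustiveness, which a match on raw `null` values does not give you.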



[GitHub] flink issue #4380: Time sort with offset/fetch without retraction

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/4380
  
    @fhueske
    I did not understand initially that this was your suggestion. What you propose has the advantage that it is easy to maintain (as we consolidate the whole functionality) and the slight disadvantage of a couple of useless checks in some scenarios (e.g., you would still check the fetch condition even if you have only an offset or just the plain sort). If this tiny performance price is OK, then clearly we can consolidate the implementation.
    
    Nevertheless, even if this is the case, I would still propose that we keep the ProcTimeIdentitySortProcessFunction.
    This is for the scenario where the ORDER BY is simply on the time attribute (and no other field). For the simple sort we only had an identity map function to pass the events through. For offset/fetch we can either extend that or keep a separate implementation that adds the state counters for offset/fetch. I propose we keep the latter implementation.

