Posted to issues@flink.apache.org by gallenvara <gi...@git.apache.org> on 2016/05/30 10:42:46 UTC

[GitHub] flink pull request: [FLINK-3971] [tableAPI] Aggregates handle null...

GitHub user gallenvara opened a pull request:

    https://github.com/apache/flink/pull/2049

    [FLINK-3971] [tableAPI] Aggregates handle null values incorrectly.

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [X] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [X] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    
    This PR fixes how the Max/Min/Average/Sum aggregates handle inputs that contain only null values.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gallenvara/flink flink-3971

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2049.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2049
    
----
commit cb39f098e5cea0867fc63c8661257cc9664a4882
Author: gallenvara <ga...@126.com>
Date:   2016-05-30T10:30:07Z

    Null-values-only aggregation handling.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2049: [FLINK-3971] [tableAPI] Aggregates handle null val...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2049#discussion_r66638346
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala ---
    @@ -48,8 +48,16 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
       override def merge(intermediate: Row, buffer: Row): Unit = {
         val partialValue = intermediate.productElement(maxIndex).asInstanceOf[T]
         val bufferValue = buffer.productElement(maxIndex).asInstanceOf[T]
    -    val max: T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue
    -    buffer.setField(maxIndex, max)
    +    if (partialValue == null && bufferValue == null) {
    --- End diff --
    
    Can we reorder the `if else` cases to move the common case of `partialValue != null && bufferValue != null` to the top?



[GitHub] flink pull request: [FLINK-3971] [tableAPI] Aggregates handle null...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/2049#issuecomment-222633219
  
    @fhueske I have changed the return type to the generic `T` in the `AvgAggregate` classes because when returning `null.asInstanceOf[Short/Int/..]`, the value is converted to `0`.
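
A minimal plain-Scala sketch of the behavior described above (no Flink classes involved): casting `null` to a concrete primitive type such as `Int` unboxes it to that type's zero value, whereas inside generic code the type parameter is erased, the cast is unchecked, and `null` passes through. The helper `castGeneric` is hypothetical and only serves the illustration.

```scala
// Casting null to a concrete primitive type unboxes it to that type's zero value.
val concrete: Int = null.asInstanceOf[Int]

// Inside generic code T is erased, so asInstanceOf[T] is an unchecked no-op
// and the null reference is returned unchanged.
// castGeneric is a hypothetical helper, not part of Flink.
def castGeneric[T](value: Any): Any = value.asInstanceOf[T]

val generic: Any = castGeneric[Int](null)

println(s"concrete = $concrete, generic = $generic")
```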



[GitHub] flink pull request #2049: [FLINK-3971] [tableAPI] Aggregates handle null val...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2049#discussion_r66765029
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala ---
    @@ -48,8 +57,16 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
       override def merge(intermediate: Row, buffer: Row): Unit = {
    --- End diff --
    
    Yes, you are right. The case `partialValue == null` can be skipped entirely, since it leaves the buffer unchanged.



[GitHub] flink pull request #2049: [FLINK-3971] [tableAPI] Aggregates handle null val...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2049#discussion_r66638439
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala ---
    @@ -48,8 +48,16 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
       override def merge(intermediate: Row, buffer: Row): Unit = {
         val partialValue = intermediate.productElement(maxIndex).asInstanceOf[T]
         val bufferValue = buffer.productElement(maxIndex).asInstanceOf[T]
    -    val max: T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue
    -    buffer.setField(maxIndex, max)
    +    if (partialValue == null && bufferValue == null) {
    +      buffer.setField(maxIndex, null.asInstanceOf[T])
    --- End diff --
    
    When setting a field in a `Row` to `null` we do not need to cast.



[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the issue:

    https://github.com/apache/flink/pull/2049
  
    @fhueske Sorry for the late update. The code has been updated based on your comments.



[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the issue:

    https://github.com/apache/flink/pull/2049
  
    @fhueske fixed! :)



[GitHub] flink pull request #2049: [FLINK-3971] [tableAPI] Aggregates handle null val...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2049#discussion_r66638477
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala ---
    @@ -74,7 +82,7 @@ class ByteMaxAggregate extends MaxAggregate[Byte] {
       override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
     
       override def initiate(intermediate: Row): Unit = {
    -    intermediate.setField(maxIndex, Byte.MinValue)
    +    intermediate.setField(maxIndex, null.asInstanceOf[Byte])
    --- End diff --
    
    No cast necessary
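
A small sketch of why the cast can be dropped. `SimpleRow` is a hypothetical stand-in for the Table API `Row`, assuming only that `setField` accepts a value of type `Any`, as the calls in the diff suggest. `null` already conforms to `Any`, so it can be passed directly. With a concrete primitive type the cast is not just unnecessary: `null.asInstanceOf[Byte]` evaluates to `0`, so the field would hold `0` rather than `null`.

```scala
// Hypothetical stand-in for the Table API Row: fields are stored untyped.
final class SimpleRow(arity: Int) {
  private val fields = new Array[Any](arity)
  def setField(i: Int, value: Any): Unit = fields(i) = value
  def productElement(i: Int): Any = fields(i)
}

// null conforms to Any, so no cast is required to store a null field.
val plainRow = new SimpleRow(1)
plainRow.setField(0, null)

// Casting null to the concrete type Byte unboxes it to 0 before it is stored.
val castRow = new SimpleRow(1)
castRow.setField(0, null.asInstanceOf[Byte])

println(s"plain = ${plainRow.productElement(0)}, cast = ${castRow.productElement(0)}")
```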



[GitHub] flink pull request #2049: [FLINK-3971] [tableAPI] Aggregates handle null val...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2049



[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2049
  
    Hi @gallenvara, thanks for the update! Looks good. Once my last comment is fixed, the PR should be good to merge. Thanks, Fabian



[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the issue:

    https://github.com/apache/flink/pull/2049
  
    @fhueske Done.



[GitHub] flink pull request #2049: [FLINK-3971] [tableAPI] Aggregates handle null val...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2049#discussion_r66638676
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala ---
    @@ -48,8 +48,16 @@ abstract  class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T]{
       override def merge(partial: Row, buffer: Row): Unit = {
    --- End diff --
    
    All comments on `MaxAggregate` apply here as well.



[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2049
  
    Hi @gallenvara, thanks for the PR. 
    I added a few comments inline. Could you also extend `CountAggregateTest` with an all-null case?
    
    Thanks, Fabian
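
For reference, standard SQL semantics for an all-null input are: `COUNT(column)` returns `0`, while `SUM`, `MIN`, `MAX` and `AVG` return `NULL`. A minimal plain-Scala sketch of the expected results such an all-null test could assert; `countNonNull` and `sumOrNull` are hypothetical helpers, not Flink APIs.

```scala
// Hypothetical COUNT(column): counts only non-null values, so all-null input yields 0.
def countNonNull(values: Seq[Any]): Long = values.count(_ != null).toLong

// Hypothetical SUM(column): null-only input yields null instead of a zero sum.
def sumOrNull(values: Seq[java.lang.Long]): java.lang.Long = {
  val nonNull = values.filter(_ != null)
  if (nonNull.isEmpty) null
  else java.lang.Long.valueOf(nonNull.map(_.longValue).sum)
}

val allNull: Seq[java.lang.Long] = Seq(null, null, null)
println(s"count = ${countNonNull(allNull)}, sum = ${sumOrNull(allNull)}")
```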



[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2049
  
    Thanks for the fast update @gallenvara. Good to merge!



[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2049
  
    Thank you @gallenvara. Will merge this PR later today. :-)



[GitHub] flink pull request #2049: [FLINK-3971] [tableAPI] Aggregates handle null val...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2049#discussion_r66762005
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala ---
    @@ -48,8 +57,16 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
       override def merge(intermediate: Row, buffer: Row): Unit = {
    --- End diff --
    
    I think we can make this method a bit more efficient:
    ```
    val partialValue = ...
    if (partialValue != null) {
      val bufferValue = ...
      if (bufferValue != null) {
        val max = ...
        buffer.setField(maxIndex, max)
      } else {
        buffer.setField(maxIndex, partialValue)
      }
    }
    ```
    
    Same applies for `MinAggregate` and `SumAggregate`.
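
A self-contained sketch of the suggested control flow for a null-aware MAX merge. To run without Flink it models the row as an `Array[Any]` and uses a hypothetical `mergeMax` function instead of `Row` and `MaxAggregate.merge`; it illustrates the branching only and is not the merged code.

```scala
// Hypothetical null-aware MAX merge. The row is modelled as Array[Any]
// instead of Flink's Row so the sketch runs without Flink on the classpath.
def mergeMax[T](partial: Array[Any], buffer: Array[Any], maxIndex: Int)(implicit ord: Ordering[T]): Unit = {
  val partialValue = partial(maxIndex)
  // A null partial value carries no data, so the buffer is left untouched.
  if (partialValue != null) {
    val bufferValue = buffer(maxIndex)
    if (bufferValue != null) {
      // Common case: both values are non-null, keep the larger one.
      val p = partialValue.asInstanceOf[T]
      val b = bufferValue.asInstanceOf[T]
      buffer(maxIndex) = if (ord.compare(p, b) > 0) p else b
    } else {
      // The buffer has only seen nulls so far: the partial value is the max.
      buffer(maxIndex) = partialValue
    }
  }
}

val maxBuffer = Array[Any](null)              // initiate with null, not MinValue
mergeMax[Int](Array[Any](null), maxBuffer, 0) // null input: buffer stays null
mergeMax[Int](Array[Any](3), maxBuffer, 0)
mergeMax[Int](Array[Any](7), maxBuffer, 0)
mergeMax[Int](Array[Any](5), maxBuffer, 0)
println(maxBuffer(0))
```

Reading `bufferValue` only inside the non-null branch avoids a field access when the partial value is null, which is the efficiency gain the comment refers to.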



[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2049
  
    Thanks for the PR @gallenvara. I'm currently on vacation with limited internet access. I will review your PR when I am back in roughly a week.



[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2049
  
    Merging



[GitHub] flink pull request #2049: [FLINK-3971] [tableAPI] Aggregates handle null val...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2049#discussion_r66638771
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala ---
    @@ -27,13 +27,21 @@ abstract class SumAggregate[T: Numeric]
       protected var sumIndex: Int = _
     
       override def initiate(partial: Row): Unit = {
    -    partial.setField(sumIndex, numeric.zero)
    +    partial.setField(sumIndex, null.asInstanceOf[T])
    --- End diff --
    
    No cast necessary.



[GitHub] flink pull request #2049: [FLINK-3971] [tableAPI] Aggregates handle null val...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2049#discussion_r66638756
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala ---
    @@ -27,13 +27,21 @@ abstract class SumAggregate[T: Numeric]
       protected var sumIndex: Int = _
     
       override def initiate(partial: Row): Unit = {
    -    partial.setField(sumIndex, numeric.zero)
    +    partial.setField(sumIndex, null.asInstanceOf[T])
       }
     
       override def merge(partial1: Row, buffer: Row): Unit = {
         val partialValue = partial1.productElement(sumIndex).asInstanceOf[T]
         val bufferValue = buffer.productElement(sumIndex).asInstanceOf[T]
    -    buffer.setField(sumIndex, numeric.plus(partialValue, bufferValue))
    +    if (partialValue == null && bufferValue == null) {
    --- End diff --
    
    Please reorder `if else` cases as in `MaxAggregate`
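
The same pattern applied to SUM, including the switch from `numeric.zero` to `null` in `initiate` shown in the diff. This is again a hypothetical sketch over `Array[Any]`, assuming a `Numeric[T]` is in scope as with `SumAggregate[T: Numeric]`; the function names are illustrative only.

```scala
// Hypothetical initiate: start from null instead of numeric.zero so that an
// input consisting only of nulls produces null rather than 0.
def initiateSum(partial: Array[Any], sumIndex: Int): Unit = {
  partial(sumIndex) = null
}

// Hypothetical null-aware SUM merge, structured like the MAX merge:
// skip null partial values and only add when both sides are non-null.
def mergeSum[T](partial: Array[Any], buffer: Array[Any], sumIndex: Int)(implicit numeric: Numeric[T]): Unit = {
  val partialValue = partial(sumIndex)
  if (partialValue != null) {
    val bufferValue = buffer(sumIndex)
    if (bufferValue != null) {
      buffer(sumIndex) = numeric.plus(partialValue.asInstanceOf[T], bufferValue.asInstanceOf[T])
    } else {
      buffer(sumIndex) = partialValue
    }
  }
}

val sumBuffer = new Array[Any](1)
initiateSum(sumBuffer, 0)
mergeSum[Long](Array[Any](null), sumBuffer, 0)
mergeSum[Long](Array[Any](2L), sumBuffer, 0)
mergeSum[Long](Array[Any](5L), sumBuffer, 0)
println(sumBuffer(0))
```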



[GitHub] flink pull request #2049: [FLINK-3971] [tableAPI] Aggregates handle null val...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2049#discussion_r66638155
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala ---
    @@ -66,51 +66,63 @@ abstract class IntegralAvgAggregate[T] extends AvgAggregate[T] {
       def doPrepare(value: Any, partial: Row): Unit
     }
     
    -class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
    +class ByteAvgAggregate[T] extends IntegralAvgAggregate[T] {
    --- End diff --
    
    I think it would be cleaner to keep the `Byte` type parameter.
    How about we add an abstract method `def doEvaluate(buffer: Row): Any` to `IntegralAvgAggregate`. 
    The subclasses of `IntegralAvgAggregate` implement `doEvaluate` like `ByteAvgAggregate`:
    
    ```
    override def doEvaluate(buffer: Row): Any = {
      val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
      val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
      if (bufferCount == 0L) {
        null
      } else {
        (bufferSum / bufferCount).toByte
      }
    }
    ```
    
    and return an `Any`, which is cast to `T` by `IntegralAvgAggregate.evaluate()` as follows:
    
    
    ```
    override def evaluate(buffer: Row): T = {
      doEvaluate(buffer).asInstanceOf[T]
    }
    ```
    
    Same for the `FloatingAvgAggregate` and its subclasses.
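
A self-contained sketch of the proposed `doEvaluate`/`evaluate` split. `IntegralAvgSketch`, `ByteAvgSketch`, the `Array[Any]` buffer and the fixed field indexes are hypothetical stand-ins for `IntegralAvgAggregate`, `ByteAvgAggregate` and Flink's `Row`. Assuming the caller works with the aggregate through its generic type, the erased cast in `evaluate` leaves `null` intact; only reading the result at the concrete type `Byte` would unbox it to `0`.

```scala
// Hypothetical stand-in for IntegralAvgAggregate: the row is an Array[Any]
// holding the running sum (a Long) at index 0 and the count (a Long) at index 1.
abstract class IntegralAvgSketch[T] {
  protected val partialSumIndex = 0
  protected val partialCountIndex = 1

  // Subclasses return either null (no non-null input) or a value of type T.
  def doEvaluate(buffer: Array[Any]): Any

  // The cast to an abstract T is unchecked, so a null result stays null.
  def evaluate(buffer: Array[Any]): T = doEvaluate(buffer).asInstanceOf[T]
}

// Hypothetical stand-in for ByteAvgAggregate, keeping the Byte type parameter.
class ByteAvgSketch extends IntegralAvgSketch[Byte] {
  override def doEvaluate(buffer: Array[Any]): Any = {
    val bufferSum = buffer(partialSumIndex).asInstanceOf[Long]
    val bufferCount = buffer(partialCountIndex).asInstanceOf[Long]
    if (bufferCount == 0L) null else (bufferSum / bufferCount).toByte
  }
}

// Calls evaluate through the generic type, so the result is never unboxed.
def evaluateGeneric[T](agg: IntegralAvgSketch[T], buffer: Array[Any]): Any =
  agg.evaluate(buffer)

val avg = new ByteAvgSketch
println(evaluateGeneric(avg, Array[Any](10L, 4L))) // integer average of 10 and 4 rows
println(evaluateGeneric(avg, Array[Any](0L, 0L)))  // no non-null input
```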



[GitHub] flink issue #2049: [FLINK-3971] [tableAPI] Aggregates handle null values inc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2049
  
    Hi @gallenvara, you have a merge commit in your changes.
    Can you rebase your changes onto the current master and squash the commits?
    Thanks!

