Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2018/11/19 11:31:25 UTC

Group by with null keys

Hi to all,
we wanted to do a group by on elements that can contain null values, and we
discovered that the Table API supports this while the DataSet API does not.
Is this documented somewhere on the Flink site?

Best,
Flavio

-------------------------------------------------------

PS: you can test this with the following main:

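// Imports assume Flink 1.6 (the release linked in this thread); the class name is arbitrary.
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class NullKeyGroupByTest {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
        // Turn the literal string "null" into a real null element.
        final DataSet<String> testDs = env
            .fromElements("test", "test", "test2", "null", "null", "test3")
            .map(x -> "null".equals(x) ? null : x);

        // DataSet API: group by the (possibly null) element and count each group.
        boolean testDatasetApi = true;
        if (testDatasetApi) {
            testDs.groupBy(x -> x).reduceGroup(new GroupReduceFunction<String, Integer>() {

                @Override
                public void reduce(Iterable<String> values, Collector<Integer> out) throws Exception {
                    int cnt = 0;
                    for (String value : values) {
                        cnt++;
                    }
                    out.collect(cnt);
                }
            }).print();
        }

        // Table API / SQL: the same grouping expressed as a query.
        btEnv.registerDataSet("TEST", testDs, "field1");
        Table res = btEnv.sqlQuery("SELECT field1, count(*) as cnt FROM TEST GROUP BY field1");
        DataSet<Row> result = btEnv.toDataSet(res,
            new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO));
        result.print();
    }
}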

Re: Group by with null keys

Posted by Timo Walther <tw...@apache.org>.
I assigned the issue to myself because I have wanted to do that for a very
long time. I already did some prerequisite work for the documentation in
`org.apache.flink.api.common.typeinfo.Types`.

Thanks,
Timo

On 20.11.18 at 11:44, Flavio Pompermaier wrote:
> Sure! The problem is that the DataSet API does an implicit conversion to
> Tuples during chaining, and I didn't find any documentation about this
> (actually, I was pleasantly surprised that the Table API supports
> aggregates on null values).
>
> Here it is: https://issues.apache.org/jira/browse/FLINK-10947
>
> Thanks for the reply,
> Flavio


Re: Group by with null keys

Posted by Flavio Pompermaier <po...@okkam.it>.
Sure! The problem is that the DataSet API does an implicit conversion to
Tuples during chaining, and I didn't find any documentation about this
(actually, I was pleasantly surprised that the Table API supports
aggregates on null values).

Here it is: https://issues.apache.org/jira/browse/FLINK-10947

Thanks for the reply,
Flavio

On Tue, Nov 20, 2018 at 11:33 AM Fabian Hueske <fh...@gmail.com> wrote:

> Hi Flavio,
>
> Whether groupBy with null values works or not depends on the type of the
> key, or more specifically on the TypeComparator and TypeSerializer that are
> used to serialize, compare, and hash the key type.
> The processing engine supports null values if the comparator and
> serializer can handle null input values.
>
> Flink SQL wraps keys in the Row type and the corresponding serializer /
> comparator can handle null fields.
> If you use Row in DataSet / DataStream programs, null values are supported
> as well.
>
> I think it would be good to discuss the handling of null keys in the
> documentation about data types [1] and link to that from operators that
> require keys.
> Would you mind creating a Jira issue for that?
>
> Thank you,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/types_serialization.html

Re: Group by with null keys

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Flavio,

Whether groupBy with null values works or not depends on the type of the
key, or more specifically on the TypeComparator and TypeSerializer that are
used to serialize, compare, and hash the key type.
The processing engine supports null values if the comparator and serializer
can handle null input values.
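
To make that condition concrete, here is a minimal sketch of a String key
that is serialized, compared, and hashed in a null-tolerant way. It uses
only the JDK and is not Flink's TypeSerializer or TypeComparator API; the
class NullSafeKeySketch and its members (KEY_ORDER, writeKey, readKey,
hashKey, countByKey) are hypothetical names chosen for illustration. The
null marker byte is one possible encoding, assumed here for simplicity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical illustration only; none of this is Flink code.
class NullSafeKeySketch {

    // Compare: null sorts before every non-null key instead of throwing.
    static final Comparator<String> KEY_ORDER =
        Comparator.nullsFirst(Comparator.<String>naturalOrder());

    // Serialize: a leading boolean marks whether the key is null.
    static void writeKey(String key, DataOutput out) throws IOException {
        out.writeBoolean(key == null);
        if (key != null) {
            out.writeUTF(key);
        }
    }

    // Deserialize: read the marker first, then the payload if present.
    static String readKey(DataInput in) throws IOException {
        return in.readBoolean() ? null : in.readUTF();
    }

    // Hash: Objects.hashCode maps null to 0 instead of throwing.
    static int hashKey(String key) {
        return Objects.hashCode(key);
    }

    // Round-trips every key through the serializer, then counts per key
    // in a sorted map that uses the null-tolerant comparator.
    static Map<String, Long> countByKey(List<String> keys) throws IOException {
        Map<String, Long> counts = new TreeMap<>(KEY_ORDER);
        for (String key : keys) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            writeKey(key, new DataOutputStream(buffer));
            String restored = readKey(
                new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            counts.put(restored, counts.getOrDefault(restored, 0L) + 1);
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        List<String> keys = Arrays.asList("test", "test", "test2", null, null, "test3");
        // prints {null=2, test=2, test2=1, test3=1}
        System.out.println(countByKey(keys));
    }
}
```

If any one of the three steps rejected null (for example, a comparator built
with Comparator.naturalOrder() alone), the grouping would fail on the first
null key, which mirrors the dependency on the key type described above.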

Flink SQL wraps keys in the Row type and the corresponding serializer /
comparator can handle null fields.
If you use Row in DataSet / DataStream programs, null values are supported
as well.

I think it would be good to discuss the handling of null keys in the
documentation about data types [1] and link to that from operators that
require keys.
Would you mind creating a Jira issue for that?

Thank you,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/types_serialization.html
