You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 刘建刚 <li...@gmail.com> on 2019/09/13 08:02:09 UTC

Uncertain result when using group by in stream sql

      I use flink stream sql to write a demo about "group by".  The records
are [(bj, 1), (bj, 3), (bj, 5)]. I group by the first element and sum the
second element.
      Every time I run the program, the result is different. It seems that
the records are out of order. Even sometimes record is lost. I am confused
about that.
      The code is as below:

public class Test {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
      StreamTableEnvironment tEnv =
StreamTableEnvironment.getTableEnvironment(env);

      DataStream<Tuple2<String, Long>> dataStream = env.fromElements(
            Tuple2.of("bj", 1L),
            Tuple2.of("bj", 3L),
            Tuple2.of("bj", 5L));
      tEnv.registerDataStream("person", dataStream);

      String sql = "select f0, sum(f1) from person group by f0";
      Table table = tEnv.sqlQuery(sql);
      tEnv.toRetractStream(table, Row.class).print();

      env.execute();
   }
}

      The results may be as below:
1> (true,bj,1)
1> (false,bj,1)
1> (true,bj,4)
1> (false,bj,4)
1> (true,bj,9)

1> (true,bj,5)
1> (false,bj,5)
1> (true,bj,8)
1> (false,bj,8)
1> (true,bj,9)

Re: Uncertain result when using group by in stream sql

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

A GROUP BY query on a streaming table requires that the result is
continuously updated.
Updates are propagated as a retraction stream (see
tEnv.toRetractStream(table, Row.class).print(); in your code).

A retraction stream encodes the type of the update as a boolean flag, the
"true" and "false" values in your result. "true" means the record was added
to the result, "false" means the record is removed from the result.
If you follow the output, it is the same in both cases: (bj, 9).

The different "result paths" result from the parallel (multi-threaded)
processing of the query.
If you set the parallelism to 1 ( env.setParallelism(1);) the "result path"
should be the same every time.

Best, Fabian



Am Fr., 13. Sept. 2019 um 10:02 Uhr schrieb 刘建刚 <li...@gmail.com>:

>       I use flink stream sql to write a demo about "group by".  The
> records are [(bj, 1), (bj, 3), (bj, 5)]. I group by the first element and
> sum the second element.
>       Every time I run the program, the result is different. It seems that
> the records are out of order. Even sometimes record is lost. I am confused
> about that.
>       The code is as below:
>
> public class Test {
>    public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>       StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
>
>       DataStream<Tuple2<String, Long>> dataStream = env.fromElements(
>             Tuple2.of("bj", 1L),
>             Tuple2.of("bj", 3L),
>             Tuple2.of("bj", 5L));
>       tEnv.registerDataStream("person", dataStream);
>
>       String sql = "select f0, sum(f1) from person group by f0";
>       Table table = tEnv.sqlQuery(sql);
>       tEnv.toRetractStream(table, Row.class).print();
>
>       env.execute();
>    }
> }
>
>       The results may be as below:
> 1> (true,bj,1)
> 1> (false,bj,1)
> 1> (true,bj,4)
> 1> (false,bj,4)
> 1> (true,bj,9)
>
> 1> (true,bj,5)
> 1> (false,bj,5)
> 1> (true,bj,8)
> 1> (false,bj,8)
> 1> (true,bj,9)
>

Re: Uncertain result when using group by in stream sql

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

A GROUP BY query on a streaming table requires that the result is
continuously updated.
Updates are propagated as a retraction stream (see
tEnv.toRetractStream(table, Row.class).print(); in your code).

A retraction stream encodes the type of the update as a boolean flag, the
"true" and "false" values in your result. "true" means the record was added
to the result, "false" means the record is removed from the result.
If you follow the output, it is the same in both cases: (bj, 9).

The different "result paths" result from the parallel (multi-threaded)
processing of the query.
If you set the parallelism to 1 ( env.setParallelism(1);) the "result path"
should be the same every time.

Best, Fabian



Am Fr., 13. Sept. 2019 um 10:02 Uhr schrieb 刘建刚 <li...@gmail.com>:

>       I use flink stream sql to write a demo about "group by".  The
> records are [(bj, 1), (bj, 3), (bj, 5)]. I group by the first element and
> sum the second element.
>       Every time I run the program, the result is different. It seems that
> the records are out of order. Even sometimes record is lost. I am confused
> about that.
>       The code is as below:
>
> public class Test {
>    public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>       StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
>
>       DataStream<Tuple2<String, Long>> dataStream = env.fromElements(
>             Tuple2.of("bj", 1L),
>             Tuple2.of("bj", 3L),
>             Tuple2.of("bj", 5L));
>       tEnv.registerDataStream("person", dataStream);
>
>       String sql = "select f0, sum(f1) from person group by f0";
>       Table table = tEnv.sqlQuery(sql);
>       tEnv.toRetractStream(table, Row.class).print();
>
>       env.execute();
>    }
> }
>
>       The results may be as below:
> 1> (true,bj,1)
> 1> (false,bj,1)
> 1> (true,bj,4)
> 1> (false,bj,4)
> 1> (true,bj,9)
>
> 1> (true,bj,5)
> 1> (false,bj,5)
> 1> (true,bj,8)
> 1> (false,bj,8)
> 1> (true,bj,9)
>

Re: Uncertain result when using group by in stream sql

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

A GROUP BY query on a streaming table requires that the result is
continuously updated.
Updates are propagated as a retraction stream (see
tEnv.toRetractStream(table, Row.class).print(); in your code).

A retraction stream encodes the type of the update as a boolean flag, the
"true" and "false" values in your result. "true" means the record was added
to the result, "false" means the record is removed from the result.
If you follow the output, it is the same in both cases: (bj, 9).

The different "result paths" result from the parallel (multi-threaded)
processing of the query.
If you set the parallelism to 1 ( env.setParallelism(1);) the "result path"
should be the same every time.

Best, Fabian



Am Fr., 13. Sept. 2019 um 10:02 Uhr schrieb 刘建刚 <li...@gmail.com>:

>       I use flink stream sql to write a demo about "group by".  The
> records are [(bj, 1), (bj, 3), (bj, 5)]. I group by the first element and
> sum the second element.
>       Every time I run the program, the result is different. It seems that
> the records are out of order. Even sometimes record is lost. I am confused
> about that.
>       The code is as below:
>
> public class Test {
>    public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>       StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
>
>       DataStream<Tuple2<String, Long>> dataStream = env.fromElements(
>             Tuple2.of("bj", 1L),
>             Tuple2.of("bj", 3L),
>             Tuple2.of("bj", 5L));
>       tEnv.registerDataStream("person", dataStream);
>
>       String sql = "select f0, sum(f1) from person group by f0";
>       Table table = tEnv.sqlQuery(sql);
>       tEnv.toRetractStream(table, Row.class).print();
>
>       env.execute();
>    }
> }
>
>       The results may be as below:
> 1> (true,bj,1)
> 1> (false,bj,1)
> 1> (true,bj,4)
> 1> (false,bj,4)
> 1> (true,bj,9)
>
> 1> (true,bj,5)
> 1> (false,bj,5)
> 1> (true,bj,8)
> 1> (false,bj,8)
> 1> (true,bj,9)
>