Posted to user@flink.apache.org by Yan Chou Chen <yc...@gmail.com> on 2016/06/22 14:39:13 UTC

Stoppable Job And Web UI Questions

Several new questions:
- Stoppable job
I read threads mentioning that a streaming job can be stopped [1][2].
However, it looks like stopping can only be triggered through the
command line. Is it possible to stop the streaming job
programmatically from within the job itself? For instance, when a
Kafka consumer streaming job reaches a predefined condition, can it
call stop() from within, e.g., a MapFunction?

- Web UI (jobmanager-host:8081) information
I have a Kafka consumer which reads records from Kafka. In the web
UI's Subtasks tab there is a "Records sent" column; does it show the
number of records read by the consumer? For instance, if I deliver,
say, 1k string records (SimpleStringSchema) to Kafka, can I expect 1k
"Records sent" to be displayed in the web UI once the consumer has
read all of those records?

This leads to another question. I have a streaming job which uses a
map function, e.g. stream.map(new MyMapFunction). Within the
MyMapFunction implementation I count each input record and write the
count to an external store. Later on I sum the counts across the
parallel instances of MyMapFunction. So for example, when I run
map(MyMapFunction) with a parallelism of 4 and the instances process
400, 500, 400, and 500 records respectively, the sum of all counts is
1800. However, this sum differs from the web UI, which shows a higher
"Records sent" value, e.g. 8k. Does that mean "Records sent" in the
web UI is not the number of records processed by MyMapFunction? How do
I interpret the value in this column, and how can I know whether all
messages delivered to Kafka have been fully processed, i.e. 1k records
delivered to Kafka and 1k records read out of Kafka?

Thanks.

[1]. http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3C57155C30.8010401@apache.org%3E

[2]. https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/StoppableFunction.html

Re: Stoppable Job And Web UI Questions

Posted by Till Rohrmann <tr...@apache.org>.
Hi Yan Chou Chen,

1. At the moment Flink sources have to implement a certain interface,
   StoppableFunction, to be stoppable. If they do, then you can stop
   them via the CLI or the web interface. This cannot be triggered from
   within a job.

   However, there is a far better way to properly terminate a Flink job
   with your custom sources: simply terminate the SourceFunction (leave
   the read loop) once you've detected that your termination criterion
   is met. Once all sources have done that, the job will be properly
   terminated and go into the state FINISHED. This has the advantage
   that the sources reach a consensus on when to terminate. Otherwise
   there might be a dictator which orders the other tasks to stop even
   though they might still have some work left to do.
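   As a minimal sketch of this "leave the read loop" pattern: in a real
   job the class would implement
   org.apache.flink.streaming.api.functions.source.SourceFunction and
   emit records via SourceContext#collect; here the class name, the
   maxRecords criterion, and the List standing in for the emitted stream
   are all made up so the sketch runs without a Flink cluster.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a self-terminating source. Mirrors the run()/cancel()
// contract of Flink's SourceFunction without depending on Flink.
public class BoundedSource {
    private final long maxRecords;            // hypothetical termination criterion
    private volatile boolean running = true;  // flipped by cancel()

    public BoundedSource(long maxRecords) {
        this.maxRecords = maxRecords;
    }

    // Mirrors SourceFunction#run: returning from this loop is what lets
    // the source subtask finish, and once all sources have returned the
    // job transitions to FINISHED on its own.
    public List<Long> run() {
        List<Long> emitted = new ArrayList<>();
        long count = 0;
        while (running && count < maxRecords) {
            emitted.add(count++);             // ctx.collect(...) in real code
        }
        return emitted;                       // leaving the loop == terminating
    }

    // Mirrors SourceFunction#cancel, called when the job is cancelled
    // externally; the flag makes the read loop exit promptly.
    public void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        BoundedSource source = new BoundedSource(1000);
        List<Long> records = source.run();    // loop exits by itself
        System.out.println(records.size());
    }
}
```

   The key design point is that each parallel source instance decides
   for itself when its work is done, so no coordinator has to order
   tasks to stop while they still hold unprocessed input.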
2. The number of records sent is the sum of all records sent by this
   task. These records include the watermarks as well as the actual
   stream records containing your data (read from Kafka). As such, this
   number will always be an upper bound on the number of records
   actually read (e.g. from Kafka) by your source.
3. Given that others might also deliver messages to the same Kafka
   topic and that you have multiple partitions, I think it is not easy
   to know when your 1000 messages have been processed.

If you’re the only one who writes to this Kafka topic, you can use an
accumulator to count the number of messages read. The accumulator is
updated live in the web UI’s task overview (click on the job and then
on the Accumulators tab).

import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

input.map(new RichMapFunction<Integer, Integer>() {
    private IntCounter intCounter;

    @Override
    public void open(Configuration config) {
        // Registers the accumulator under the name "messages"; its
        // value shows up in the web UI's Accumulators tab.
        intCounter = getRuntimeContext().getIntCounter("messages");
    }

    @Override
    public Integer map(Integer value) throws Exception {
        intCounter.add(1); // count every record passing through this map
        return value;
    }
});

Cheers,
Till
