Posted to user@flink.apache.org by Vijayendra Yadav <co...@gmail.com> on 2021/03/23 01:09:14 UTC

Flink Streaming Counter

Hi Team,

Could you provide a sample showing how to increment a Counter with the
records flowing through a Flink DataStream source and sink? I would then
like to display the counters in my local IDE, one for each of steps #1
through #3 below.

1) DataStream<byte[]> messageStream = env.addSource(Kinesis Source);
2) DataStream<String> outputStream =
       messageStream.rebalance().map(new CustomMapFunction());
3) outputStream.addSink(Streaming File Sink);

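import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class MyMapper extends RichMapFunction<String, String> {
  // Not serialized with the function; created in open() on each task.
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    // Register the counter with this operator's metric group.
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
    // Count each record, then pass it through unchanged.
    this.counter.inc();
    return value;
  }
}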


Thanks,
Vijay

Re: Flink Streaming Counter

Posted by Matthias Pohl <ma...@ververica.com>.
Hi Vijayendra,
what about the example from the docs you already referred to [1]?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#counter
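
A minimal sketch of how the docs example could be generalised to count
records at each of stages #1 through #3. It assumes the Flink 1.x
RichFunction API used elsewhere in this thread. The class name
CountingMapper, the counter names and the wiring in the trailing comment are
hypothetical and not from the original messages. If Flink cannot infer the
output type of the generic mapper, adding .returns(...) after the map call
may be needed.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

/**
 * Pass-through mapper (hypothetical helper) that counts every record it
 * sees under a metric name chosen by the caller.
 */
public class CountingMapper<T> extends RichMapFunction<T, T> {
    private final String counterName;

    // Created in open() on each parallel task, so it is not serialized.
    private transient Counter counter;

    public CountingMapper(String counterName) {
        this.counterName = counterName;
    }

    @Override
    public void open(Configuration parameters) {
        // Register one counter per parallel instance of this operator.
        counter = getRuntimeContext().getMetricGroup().counter(counterName);
    }

    @Override
    public T map(T value) {
        counter.inc();   // count the record
        return value;    // forward it unchanged
    }
}

// Hypothetical wiring that mirrors stages #1 to #3 of the question
// (source and sink construction omitted, as in the original post):
//
//   messageStream
//       .map(new CountingMapper<byte[]>("sourceRecords"))   // after #1
//       .rebalance()
//       .map(new CustomMapFunction())
//       .map(new CountingMapper<String>("mappedRecords"))   // after #2
//       .addSink(sink);                                     // #3
```

A counting mapper placed directly before the sink counts the records handed
to it. Flink also reports built-in numRecordsIn and numRecordsOut metrics
per operator, which may already cover this without custom code.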

On Tue, Mar 23, 2021 at 6:48 PM Vijayendra Yadav <co...@gmail.com>
wrote:

> Hi Pohl,
>
> Thanks for getting back to me so quickly. I am looking for an example
> where I can increment a counter at each stage (#1 through #3) of the
> DataStream pipeline. I could then probably print the counters using slf4j.
>
> Thanks,
> Vijay

Re: Flink Streaming Counter

Posted by Vijayendra Yadav <co...@gmail.com>.
Hi Pohl,

Thanks for getting back to me so quickly. I am looking for an example
where I can increment a counter at each stage (#1 through #3) of the
DataStream pipeline. I could then probably print the counters using slf4j.

Thanks,
Vijay

On Tue, Mar 23, 2021 at 6:35 AM Matthias Pohl <ma...@ververica.com>
wrote:

> Hi Vijayendra,
> thanks for reaching out to the Flink community. What do you mean by
> displaying it in your local IDE? Would it be OK to log the information to
> stdout? If so, you might want to have a look at the docs on setting up the
> slf4j metrics reporter [1].
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

Re: Flink Streaming Counter

Posted by Matthias Pohl <ma...@ververica.com>.
Hi Vijayendra,
thanks for reaching out to the Flink community. What do you mean by
displaying it in your local IDE? Would it be OK to log the information to
stdout? If so, you might want to have a look at the docs on setting up the
slf4j metrics reporter [1].

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter
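
For a job started from a local IDE, one way to apply the reporter settings
from the linked page is to pass them to the local environment through a
Configuration. The sketch below assumes the configuration keys from the 1.10
docs and that the flink-metrics-slf4j dependency is on the classpath. The
class name LocalMetricsEnv and the 10 second interval are illustrative only.
Newer Flink releases configure reporters through a factory.class key
(Slf4jReporterFactory) instead, so check the docs for the version in use.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Hypothetical helper that builds a local environment with slf4j metrics logging. */
public class LocalMetricsEnv {

    public static StreamExecutionEnvironment create() {
        Configuration conf = new Configuration();

        // Reporter keys as given in the Flink 1.10 metrics docs (assumed here).
        conf.setString("metrics.reporter.slf4j.class",
                "org.apache.flink.metrics.slf4j.Slf4jReporter");
        // How often the reporter writes all metrics to the log.
        conf.setString("metrics.reporter.slf4j.interval", "10 SECONDS");

        // Local environment with parallelism 1 that uses the settings above.
        return StreamExecutionEnvironment.createLocalEnvironment(1, conf);
    }
}
```

The reporter logs at INFO level, so the logging configuration used in the
IDE needs to let INFO messages through for the counters to show up in the
console.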
