You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jiewen Shao <ji...@gmail.com> on 2017/09/18 02:27:28 UTC

need instruction on how the Flink metric works

I'm new to flink and I have read
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html,
I am still unclear where do I read the metrics I added.

for example,

public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);



        List<String> wordList = Arrays.asList("Hive", "Presto", "Impala",
"Parquet","ORC","Hadoop", "Flink", "Spark", "Storm", "Tez", "Flink");


        DataStreamSource<String> source = env.fromCollection(wordList);

        DataStream<Tuple2<String, Integer>> dataStream = env.fromCollection(
wordList).map(new WordLengthCounter());

        dataStream.print();

        env.execute();

    }


and


public class WordLengthCounter extends RichMapFunction<String,
Tuple2<String, Integer>> {


private static final long serialVersionUID = 1L;

private Counter counter;


@Override

  public void open(Configuration config) {

    this.counter = getRuntimeContext()

      .getMetricGroup()

      .counter("myCounter");

  }


@Override

public Tuple2<String, Integer> map(String value) throws Exception {

this.counter.inc();

return new Tuple2<String, Integer>(value, value.length());

}

}


Now, where do I see the counter? Sorry for the naive question

can anyone point me to any good end-to-end "hello world" example for flink
metrics.

Re: need instruction on how the Flink metric works

Posted by Jiewen Shao <ji...@gmail.com>.
hi, Michael & Chesnay,
Thanks a lot for the help, solved!

On Fri, Sep 22, 2017 at 6:58 AM, Chesnay Schepler <ch...@apache.org>
wrote:

> Hello Jiewen,
>
> you are connection to the wrong JVM. Metrics are exposed on each Job- and
> TaskManager separately, meaning you have to connect to the TaskManager JVM
> instead of the JobManager JVM.
>
>
> On 22.09.2017 03:45, Michael Fong wrote:
>
> Hi, Jiewen,
>
>
> Sorry, I am not familiar with Flink v1.2 nor Mac env. I did a brief tests
> with Flink v1.2.1 (on Linux) with the examples you provided (counters
> only), the metric shows up successfully and increments as typing more words
> to console. Please check the following:
> 1. connect to the corresponding JMX server
> 2. Task is up and running
>
> [image: Inline image 1]
> Here is the code snippet FYI.
>
> class MetricFunction<T> extends RichMapFunction<T, T> {
>
>     private Counter counter;
>
>        @Override       public void open(Configuration config) {
>            this.counter = getRuntimeContext()
>                    .getMetricGroup()
>                    .counter("my-counter");
>
>        }
>
>        @Override       public T map(T item) throws Exception {
>            this.counter.inc();
>            return item;
>        }
>    }
>
> DataStream<WordCount> result = text.map(new MetricFunction<>())
>
>       .flatMap(
>
>        ..... //test
>
>
> Hope these would help,
>
> Regards,
>
> On Thu, Sep 21, 2017 at 3:35 AM, Jiewen Shao <ji...@gmail.com> wrote:
>
>> thats weird, I am still having trouble to see my custom metrics "my-counter"
>> and "my-meter", I was able to see the default system metrics.
>> for example I have env.execute("Hello Flink"); when I connect to
>> localhost:28888 (28888 is the port JMX listens to) I can see default
>> flink system metrics (Hello_Flink), but just didn't see my custom metrics,
>> I could miss something obvious. (btw, I used flink 1.2 on macbook, I
>> started flink using start-cluster.sh), thanks!
>>
>> [image: Inline image 1]
>>
>> On Tue, Sep 19, 2017 at 7:07 PM, Michael Fong <mc...@gmail.com>
>> wrote:
>>
>>> I just did the same test as you had with SocketWindowWordCount, and the
>>> counter showed up all right.
>>>
>>> You should probably connect Jconsole to localhost:28781 (or whatever
>>> port you have your JMX server listened on)
>>>
>>> That's how I setup the env, perhaps there is other better ways to do it.
>>>
>>> On Wed, Sep 20, 2017 at 9:15 AM, Jiewen Shao <ji...@gmail.com>
>>> wrote:
>>>
>>>> Still got stuck, here are my steps (on my laptop)
>>>>
>>>> for example:
>>>> Step1:
>>>>
>>>> public class MetricsTest<T> extends RichMapFunction<T, T> {
>>>>
>>>>
>>>> private static final long serialVersionUID = 1L;
>>>>
>>>> private org.apache.flink.metrics.Meter meter;
>>>>
>>>>     private Counter counter;
>>>>
>>>>
>>>>     @Override
>>>>
>>>>     public void open(Configuration config) {
>>>>
>>>>         this.counter = getRuntimeContext()
>>>>
>>>>                 .getMetricGroup()
>>>>
>>>>                 .counter("my-counter");
>>>>
>>>>
>>>>
>>>>         this.meter = getRuntimeContext()
>>>>
>>>>                 .getMetricGroup()
>>>>
>>>>                 .meter("my-meter", new DropwizardMeterWrapper(new
>>>>  com.codahale.metrics.Meter()));
>>>>
>>>>     }
>>>>
>>>>
>>>>     @Override
>>>>
>>>>     public T map(T item) throws Exception {
>>>>
>>>>         this.counter.inc();
>>>>
>>>>         this.meter.markEvent();
>>>>
>>>>         return item;
>>>>
>>>>     }
>>>>
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>> And I did followings in one of the Flink sample
>>>> (SocketWindowWordCount.java):
>>>> Step2:
>>>>
>>>> DataStream<String> text = env.socketTextStream("localhost", 12345, "\n"
>>>> );
>>>>
>>>> text.map(new MetricsTest());  //<-- added this line
>>>>
>>>>
>>>> Step3:
>>>>
>>>> mvn clean install
>>>>
>>>>
>>>> step4: nc -l 12345
>>>>
>>>>
>>>> step5:
>>>>
>>>> flink run -c [my_class_name] my.jar
>>>>
>>>>
>>>> step6:  (type something under nc terminal)
>>>>
>>>> run jconsole, and pick the local process for this "flink run", and
>>>> click the tab "MBeans" (I don't see my metrics other than system ones, is
>>>> that the right place to look at?)
>>>>
>>>>
>>>> and flink-conf.yaml has:
>>>>
>>>> # metrics
>>>>
>>>> metrics.reporters: jmx
>>>>
>>>> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
>>>>
>>>> metrics.reporter.jmx.port: 28780-28790
>>>>
>>>>
>>>> and taskmanager log looks ok regarding JMX
>>>>
>>>>
>>>> did I miss steps or configurations? Thanks a lot!
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Sep 18, 2017 at 12:30 AM, Michael Fong <mc...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> There are several possibilities:
>>>>> 1. Please check if reporter is set up ( guide
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#jmx-orgapacheflinkmetricsjmxjmxreporter>
>>>>>  )
>>>>> For example, I would make sure my local JMXReporter service is up and
>>>>> running by checking taskmanager.log and search for the line:
>>>>>
>>>>> 2017-09-18 15:18:57,174 INFO  org.apache.flink.metrics.jmx.JMXReporter
>>>>>                     - Started JMX server on port 28781.
>>>>> 2017-09-18 15:18:57,175 INFO  org.apache.flink.metrics.jmx.JMXReporter
>>>>>                     - Configured JMXReporter with {port:28780-28790}
>>>>>
>>>>> If for any reason the JMX server does not start up, your might see
>>>>> some errors:
>>>>>
>>>>> 2017-09-18 15:26:04,743 INFO  org.apache.flink.runtime.metrics.MetricRegistry
>>>>>              - Configuring JMXReporter with {port=28781, class=org.apac
>>>>> he.flink.metrics.jmx.JMXReporter}.
>>>>> 2017-09-18 15:26:04,760 ERROR org.apache.flink.runtime.metrics.MetricRegistry
>>>>>              - Could not instantiate metrics reporter jmx. Metrics migh
>>>>> t not be exposed/reported.
>>>>> java.lang.RuntimeException: Could not start JMX server on any
>>>>> configured port. Ports: 28781
>>>>>         at org.apache.flink.metrics.jmx.J
>>>>> MXReporter.open(JMXReporter.java:126)
>>>>>         at org.apache.flink.runtime.metri
>>>>> cs.MetricRegistry.<init>(MetricRegistry.java:131)
>>>>>         at org.apache.flink.runtime.taske
>>>>> xecutor.TaskManagerServices.fromConfiguration(TaskManagerSer
>>>>> vices.java:188)
>>>>>         at org.apache.flink.runtime.taskm
>>>>> anager.TaskManager$.startTaskManagerComponentsAndActor(TaskM
>>>>> anager.scala:1984)
>>>>>         at org.apache.flink.runtime.taskm
>>>>> anager.TaskManager$.runTaskManager(TaskManager.scala:1823)
>>>>>         at org.apache.flink.runtime.taskm
>>>>> anager.TaskManager$$anonfun$1.apply$mcV$sp(TaskManager.scala:1926)
>>>>>         at org.apache.flink.runtime.taskm
>>>>> anager.TaskManager$$anonfun$1.apply(TaskManager.scala:1904)
>>>>>         at org.apache.flink.runtime.taskm
>>>>> anager.TaskManager$$anonfun$1.apply(TaskManager.scala:1904)
>>>>>         at scala.util.Try$.apply(Try.scala:192)
>>>>>
>>>>>
>>>>> Here is my local setup for conf/flink-conf.yaml for example:
>>>>> metrics.reporters: jmx
>>>>> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
>>>>> metrics.reporter.jmx.port: 28780-28790
>>>>>
>>>>> 2. You might want to try a real streaming example which could execute
>>>>> continuously. If I remember correctly, when the task is completed, the
>>>>> manager would seem to release the associated resource and object. In your
>>>>> example, it is only processing a few strings, which would finish in matter
>>>>> of milliseconds, before bringing up jconsole manually.
>>>>>
>>>>> Hope some of these help,
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Sep 18, 2017 at 12:22 PM, Jiewen Shao <ji...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks, When I started jconsole, it listed com.apache.flink.runtime.
>>>>>> jobmanager..:[port] as one of the Local Process, i was able to
>>>>>> connect to it with insecure connection, but i was not able to locate the
>>>>>> Counter metrics, I only saw some system metrics.
>>>>>>
>>>>>> On Sun, Sep 17, 2017 at 7:39 PM, Michael Fong <mc...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> You may enable metrics reporter to see the output of your metrics;
>>>>>>> counter in your example.
>>>>>>>
>>>>>>> There is a brief documentation regarding to metrics and reporter
>>>>>>> setup at link
>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html>.
>>>>>>> The easiest approach, in my opinion, is to set up a JMX reporter so that
>>>>>>> you may see your metrics via JConsole.
>>>>>>>
>>>>>>> Hope this helps.
>>>>>>>
>>>>>>> Regrads,
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Sep 18, 2017 at 10:27 AM, Jiewen Shao <ji...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'm new to flink and I have read https://ci.apache.org/pro
>>>>>>>> jects/flink/flink-docs-release-1.3/monitoring/metrics.html, I am
>>>>>>>> still unclear where do I read the metrics I added.
>>>>>>>>
>>>>>>>> for example,
>>>>>>>>
>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>
>>>>>>>>
>>>>>>>>         StreamExecutionEnvironment env =
>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>
>>>>>>>>         env.setParallelism(2);
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>         List<String> wordList = Arrays.asList("Hive", "Presto",
>>>>>>>> "Impala", "Parquet","ORC","Hadoop", "Flink", "Spark", "Storm",
>>>>>>>> "Tez", "Flink");
>>>>>>>>
>>>>>>>>         DataStreamSource<String> source = env.fromCollection(
>>>>>>>> wordList);
>>>>>>>>
>>>>>>>>         DataStream<Tuple2<String, Integer>> dataStream = env
>>>>>>>> .fromCollection(wordList).map(new WordLengthCounter());
>>>>>>>>
>>>>>>>>         dataStream.print();
>>>>>>>>
>>>>>>>>         env.execute();
>>>>>>>>
>>>>>>>>     }
>>>>>>>>
>>>>>>>>
>>>>>>>> and
>>>>>>>>
>>>>>>>>
>>>>>>>> public class WordLengthCounter extends RichMapFunction<String,
>>>>>>>> Tuple2<String, Integer>> {
>>>>>>>>
>>>>>>>>
>>>>>>>> private static final long serialVersionUID = 1L;
>>>>>>>>
>>>>>>>> private Counter counter;
>>>>>>>>
>>>>>>>>
>>>>>>>> @Override
>>>>>>>>
>>>>>>>>   public void open(Configuration config) {
>>>>>>>>
>>>>>>>>     this.counter = getRuntimeContext()
>>>>>>>>
>>>>>>>>       .getMetricGroup()
>>>>>>>>
>>>>>>>>       .counter("myCounter");
>>>>>>>>
>>>>>>>>   }
>>>>>>>>
>>>>>>>>
>>>>>>>> @Override
>>>>>>>>
>>>>>>>> public Tuple2<String, Integer> map(String value) throws Exception {
>>>>>>>>
>>>>>>>> this.counter.inc();
>>>>>>>>
>>>>>>>> return new Tuple2<String, Integer>(value, value.length());
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> Now, where do I see the counter? Sorry for the naive question
>>>>>>>>
>>>>>>>> can anyone point me to any good end-to-end "hello world" example
>>>>>>>> for flink metrics.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>

Re: need instruction on how the Flink metric works

Posted by Chesnay Schepler <ch...@apache.org>.
Hello Jiewen,

you are connection to the wrong JVM. Metrics are exposed on each Job- 
and TaskManager separately, meaning you have to connect to the 
TaskManager JVM instead of the JobManager JVM.

On 22.09.2017 03:45, Michael Fong wrote:
> Hi, Jiewen,
>
>
> Sorry, I am not familiar with Flink v1.2 nor Mac env. I did a brief 
> tests with Flink v1.2.1 (on Linux) with the examples you provided 
> (counters only), the metric shows up successfully and increments as 
> typing more words to console. Please check the following:
> 1. connect to the corresponding JMX server
> 2. Task is up and running
>
> Inline image 1
> Here is the code snippet FYI.
> class MetricFunction<T> extends RichMapFunction<T,T> {
>
>      private Countercounter;
>
>         @Override public void open(Configuration config) {
>             this.counter = getRuntimeContext()
>                     .getMetricGroup()
>                     .counter("my-counter");
>
>         }
>
>         @Override public T map(T item)throws Exception {
>             this.counter.inc();
>             return item;
>         }
>     }
> DataStream<WordCount> result = text.map(new MetricFunction<>())
>
>        .flatMap(
>         ..... //test
>
> Hope these would help,
>
> Regards,
>
> On Thu, Sep 21, 2017 at 3:35 AM, Jiewen Shao <jiewenshao@gmail.com 
> <ma...@gmail.com>> wrote:
>
>     thats weird, I am still having trouble to see my custom metrics
>     "my-counter" and "my-meter", I was able to see the default system
>     metrics.
>     for example I have env.execute("Hello Flink"); when I connect to
>     localhost:28888 (28888 is the port JMX listens to) I can see
>     default flink system metrics (Hello_Flink), but just didn't see my
>     custom metrics, I could miss something obvious. (btw, I used flink
>     1.2 on macbook, I started flink using start-cluster.sh), thanks!
>
>     Inline image 1
>
>     On Tue, Sep 19, 2017 at 7:07 PM, Michael Fong
>     <mcfong.open@gmail.com <ma...@gmail.com>> wrote:
>
>         I just did the same test as you had with
>         SocketWindowWordCount, and the counter showed up all right.
>
>         You should probably connect Jconsole to localhost:28781 (or
>         whatever port you have your JMX server listened on)
>
>         That's how I setup the env, perhaps there is other better ways
>         to do it.
>
>         On Wed, Sep 20, 2017 at 9:15 AM, Jiewen Shao
>         <jiewenshao@gmail.com <ma...@gmail.com>> wrote:
>
>             Still got stuck, here are my steps (on my laptop)
>
>             for example:
>             Step1:
>
>             public class MetricsTest<T> extends RichMapFunction<T, T> {
>
>
>             privatestaticfinallongserialVersionUID = 1L;
>
>             private org.apache.flink.metrics.Meter meter;
>
>             private Counter counter;
>
>
>             @Override
>
>             public void open(Configuration config) {
>
>             this.counter = getRuntimeContext()
>
>                           .getMetricGroup()
>
>                           .counter("my-counter");
>
>             this.meter = getRuntimeContext()
>
>                           .getMetricGroup()
>
>                           .meter("my-meter",
>             new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
>
>               }
>
>
>             @Override
>
>             public T map(T item) throws Exception {
>
>             this.counter.inc();
>
>             this.meter.markEvent();
>
>             return item;
>
>               }
>
>             }
>
>
>
>
>             And I did followings in one of the Flink sample
>             (SocketWindowWordCount.java):
>             Step2:
>
>             DataStream<String> text =
>             env.socketTextStream("localhost", 12345, "\n");
>
>             text.map(new MetricsTest());  //<-- added this line
>
>
>             Step3:
>
>             mvn clean install
>
>
>             step4: nc -l 12345
>
>
>             step5:
>
>             flink run -c [my_class_name] my.jar
>
>
>             step6:  (type something under nc terminal)
>
>             run jconsole, and pick the local process for this "flink
>             run", and click the tab "MBeans" (I don't see my metrics
>             other than system ones, is that the right place to look at?)
>
>
>             and flink-conf.yaml has:
>
>             # metrics
>
>             metrics.reporters: jmx
>
>             metrics.reporter.jmx.class:
>             org.apache.flink.metrics.jmx.JMXReporter
>
>             metrics.reporter.jmx.port: 28780-28790
>
>
>             and taskmanager log looks ok regarding JMX
>
>
>             did I miss steps or configurations? Thanks a lot!
>
>
>
>
>             On Mon, Sep 18, 2017 at 12:30 AM, Michael Fong
>             <mcfong.open@gmail.com <ma...@gmail.com>> wrote:
>
>                 Hi,
>
>                 There are several possibilities:
>                 1. Please check if reporter is set up ( guide
>                 <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#jmx-orgapacheflinkmetricsjmxjmxreporter> )
>                 For example, I would make sure my local JMXReporter
>                 service is up and running by checking taskmanager.log
>                 and search for the line:
>
>                 2017-09-18 15:18:57,174 INFO
>                 org.apache.flink.metrics.jmx.JMXReporter              
>                       - Started JMX server on port 28781.
>                 2017-09-18 15:18:57,175 INFO
>                 org.apache.flink.metrics.jmx.JMXReporter              
>                       - Configured JMXReporter with {port:28780-28790}
>
>                 If for any reason the JMX server does not start up,
>                 your might see some errors:
>
>                 2017-09-18 15:26:04,743 INFO
>                 org.apache.flink.runtime.metrics.MetricRegistry      
>                        - Configuring JMXReporter with {port=28781,
>                 class=org.apac
>                 he.flink.metrics.jmx.JMXReporter}.
>                 2017-09-18 15:26:04,760 ERROR
>                 org.apache.flink.runtime.metrics.MetricRegistry      
>                        - Could not instantiate metrics reporter jmx.
>                 Metrics migh
>                 t not be exposed/reported.
>                 java.lang.RuntimeException: Could not start JMX server
>                 on any configured port. Ports: 28781
>                         at
>                 org.apache.flink.metrics.jmx.JMXReporter.open(JMXReporter.java:126)
>                         at
>                 org.apache.flink.runtime.metrics.MetricRegistry.<init>(MetricRegistry.java:131)
>                         at
>                 org.apache.flink.runtime.taskexecutor.TaskManagerServices.fr
>                 <http://xecutor.TaskManagerServices.fr>omConfiguration(TaskManagerServices.java:188)
>                         at
>                 org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1984)
>                         at
>                 org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1823)
>                         at
>                 org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply$mcV$sp(TaskManager.scala:1926)
>                         at
>                 org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1904)
>                         at
>                 org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1904)
>                         at scala.util.Try$.apply(Try.scala:192)
>
>
>                 Here is my local setup for conf/flink-conf.yaml for
>                 example:
>                 metrics.reporters: jmx
>                 metrics.reporter.jmx.class:
>                 org.apache.flink.metrics.jmx.JMXReporter
>                 metrics.reporter.jmx.port: 28780-28790
>
>                 2. You might want to try a real streaming example
>                 which could execute continuously. If I remember
>                 correctly, when the task is completed, the manager
>                 would seem to release the associated resource and
>                 object. In your example, it is only processing a few
>                 strings, which would finish in matter of milliseconds,
>                 before bringing up jconsole manually.
>
>                 Hope some of these help,
>
>
>
>                 On Mon, Sep 18, 2017 at 12:22 PM, Jiewen Shao
>                 <jiewenshao@gmail.com <ma...@gmail.com>>
>                 wrote:
>
>                     Thanks, When I started jconsole, it listed
>                     com.apache.flink.runtime.jobmanager..:[port] as
>                     one of the Local Process, i was able to connect to
>                     it with insecure connection, but i was not able to
>                     locate the Counter metrics, I only saw some system
>                     metrics.
>
>                     On Sun, Sep 17, 2017 at 7:39 PM, Michael Fong
>                     <mcfong.open@gmail.com
>                     <ma...@gmail.com>> wrote:
>
>                         Hi,
>
>                         You may enable metrics reporter to see the
>                         output of your metrics; counter in your example.
>
>                         There is a brief documentation regarding to
>                         metrics and reporter setup at link
>                         <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html>.
>                         The easiest approach, in my opinion, is to set
>                         up a JMX reporter so that you may see your
>                         metrics via JConsole.
>
>                         Hope this helps.
>
>                         Regrads,
>
>
>                         On Mon, Sep 18, 2017 at 10:27 AM, Jiewen Shao
>                         <jiewenshao@gmail.com
>                         <ma...@gmail.com>> wrote:
>
>                             I'm new to flink and I have read
>                             https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
>                             <https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html>,
>                             I am still unclear where do I read the
>                             metrics I added.
>
>                             for example,
>
>                             public static void main(String[] args)
>                             throws Exception {
>
>
>                             StreamExecutionEnvironment env =
>                             StreamExecutionEnvironment.getExecutionEnvironment();
>
>                             env.setParallelism(2);
>
>                             List<String> wordList=
>                             Arrays.asList("Hive", "Presto", "Impala",
>                             "Parquet","ORC","Hadoop", "Flink",
>                             "Spark", "Storm", "Tez", "Flink");
>
>                             DataStreamSource<String> source =
>                             env.fromCollection(wordList);
>
>                             DataStream<Tuple2<String, Integer>>
>                             dataStream =
>                             env.fromCollection(wordList).map(new
>                             WordLengthCounter());
>
>                             dataStream.print();
>
>                             env.execute();
>
>                               }
>
>
>                             and
>
>
>                             public class WordLengthCounter extends
>                             RichMapFunction<String, Tuple2<String,
>                             Integer>> {
>
>
>                             privatestaticfinallongserialVersionUID= 1L;
>
>                             private Counter counter;
>
>
>                             @Override
>
>                             public void open(Configuration config) {
>
>                             this.counter = getRuntimeContext()
>
>                             .getMetricGroup()
>
>                                 .counter("myCounter");
>
>                             }
>
>
>                             @Override
>
>                             public Tuple2<String, Integer> map(String
>                             value) throws Exception {
>
>                             this.counter.inc();
>
>                             return new Tuple2<String, Integer>(value,
>                             value.length());
>
>                             }
>
>                             }
>
>
>
>                             Now, where do I see the counter? Sorry for
>                             the naive question
>
>
>                             can anyone point me to any good end-to-end
>                             "hello world" example for flink metrics.
>
>
>
>
>
>
>
>


Re: need instruction on how the Flink metric works

Posted by Michael Fong <mc...@gmail.com>.
Hi, Jiewen,


Sorry, I am not familiar with Flink v1.2 nor Mac env. I did a brief tests
with Flink v1.2.1 (on Linux) with the examples you provided (counters
only), the metric shows up successfully and increments as typing more words
to console. Please check the following:
1. connect to the corresponding JMX server
2. Task is up and running

[image: Inline image 1]
Here is the code snippet FYI.

class MetricFunction<T> extends RichMapFunction<T, T> {

    private Counter counter;

       @Override
       public void open(Configuration config) {
           this.counter = getRuntimeContext()
                   .getMetricGroup()
                   .counter("my-counter");

       }

       @Override
       public T map(T item) throws Exception {
           this.counter.inc();
           return item;
       }
   }

DataStream<WordCount> result = text.map(new MetricFunction<>())

      .flatMap(

       ..... //test


Hope these would help,

Regards,

On Thu, Sep 21, 2017 at 3:35 AM, Jiewen Shao <ji...@gmail.com> wrote:

> thats weird, I am still having trouble to see my custom metrics "my-counter"
> and "my-meter", I was able to see the default system metrics.
> for example I have env.execute("Hello Flink"); when I connect to
> localhost:28888 (28888 is the port JMX listens to) I can see default
> flink system metrics (Hello_Flink), but just didn't see my custom metrics,
> I could miss something obvious. (btw, I used flink 1.2 on macbook, I
> started flink using start-cluster.sh), thanks!
>
> [image: Inline image 1]
>
> On Tue, Sep 19, 2017 at 7:07 PM, Michael Fong <mc...@gmail.com>
> wrote:
>
>> I just did the same test as you had with SocketWindowWordCount, and the
>> counter showed up all right.
>>
>> You should probably connect Jconsole to localhost:28781 (or whatever port
>> you have your JMX server listened on)
>>
>> That's how I setup the env, perhaps there is other better ways to do it.
>>
>> On Wed, Sep 20, 2017 at 9:15 AM, Jiewen Shao <ji...@gmail.com>
>> wrote:
>>
>>> Still got stuck, here are my steps (on my laptop)
>>>
>>> for example:
>>> Step1:
>>>
>>> public class MetricsTest<T> extends RichMapFunction<T, T> {
>>>
>>>
>>> private static final long serialVersionUID = 1L;
>>>
>>> private org.apache.flink.metrics.Meter meter;
>>>
>>>     private Counter counter;
>>>
>>>
>>>     @Override
>>>
>>>     public void open(Configuration config) {
>>>
>>>         this.counter = getRuntimeContext()
>>>
>>>                 .getMetricGroup()
>>>
>>>                 .counter("my-counter");
>>>
>>>
>>>
>>>         this.meter = getRuntimeContext()
>>>
>>>                 .getMetricGroup()
>>>
>>>                 .meter("my-meter", new DropwizardMeterWrapper(new
>>>  com.codahale.metrics.Meter()));
>>>
>>>     }
>>>
>>>
>>>     @Override
>>>
>>>     public T map(T item) throws Exception {
>>>
>>>         this.counter.inc();
>>>
>>>         this.meter.markEvent();
>>>
>>>         return item;
>>>
>>>     }
>>>
>>> }
>>>
>>>
>>>
>>>
>>> And I did followings in one of the Flink sample
>>> (SocketWindowWordCount.java):
>>> Step2:
>>>
>>> DataStream<String> text = env.socketTextStream("localhost", 12345, "\n"
>>> );
>>>
>>> text.map(new MetricsTest());  //<-- added this line
>>>
>>>
>>> Step3:
>>>
>>> mvn clean install
>>>
>>>
>>> step4: nc -l 12345
>>>
>>>
>>> step5:
>>>
>>> flink run -c [my_class_name] my.jar
>>>
>>>
>>> step6:  (type something under nc terminal)
>>>
>>> run jconsole, and pick the local process for this "flink run", and click
>>> the tab "MBeans" (I don't see my metrics other than system ones, is that
>>> the right place to look at?)
>>>
>>>
>>> and flink-conf.yaml has:
>>>
>>> # metrics
>>>
>>> metrics.reporters: jmx
>>>
>>> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
>>>
>>> metrics.reporter.jmx.port: 28780-28790
>>>
>>>
>>> and taskmanager log looks ok regarding JMX
>>>
>>>
>>> did I miss steps or configurations? Thanks a lot!
>>>
>>>
>>>
>>>
>>> On Mon, Sep 18, 2017 at 12:30 AM, Michael Fong <mc...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> There are several possibilities:
>>>> 1. Please check if reporter is set up ( guide
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#jmx-orgapacheflinkmetricsjmxjmxreporter>
>>>>  )
>>>> For example, I would make sure my local JMXReporter service is up and
>>>> running by checking taskmanager.log and search for the line:
>>>>
>>>> 2017-09-18 15:18:57,174 INFO  org.apache.flink.metrics.jmx.JMXReporter
>>>>                     - Started JMX server on port 28781.
>>>> 2017-09-18 15:18:57,175 INFO  org.apache.flink.metrics.jmx.JMXReporter
>>>>                     - Configured JMXReporter with {port:28780-28790}
>>>>
>>>> If for any reason the JMX server does not start up, your might see some
>>>> errors:
>>>>
>>>> 2017-09-18 15:26:04,743 INFO  org.apache.flink.runtime.metrics.MetricRegistry
>>>>              - Configuring JMXReporter with {port=28781, class=org.apac
>>>> he.flink.metrics.jmx.JMXReporter}.
>>>> 2017-09-18 15:26:04,760 ERROR org.apache.flink.runtime.metrics.MetricRegistry
>>>>              - Could not instantiate metrics reporter jmx. Metrics migh
>>>> t not be exposed/reported.
>>>> java.lang.RuntimeException: Could not start JMX server on any
>>>> configured port. Ports: 28781
>>>>         at org.apache.flink.metrics.jmx.JMXReporter.open(JMXReporter.ja
>>>> va:126)
>>>>         at org.apache.flink.runtime.metrics.MetricRegistry.<init>(Metri
>>>> cRegistry.java:131)
>>>>         at org.apache.flink.runtime.taskexecutor.TaskManagerServices.fr
>>>> omConfiguration(TaskManagerServices.java:188)
>>>>         at org.apache.flink.runtime.taskmanager.TaskManager$.startTaskM
>>>> anagerComponentsAndActor(TaskManager.scala:1984)
>>>>         at org.apache.flink.runtime.taskmanager.TaskManager$.runTaskMan
>>>> ager(TaskManager.scala:1823)
>>>>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.
>>>> apply$mcV$sp(TaskManager.scala:1926)
>>>>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.
>>>> apply(TaskManager.scala:1904)
>>>>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.
>>>> apply(TaskManager.scala:1904)
>>>>         at scala.util.Try$.apply(Try.scala:192)
>>>>
>>>>
>>>> Here is my local setup for conf/flink-conf.yaml for example:
>>>> metrics.reporters: jmx
>>>> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
>>>> metrics.reporter.jmx.port: 28780-28790
>>>>
>>>> 2. You might want to try a real streaming example which could execute
>>>> continuously. If I remember correctly, when the task is completed, the
>>>> manager would seem to release the associated resource and object. In your
>>>> example, it is only processing a few strings, which would finish in matter
>>>> of milliseconds, before bringing up jconsole manually.
>>>>
>>>> Hope some of these help,
>>>>
>>>>
>>>>
>>>> On Mon, Sep 18, 2017 at 12:22 PM, Jiewen Shao <ji...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks, When I started jconsole, it listed com.apache.flink.runtime.
>>>>> jobmanager..:[port] as one of the Local Process, i was able to
>>>>> connect to it with insecure connection, but i was not able to locate the
>>>>> Counter metrics, I only saw some system metrics.
>>>>>
>>>>> On Sun, Sep 17, 2017 at 7:39 PM, Michael Fong <mc...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> You may enable metrics reporter to see the output of your metrics;
>>>>>> counter in your example.
>>>>>>
>>>>>> There is a brief documentation regarding to metrics and reporter
>>>>>> setup at link
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html>.
>>>>>> The easiest approach, in my opinion, is to set up a JMX reporter so that
>>>>>> you may see your metrics via JConsole.
>>>>>>
>>>>>> Hope this helps.
>>>>>>
>>>>>> Regrads,
>>>>>>
>>>>>>
>>>>>> On Mon, Sep 18, 2017 at 10:27 AM, Jiewen Shao <ji...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm new to flink and I have read https://ci.apache.org/pro
>>>>>>> jects/flink/flink-docs-release-1.3/monitoring/metrics.html, I am
>>>>>>> still unclear where do I read the metrics I added.
>>>>>>>
>>>>>>> for example,
>>>>>>>
>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>
>>>>>>>
>>>>>>>         StreamExecutionEnvironment env =
>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>
>>>>>>>         env.setParallelism(2);
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>         List<String> wordList = Arrays.asList("Hive", "Presto",
>>>>>>> "Impala", "Parquet","ORC","Hadoop", "Flink", "Spark", "Storm", "Tez",
>>>>>>> "Flink");
>>>>>>>
>>>>>>>         DataStreamSource<String> source = env.fromCollection(
>>>>>>> wordList);
>>>>>>>
>>>>>>>         DataStream<Tuple2<String, Integer>> dataStream = env
>>>>>>> .fromCollection(wordList).map(new WordLengthCounter());
>>>>>>>
>>>>>>>         dataStream.print();
>>>>>>>
>>>>>>>         env.execute();
>>>>>>>
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>>
>>>>>>> public class WordLengthCounter extends RichMapFunction<String,
>>>>>>> Tuple2<String, Integer>> {
>>>>>>>
>>>>>>>
>>>>>>> private static final long serialVersionUID = 1L;
>>>>>>>
>>>>>>> private Counter counter;
>>>>>>>
>>>>>>>
>>>>>>> @Override
>>>>>>>
>>>>>>>   public void open(Configuration config) {
>>>>>>>
>>>>>>>     this.counter = getRuntimeContext()
>>>>>>>
>>>>>>>       .getMetricGroup()
>>>>>>>
>>>>>>>       .counter("myCounter");
>>>>>>>
>>>>>>>   }
>>>>>>>
>>>>>>>
>>>>>>> @Override
>>>>>>>
>>>>>>> public Tuple2<String, Integer> map(String value) throws Exception {
>>>>>>>
>>>>>>> this.counter.inc();
>>>>>>>
>>>>>>> return new Tuple2<String, Integer>(value, value.length());
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> Now, where do I see the counter? Sorry for the naive question
>>>>>>>
>>>>>>> can anyone point me to any good end-to-end "hello world" example for
>>>>>>> flink metrics.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: need instruction on how the Flink metric works

Posted by Jiewen Shao <ji...@gmail.com>.
thats weird, I am still having trouble to see my custom metrics "my-counter"
and "my-meter", I was able to see the default system metrics.
for example I have env.execute("Hello Flink"); when I connect to
localhost:28888 (28888 is the port JMX listens to) I can see default
flink system metrics (Hello_Flink), but just didn't see my custom metrics,
I could miss something obvious. (btw, I used flink 1.2 on macbook, I
started flink using start-cluster.sh), thanks!

[image: Inline image 1]

On Tue, Sep 19, 2017 at 7:07 PM, Michael Fong <mc...@gmail.com> wrote:

> I just did the same test as you had with SocketWindowWordCount, and the
> counter showed up all right.
>
> You should probably connect Jconsole to localhost:28781 (or whatever port
> you have your JMX server listened on)
>
> That's how I setup the env, perhaps there is other better ways to do it.
>
> On Wed, Sep 20, 2017 at 9:15 AM, Jiewen Shao <ji...@gmail.com> wrote:
>
>> Still got stuck, here are my steps (on my laptop)
>>
>> for example:
>> Step1:
>>
>> public class MetricsTest<T> extends RichMapFunction<T, T> {
>>
>>
>> private static final long serialVersionUID = 1L;
>>
>> private org.apache.flink.metrics.Meter meter;
>>
>>     private Counter counter;
>>
>>
>>     @Override
>>
>>     public void open(Configuration config) {
>>
>>         this.counter = getRuntimeContext()
>>
>>                 .getMetricGroup()
>>
>>                 .counter("my-counter");
>>
>>
>>
>>         this.meter = getRuntimeContext()
>>
>>                 .getMetricGroup()
>>
>>                 .meter("my-meter", new DropwizardMeterWrapper(new com.
>> codahale.metrics.Meter()));
>>
>>     }
>>
>>
>>     @Override
>>
>>     public T map(T item) throws Exception {
>>
>>         this.counter.inc();
>>
>>         this.meter.markEvent();
>>
>>         return item;
>>
>>     }
>>
>> }
>>
>>
>>
>>
>> And I did followings in one of the Flink sample
>> (SocketWindowWordCount.java):
>> Step2:
>>
>> DataStream<String> text = env.socketTextStream("localhost", 12345, "\n");
>>
>> text.map(new MetricsTest());  //<-- added this line
>>
>>
>> Step3:
>>
>> mvn clean install
>>
>>
>> step4: nc -l 12345
>>
>>
>> step5:
>>
>> flink run -c [my_class_name] my.jar
>>
>>
>> step6:  (type something under nc terminal)
>>
>> run jconsole, and pick the local process for this "flink run", and click
>> the tab "MBeans" (I don't see my metrics other than system ones, is that
>> the right place to look at?)
>>
>>
>> and flink-conf.yaml has:
>>
>> # metrics
>>
>> metrics.reporters: jmx
>>
>> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
>>
>> metrics.reporter.jmx.port: 28780-28790
>>
>>
>> and taskmanager log looks ok regarding JMX
>>
>>
>> did I miss steps or configurations? Thanks a lot!
>>
>>
>>
>>
>> On Mon, Sep 18, 2017 at 12:30 AM, Michael Fong <mc...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> There are several possibilities:
>>> 1. Please check if reporter is set up ( guide
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#jmx-orgapacheflinkmetricsjmxjmxreporter>
>>>  )
>>> For example, I would make sure my local JMXReporter service is up and
>>> running by checking taskmanager.log and search for the line:
>>>
>>> 2017-09-18 15:18:57,174 INFO  org.apache.flink.metrics.jmx.JMXReporter
>>>                     - Started JMX server on port 28781.
>>> 2017-09-18 15:18:57,175 INFO  org.apache.flink.metrics.jmx.JMXReporter
>>>                     - Configured JMXReporter with {port:28780-28790}
>>>
>>> If for any reason the JMX server does not start up, your might see some
>>> errors:
>>>
>>> 2017-09-18 15:26:04,743 INFO  org.apache.flink.runtime.metrics.MetricRegistry
>>>              - Configuring JMXReporter with {port=28781, class=org.apac
>>> he.flink.metrics.jmx.JMXReporter}.
>>> 2017-09-18 15:26:04,760 ERROR org.apache.flink.runtime.metrics.MetricRegistry
>>>              - Could not instantiate metrics reporter jmx. Metrics migh
>>> t not be exposed/reported.
>>> java.lang.RuntimeException: Could not start JMX server on any configured
>>> port. Ports: 28781
>>>         at org.apache.flink.metrics.jmx.JMXReporter.open(JMXReporter.ja
>>> va:126)
>>>         at org.apache.flink.runtime.metrics.MetricRegistry.<init>(Metri
>>> cRegistry.java:131)
>>>         at org.apache.flink.runtime.taskexecutor.TaskManagerServices.fr
>>> omConfiguration(TaskManagerServices.java:188)
>>>         at org.apache.flink.runtime.taskmanager.TaskManager$.startTaskM
>>> anagerComponentsAndActor(TaskManager.scala:1984)
>>>         at org.apache.flink.runtime.taskmanager.TaskManager$.runTaskMan
>>> ager(TaskManager.scala:1823)
>>>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.
>>> apply$mcV$sp(TaskManager.scala:1926)
>>>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.
>>> apply(TaskManager.scala:1904)
>>>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.
>>> apply(TaskManager.scala:1904)
>>>         at scala.util.Try$.apply(Try.scala:192)
>>>
>>>
>>> Here is my local setup for conf/flink-conf.yaml for example:
>>> metrics.reporters: jmx
>>> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
>>> metrics.reporter.jmx.port: 28780-28790
>>>
>>> 2. You might want to try a real streaming example which could execute
>>> continuously. If I remember correctly, when the task is completed, the
>>> manager would seem to release the associated resource and object. In your
>>> example, it is only processing a few strings, which would finish in matter
>>> of milliseconds, before bringing up jconsole manually.
>>>
>>> Hope some of these help,
>>>
>>>
>>>
>>> On Mon, Sep 18, 2017 at 12:22 PM, Jiewen Shao <ji...@gmail.com>
>>> wrote:
>>>
>>>> Thanks, When I started jconsole, it listed com.apache.flink.runtime.
>>>> jobmanager..:[port] as one of the Local Process, i was able to connect
>>>> to it with insecure connection, but i was not able to locate the Counter
>>>> metrics, I only saw some system metrics.
>>>>
>>>> On Sun, Sep 17, 2017 at 7:39 PM, Michael Fong <mc...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> You may enable metrics reporter to see the output of your metrics;
>>>>> counter in your example.
>>>>>
>>>>> There is a brief documentation regarding to metrics and reporter setup
>>>>> at link
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html>.
>>>>> The easiest approach, in my opinion, is to set up a JMX reporter so that
>>>>> you may see your metrics via JConsole.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Regrads,
>>>>>
>>>>>
>>>>> On Mon, Sep 18, 2017 at 10:27 AM, Jiewen Shao <ji...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I'm new to flink and I have read https://ci.apache.org/pro
>>>>>> jects/flink/flink-docs-release-1.3/monitoring/metrics.html, I am
>>>>>> still unclear where do I read the metrics I added.
>>>>>>
>>>>>> for example,
>>>>>>
>>>>>> public static void main(String[] args) throws Exception {
>>>>>>
>>>>>>
>>>>>>         StreamExecutionEnvironment env =
>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>
>>>>>>         env.setParallelism(2);
>>>>>>
>>>>>>
>>>>>>
>>>>>>         List<String> wordList = Arrays.asList("Hive", "Presto",
>>>>>> "Impala", "Parquet","ORC","Hadoop", "Flink", "Spark", "Storm", "Tez",
>>>>>> "Flink");
>>>>>>
>>>>>>         DataStreamSource<String> source = env.fromCollection(wordList
>>>>>> );
>>>>>>
>>>>>>         DataStream<Tuple2<String, Integer>> dataStream = env
>>>>>> .fromCollection(wordList).map(new WordLengthCounter());
>>>>>>
>>>>>>         dataStream.print();
>>>>>>
>>>>>>         env.execute();
>>>>>>
>>>>>>     }
>>>>>>
>>>>>>
>>>>>> and
>>>>>>
>>>>>>
>>>>>> public class WordLengthCounter extends RichMapFunction<String,
>>>>>> Tuple2<String, Integer>> {
>>>>>>
>>>>>>
>>>>>> private static final long serialVersionUID = 1L;
>>>>>>
>>>>>> private Counter counter;
>>>>>>
>>>>>>
>>>>>> @Override
>>>>>>
>>>>>>   public void open(Configuration config) {
>>>>>>
>>>>>>     this.counter = getRuntimeContext()
>>>>>>
>>>>>>       .getMetricGroup()
>>>>>>
>>>>>>       .counter("myCounter");
>>>>>>
>>>>>>   }
>>>>>>
>>>>>>
>>>>>> @Override
>>>>>>
>>>>>> public Tuple2<String, Integer> map(String value) throws Exception {
>>>>>>
>>>>>> this.counter.inc();
>>>>>>
>>>>>> return new Tuple2<String, Integer>(value, value.length());
>>>>>>
>>>>>> }
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>> Now, where do I see the counter? Sorry for the naive question
>>>>>>
>>>>>> can anyone point me to any good end-to-end "hello world" example for
>>>>>> flink metrics.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: need instruction on how the Flink metric works

Posted by Michael Fong <mc...@gmail.com>.
I just did the same test as you had with SocketWindowWordCount, and the
counter showed up all right.

You should probably connect Jconsole to localhost:28781 (or whatever port
you have your JMX server listened on)

That's how I setup the env, perhaps there is other better ways to do it.

On Wed, Sep 20, 2017 at 9:15 AM, Jiewen Shao <ji...@gmail.com> wrote:

> Still got stuck, here are my steps (on my laptop)
>
> for example:
> Step1:
>
> public class MetricsTest<T> extends RichMapFunction<T, T> {
>
>
> private static final long serialVersionUID = 1L;
>
> private org.apache.flink.metrics.Meter meter;
>
>     private Counter counter;
>
>
>     @Override
>
>     public void open(Configuration config) {
>
>         this.counter = getRuntimeContext()
>
>                 .getMetricGroup()
>
>                 .counter("my-counter");
>
>
>
>         this.meter = getRuntimeContext()
>
>                 .getMetricGroup()
>
>                 .meter("my-meter", new DropwizardMeterWrapper(new
> com.codahale.metrics.Meter()));
>
>     }
>
>
>     @Override
>
>     public T map(T item) throws Exception {
>
>         this.counter.inc();
>
>         this.meter.markEvent();
>
>         return item;
>
>     }
>
> }
>
>
>
>
> And I did followings in one of the Flink sample
> (SocketWindowWordCount.java):
> Step2:
>
> DataStream<String> text = env.socketTextStream("localhost", 12345, "\n");
>
> text.map(new MetricsTest());  //<-- added this line
>
>
> Step3:
>
> mvn clean install
>
>
> step4: nc -l 12345
>
>
> step5:
>
> flink run -c [my_class_name] my.jar
>
>
> step6:  (type something under nc terminal)
>
> run jconsole, and pick the local process for this "flink run", and click
> the tab "MBeans" (I don't see my metrics other than system ones, is that
> the right place to look at?)
>
>
> and flink-conf.yaml has:
>
> # metrics
>
> metrics.reporters: jmx
>
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
>
> metrics.reporter.jmx.port: 28780-28790
>
>
> and taskmanager log looks ok regarding JMX
>
>
> did I miss steps or configurations? Thanks a lot!
>
>
>
>
> On Mon, Sep 18, 2017 at 12:30 AM, Michael Fong <mc...@gmail.com>
> wrote:
>
>> Hi,
>>
>> There are several possibilities:
>> 1. Please check if reporter is set up ( guide
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#jmx-orgapacheflinkmetricsjmxjmxreporter>
>>  )
>> For example, I would make sure my local JMXReporter service is up and
>> running by checking taskmanager.log and search for the line:
>>
>> 2017-09-18 15:18:57,174 INFO  org.apache.flink.metrics.jmx.JMXReporter
>>                     - Started JMX server on port 28781.
>> 2017-09-18 15:18:57,175 INFO  org.apache.flink.metrics.jmx.JMXReporter
>>                     - Configured JMXReporter with {port:28780-28790}
>>
>> If for any reason the JMX server does not start up, your might see some
>> errors:
>>
>> 2017-09-18 15:26:04,743 INFO  org.apache.flink.runtime.metrics.MetricRegistry
>>              - Configuring JMXReporter with {port=28781, class=org.apac
>> he.flink.metrics.jmx.JMXReporter}.
>> 2017-09-18 15:26:04,760 ERROR org.apache.flink.runtime.metrics.MetricRegistry
>>              - Could not instantiate metrics reporter jmx. Metrics migh
>> t not be exposed/reported.
>> java.lang.RuntimeException: Could not start JMX server on any configured
>> port. Ports: 28781
>>         at org.apache.flink.metrics.jmx.JMXReporter.open(JMXReporter.ja
>> va:126)
>>         at org.apache.flink.runtime.metrics.MetricRegistry.<init>(Metri
>> cRegistry.java:131)
>>         at org.apache.flink.runtime.taskexecutor.TaskManagerServices.fr
>> omConfiguration(TaskManagerServices.java:188)
>>         at org.apache.flink.runtime.taskmanager.TaskManager$.startTaskM
>> anagerComponentsAndActor(TaskManager.scala:1984)
>>         at org.apache.flink.runtime.taskmanager.TaskManager$.runTaskMan
>> ager(TaskManager.scala:1823)
>>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.
>> apply$mcV$sp(TaskManager.scala:1926)
>>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.
>> apply(TaskManager.scala:1904)
>>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.
>> apply(TaskManager.scala:1904)
>>         at scala.util.Try$.apply(Try.scala:192)
>>
>>
>> Here is my local setup for conf/flink-conf.yaml for example:
>> metrics.reporters: jmx
>> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
>> metrics.reporter.jmx.port: 28780-28790
>>
>> 2. You might want to try a real streaming example which could execute
>> continuously. If I remember correctly, when the task is completed, the
>> manager would seem to release the associated resource and object. In your
>> example, it is only processing a few strings, which would finish in matter
>> of milliseconds, before bringing up jconsole manually.
>>
>> Hope some of these help,
>>
>>
>>
>> On Mon, Sep 18, 2017 at 12:22 PM, Jiewen Shao <ji...@gmail.com>
>> wrote:
>>
>>> Thanks, When I started jconsole, it listed com.apache.flink.runtime.
>>> jobmanager..:[port] as one of the Local Process, i was able to connect
>>> to it with insecure connection, but i was not able to locate the Counter
>>> metrics, I only saw some system metrics.
>>>
>>> On Sun, Sep 17, 2017 at 7:39 PM, Michael Fong <mc...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> You may enable metrics reporter to see the output of your metrics;
>>>> counter in your example.
>>>>
>>>> There is a brief documentation regarding to metrics and reporter setup
>>>> at link
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html>.
>>>> The easiest approach, in my opinion, is to set up a JMX reporter so that
>>>> you may see your metrics via JConsole.
>>>>
>>>> Hope this helps.
>>>>
>>>> Regrads,
>>>>
>>>>
>>>> On Mon, Sep 18, 2017 at 10:27 AM, Jiewen Shao <ji...@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm new to flink and I have read https://ci.apache.org/pro
>>>>> jects/flink/flink-docs-release-1.3/monitoring/metrics.html, I am
>>>>> still unclear where do I read the metrics I added.
>>>>>
>>>>> for example,
>>>>>
>>>>> public static void main(String[] args) throws Exception {
>>>>>
>>>>>
>>>>>         StreamExecutionEnvironment env =
>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>
>>>>>         env.setParallelism(2);
>>>>>
>>>>>
>>>>>
>>>>>         List<String> wordList = Arrays.asList("Hive", "Presto",
>>>>> "Impala", "Parquet","ORC","Hadoop", "Flink", "Spark", "Storm", "Tez",
>>>>> "Flink");
>>>>>
>>>>>         DataStreamSource<String> source = env.fromCollection(wordList
>>>>> );
>>>>>
>>>>>         DataStream<Tuple2<String, Integer>> dataStream = env
>>>>> .fromCollection(wordList).map(new WordLengthCounter());
>>>>>
>>>>>         dataStream.print();
>>>>>
>>>>>         env.execute();
>>>>>
>>>>>     }
>>>>>
>>>>>
>>>>> and
>>>>>
>>>>>
>>>>> public class WordLengthCounter extends RichMapFunction<String,
>>>>> Tuple2<String, Integer>> {
>>>>>
>>>>>
>>>>> private static final long serialVersionUID = 1L;
>>>>>
>>>>> private Counter counter;
>>>>>
>>>>>
>>>>> @Override
>>>>>
>>>>>   public void open(Configuration config) {
>>>>>
>>>>>     this.counter = getRuntimeContext()
>>>>>
>>>>>       .getMetricGroup()
>>>>>
>>>>>       .counter("myCounter");
>>>>>
>>>>>   }
>>>>>
>>>>>
>>>>> @Override
>>>>>
>>>>> public Tuple2<String, Integer> map(String value) throws Exception {
>>>>>
>>>>> this.counter.inc();
>>>>>
>>>>> return new Tuple2<String, Integer>(value, value.length());
>>>>>
>>>>> }
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> Now, where do I see the counter? Sorry for the naive question
>>>>>
>>>>> can anyone point me to any good end-to-end "hello world" example for
>>>>> flink metrics.
>>>>>
>>>>
>>>>
>>>
>>
>

Re: need instruction on how the Flink metric works

Posted by Jiewen Shao <ji...@gmail.com>.
Still got stuck, here are my steps (on my laptop)

for example:
Step1:

public class MetricsTest<T> extends RichMapFunction<T, T> {


private static final long serialVersionUID = 1L;

private org.apache.flink.metrics.Meter meter;

    private Counter counter;


    @Override

    public void open(Configuration config) {

        this.counter = getRuntimeContext()

                .getMetricGroup()

                .counter("my-counter");



        this.meter = getRuntimeContext()

                .getMetricGroup()

                .meter("my-meter", new DropwizardMeterWrapper(new
 com.codahale.metrics.Meter()));

    }


    @Override

    public T map(T item) throws Exception {

        this.counter.inc();

        this.meter.markEvent();

        return item;

    }

}




And I did followings in one of the Flink sample
(SocketWindowWordCount.java):
Step2:

DataStream<String> text = env.socketTextStream("localhost", 12345, "\n");

text.map(new MetricsTest());  //<-- added this line


Step3:

mvn clean install


step4: nc -l 12345


step5:

flink run -c [my_class_name] my.jar


step6:  (type something under nc terminal)

run jconsole, and pick the local process for this "flink run", and click
the tab "MBeans" (I don't see my metrics other than system ones, is that
the right place to look at?)


and flink-conf.yaml has:

# metrics

metrics.reporters: jmx

metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter

metrics.reporter.jmx.port: 28780-28790


and taskmanager log looks ok regarding JMX


did I miss steps or configurations? Thanks a lot!




On Mon, Sep 18, 2017 at 12:30 AM, Michael Fong <mc...@gmail.com>
wrote:

> Hi,
>
> There are several possibilities:
> 1. Please check if reporter is set up ( guide
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#jmx-orgapacheflinkmetricsjmxjmxreporter>
>  )
> For example, I would make sure my local JMXReporter service is up and
> running by checking taskmanager.log and search for the line:
>
> 2017-09-18 15:18:57,174 INFO  org.apache.flink.metrics.jmx.JMXReporter
>                   - Started JMX server on port 28781.
> 2017-09-18 15:18:57,175 INFO  org.apache.flink.metrics.jmx.JMXReporter
>                   - Configured JMXReporter with {port:28780-28790}
>
> If for any reason the JMX server does not start up, your might see some
> errors:
>
> 2017-09-18 15:26:04,743 INFO  org.apache.flink.runtime.metrics.MetricRegistry
>              - Configuring JMXReporter with {port=28781, class=org.apac
> he.flink.metrics.jmx.JMXReporter}.
> 2017-09-18 15:26:04,760 ERROR org.apache.flink.runtime.metrics.MetricRegistry
>              - Could not instantiate metrics reporter jmx. Metrics migh
> t not be exposed/reported.
> java.lang.RuntimeException: Could not start JMX server on any configured
> port. Ports: 28781
>         at org.apache.flink.metrics.jmx.JMXReporter.open(JMXReporter.
> java:126)
>         at org.apache.flink.runtime.metrics.MetricRegistry.<init>(
> MetricRegistry.java:131)
>         at org.apache.flink.runtime.taskexecutor.TaskManagerServices.
> fromConfiguration(TaskManagerServices.java:188)
>         at org.apache.flink.runtime.taskmanager.TaskManager$.
> startTaskManagerComponentsAndActor(TaskManager.scala:1984)
>         at org.apache.flink.runtime.taskmanager.TaskManager$.
> runTaskManager(TaskManager.scala:1823)
>         at org.apache.flink.runtime.taskmanager.TaskManager$$
> anonfun$1.apply$mcV$sp(TaskManager.scala:1926)
>         at org.apache.flink.runtime.taskmanager.TaskManager$$
> anonfun$1.apply(TaskManager.scala:1904)
>         at org.apache.flink.runtime.taskmanager.TaskManager$$
> anonfun$1.apply(TaskManager.scala:1904)
>         at scala.util.Try$.apply(Try.scala:192)
>
>
> Here is my local setup for conf/flink-conf.yaml for example:
> metrics.reporters: jmx
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporter.jmx.port: 28780-28790
>
> 2. You might want to try a real streaming example which could execute
> continuously. If I remember correctly, when the task is completed, the
> manager would seem to release the associated resource and object. In your
> example, it is only processing a few strings, which would finish in matter
> of milliseconds, before bringing up jconsole manually.
>
> Hope some of these help,
>
>
>
> On Mon, Sep 18, 2017 at 12:22 PM, Jiewen Shao <ji...@gmail.com>
> wrote:
>
>> Thanks, When I started jconsole, it listed com.apache.flink.runtime.jobmanager..:[port]
>> as one of the Local Process, i was able to connect to it with insecure
>> connection, but i was not able to locate the Counter metrics, I only saw
>> some system metrics.
>>
>> On Sun, Sep 17, 2017 at 7:39 PM, Michael Fong <mc...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> You may enable metrics reporter to see the output of your metrics;
>>> counter in your example.
>>>
>>> There is a brief documentation regarding to metrics and reporter setup
>>> at link
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html>.
>>> The easiest approach, in my opinion, is to set up a JMX reporter so that
>>> you may see your metrics via JConsole.
>>>
>>> Hope this helps.
>>>
>>> Regrads,
>>>
>>>
>>> On Mon, Sep 18, 2017 at 10:27 AM, Jiewen Shao <ji...@gmail.com>
>>> wrote:
>>>
>>>> I'm new to flink and I have read https://ci.apache.org/pro
>>>> jects/flink/flink-docs-release-1.3/monitoring/metrics.html, I am still
>>>> unclear where do I read the metrics I added.
>>>>
>>>> for example,
>>>>
>>>> public static void main(String[] args) throws Exception {
>>>>
>>>>
>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.get
>>>> ExecutionEnvironment();
>>>>
>>>>         env.setParallelism(2);
>>>>
>>>>
>>>>
>>>>         List<String> wordList = Arrays.asList("Hive", "Presto",
>>>> "Impala", "Parquet","ORC","Hadoop", "Flink", "Spark", "Storm", "Tez",
>>>> "Flink");
>>>>
>>>>         DataStreamSource<String> source = env.fromCollection(wordList);
>>>>
>>>>         DataStream<Tuple2<String, Integer>> dataStream = env
>>>> .fromCollection(wordList).map(new WordLengthCounter());
>>>>
>>>>         dataStream.print();
>>>>
>>>>         env.execute();
>>>>
>>>>     }
>>>>
>>>>
>>>> and
>>>>
>>>>
>>>> public class WordLengthCounter extends RichMapFunction<String,
>>>> Tuple2<String, Integer>> {
>>>>
>>>>
>>>> private static final long serialVersionUID = 1L;
>>>>
>>>> private Counter counter;
>>>>
>>>>
>>>> @Override
>>>>
>>>>   public void open(Configuration config) {
>>>>
>>>>     this.counter = getRuntimeContext()
>>>>
>>>>       .getMetricGroup()
>>>>
>>>>       .counter("myCounter");
>>>>
>>>>   }
>>>>
>>>>
>>>> @Override
>>>>
>>>> public Tuple2<String, Integer> map(String value) throws Exception {
>>>>
>>>> this.counter.inc();
>>>>
>>>> return new Tuple2<String, Integer>(value, value.length());
>>>>
>>>> }
>>>>
>>>> }
>>>>
>>>>
>>>> Now, where do I see the counter? Sorry for the naive question
>>>>
>>>> can anyone point me to any good end-to-end "hello world" example for
>>>> flink metrics.
>>>>
>>>
>>>
>>
>

Re: need instruction on how the Flink metric works

Posted by Michael Fong <mc...@gmail.com>.
Hi,

There are several possibilities:
1. Please check if reporter is set up ( guide
<https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#jmx-orgapacheflinkmetricsjmxjmxreporter>
 )
For example, I would make sure my local JMXReporter service is up and
running by checking taskmanager.log and search for the line:

2017-09-18 15:18:57,174 INFO  org.apache.flink.metrics.jmx.JMXReporter
                - Started JMX server on port 28781.
2017-09-18 15:18:57,175 INFO  org.apache.flink.metrics.jmx.JMXReporter
                - Configured JMXReporter with {port:28780-28790}

If for any reason the JMX server does not start up, your might see some
errors:

2017-09-18 15:26:04,743 INFO
org.apache.flink.runtime.metrics.MetricRegistry               - Configuring
JMXReporter with {port=28781, class=org.apac
he.flink.metrics.jmx.JMXReporter}.
2017-09-18 15:26:04,760 ERROR
org.apache.flink.runtime.metrics.MetricRegistry               - Could not
instantiate metrics reporter jmx. Metrics migh
t not be exposed/reported.
java.lang.RuntimeException: Could not start JMX server on any configured
port. Ports: 28781
        at
org.apache.flink.metrics.jmx.JMXReporter.open(JMXReporter.java:126)
        at
org.apache.flink.runtime.metrics.MetricRegistry.<init>(MetricRegistry.java:131)
        at
org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:188)
        at
org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1984)
        at
org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1823)
        at
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply$mcV$sp(TaskManager.scala:1926)
        at
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1904)
        at
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1904)
        at scala.util.Try$.apply(Try.scala:192)


Here is my local setup for conf/flink-conf.yaml for example:
metrics.reporters: jmx
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 28780-28790

2. You might want to try a real streaming example which could execute
continuously. If I remember correctly, when the task is completed, the
manager would seem to release the associated resource and object. In your
example, it is only processing a few strings, which would finish in matter
of milliseconds, before bringing up jconsole manually.

Hope some of these help,



On Mon, Sep 18, 2017 at 12:22 PM, Jiewen Shao <ji...@gmail.com> wrote:

> Thanks, When I started jconsole, it listed com.apache.flink.runtime.jobmanager..:[port]
> as one of the Local Process, i was able to connect to it with insecure
> connection, but i was not able to locate the Counter metrics, I only saw
> some system metrics.
>
> On Sun, Sep 17, 2017 at 7:39 PM, Michael Fong <mc...@gmail.com>
> wrote:
>
>> Hi,
>>
>> You may enable metrics reporter to see the output of your metrics;
>> counter in your example.
>>
>> There is a brief documentation regarding to metrics and reporter setup at
>> link
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html>.
>> The easiest approach, in my opinion, is to set up a JMX reporter so that
>> you may see your metrics via JConsole.
>>
>> Hope this helps.
>>
>> Regrads,
>>
>>
>> On Mon, Sep 18, 2017 at 10:27 AM, Jiewen Shao <ji...@gmail.com>
>> wrote:
>>
>>> I'm new to flink and I have read https://ci.apache.org/pro
>>> jects/flink/flink-docs-release-1.3/monitoring/metrics.html, I am still
>>> unclear where do I read the metrics I added.
>>>
>>> for example,
>>>
>>> public static void main(String[] args) throws Exception {
>>>
>>>
>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.get
>>> ExecutionEnvironment();
>>>
>>>         env.setParallelism(2);
>>>
>>>
>>>
>>>         List<String> wordList = Arrays.asList("Hive", "Presto", "Impala",
>>> "Parquet","ORC","Hadoop", "Flink", "Spark", "Storm", "Tez", "Flink");
>>>
>>>
>>>         DataStreamSource<String> source = env.fromCollection(wordList);
>>>
>>>         DataStream<Tuple2<String, Integer>> dataStream = env
>>> .fromCollection(wordList).map(new WordLengthCounter());
>>>
>>>         dataStream.print();
>>>
>>>         env.execute();
>>>
>>>     }
>>>
>>>
>>> and
>>>
>>>
>>> public class WordLengthCounter extends RichMapFunction<String,
>>> Tuple2<String, Integer>> {
>>>
>>>
>>> private static final long serialVersionUID = 1L;
>>>
>>> private Counter counter;
>>>
>>>
>>> @Override
>>>
>>>   public void open(Configuration config) {
>>>
>>>     this.counter = getRuntimeContext()
>>>
>>>       .getMetricGroup()
>>>
>>>       .counter("myCounter");
>>>
>>>   }
>>>
>>>
>>> @Override
>>>
>>> public Tuple2<String, Integer> map(String value) throws Exception {
>>>
>>> this.counter.inc();
>>>
>>> return new Tuple2<String, Integer>(value, value.length());
>>>
>>> }
>>>
>>> }
>>>
>>>
>>> Now, where do I see the counter? Sorry for the naive question
>>>
>>> can anyone point me to any good end-to-end "hello world" example for
>>> flink metrics.
>>>
>>
>>
>

Re: need instruction on how the Flink metric works

Posted by Jiewen Shao <ji...@gmail.com>.
Thanks, When I started jconsole, it listed
com.apache.flink.runtime.jobmanager..:[port]
as one of the Local Process, i was able to connect to it with insecure
connection, but i was not able to locate the Counter metrics, I only saw
some system metrics.

On Sun, Sep 17, 2017 at 7:39 PM, Michael Fong <mc...@gmail.com> wrote:

> Hi,
>
> You may enable metrics reporter to see the output of your metrics; counter
> in your example.
>
> There is a brief documentation regarding to metrics and reporter setup at
> link
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html>.
> The easiest approach, in my opinion, is to set up a JMX reporter so that
> you may see your metrics via JConsole.
>
> Hope this helps.
>
> Regrads,
>
>
> On Mon, Sep 18, 2017 at 10:27 AM, Jiewen Shao <ji...@gmail.com>
> wrote:
>
>> I'm new to flink and I have read https://ci.apache.org/pro
>> jects/flink/flink-docs-release-1.3/monitoring/metrics.html, I am still
>> unclear where do I read the metrics I added.
>>
>> for example,
>>
>> public static void main(String[] args) throws Exception {
>>
>>
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.get
>> ExecutionEnvironment();
>>
>>         env.setParallelism(2);
>>
>>
>>
>>         List<String> wordList = Arrays.asList("Hive", "Presto", "Impala",
>> "Parquet","ORC","Hadoop", "Flink", "Spark", "Storm", "Tez", "Flink");
>>
>>
>>         DataStreamSource<String> source = env.fromCollection(wordList);
>>
>>         DataStream<Tuple2<String, Integer>> dataStream = env
>> .fromCollection(wordList).map(new WordLengthCounter());
>>
>>         dataStream.print();
>>
>>         env.execute();
>>
>>     }
>>
>>
>> and
>>
>>
>> public class WordLengthCounter extends RichMapFunction<String,
>> Tuple2<String, Integer>> {
>>
>>
>> private static final long serialVersionUID = 1L;
>>
>> private Counter counter;
>>
>>
>> @Override
>>
>>   public void open(Configuration config) {
>>
>>     this.counter = getRuntimeContext()
>>
>>       .getMetricGroup()
>>
>>       .counter("myCounter");
>>
>>   }
>>
>>
>> @Override
>>
>> public Tuple2<String, Integer> map(String value) throws Exception {
>>
>> this.counter.inc();
>>
>> return new Tuple2<String, Integer>(value, value.length());
>>
>> }
>>
>> }
>>
>>
>> Now, where do I see the counter? Sorry for the naive question
>>
>> can anyone point me to any good end-to-end "hello world" example for
>> flink metrics.
>>
>
>

Re: need instruction on how the Flink metric works

Posted by Michael Fong <mc...@gmail.com>.
Hi,

You may enable metrics reporter to see the output of your metrics; counter
in your example.

There is a brief documentation regarding to metrics and reporter setup at
link
<https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html>.
The easiest approach, in my opinion, is to set up a JMX reporter so that
you may see your metrics via JConsole.

Hope this helps.

Regrads,


On Mon, Sep 18, 2017 at 10:27 AM, Jiewen Shao <ji...@gmail.com> wrote:

> I'm new to flink and I have read https://ci.apache.org/
> projects/flink/flink-docs-release-1.3/monitoring/metrics.html, I am still
> unclear where do I read the metrics I added.
>
> for example,
>
> public static void main(String[] args) throws Exception {
>
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.
> getExecutionEnvironment();
>
>         env.setParallelism(2);
>
>
>
>         List<String> wordList = Arrays.asList("Hive", "Presto", "Impala",
> "Parquet","ORC","Hadoop", "Flink", "Spark", "Storm", "Tez", "Flink");
>
>
>         DataStreamSource<String> source = env.fromCollection(wordList);
>
>         DataStream<Tuple2<String, Integer>> dataStream = env
> .fromCollection(wordList).map(new WordLengthCounter());
>
>         dataStream.print();
>
>         env.execute();
>
>     }
>
>
> and
>
>
> public class WordLengthCounter extends RichMapFunction<String,
> Tuple2<String, Integer>> {
>
>
> private static final long serialVersionUID = 1L;
>
> private Counter counter;
>
>
> @Override
>
>   public void open(Configuration config) {
>
>     this.counter = getRuntimeContext()
>
>       .getMetricGroup()
>
>       .counter("myCounter");
>
>   }
>
>
> @Override
>
> public Tuple2<String, Integer> map(String value) throws Exception {
>
> this.counter.inc();
>
> return new Tuple2<String, Integer>(value, value.length());
>
> }
>
> }
>
>
> Now, where do I see the counter? Sorry for the naive question
>
> can anyone point me to any good end-to-end "hello world" example for flink
> metrics.
>