Posted to user@beam.apache.org by "Truebody, Kyle" <Tr...@DNB.com> on 2020/07/02 20:07:16 UTC

Querying Metrics when using Spark Runner

Hi,

We have recently upgraded to the latest version of Apache Beam, 2.22.0. We were previously using version 2.13.0.

We are using the SparkRunner.

I noticed that after the upgrade the metrics query has stopped producing values. Through debugging I can see that the counters and distributions are still being incremented as expected. Has the metrics interface changed at all? This is how we are currently querying the metrics:


    PipelineResult result = p.run();
    for (MetricResult r : result.metrics().queryMetrics(MetricsFilter.builder().build()).getCounters()) {
        LOGGER.info("n=" + r.getName() + " v=" + r.getAttempted().toString());
    }
    for (MetricResult r : result.metrics().queryMetrics(MetricsFilter.builder().build()).getDistributions()) {
        LOGGER.info("n=" + r.getName() + " v=" + r.getAttempted().toString());
    }
    result.waitUntilFinish();


Thanks,
Kyle

Re: Querying Metrics when using Spark Runner

Posted by Alexey Romanenko <ar...@gmail.com>.
Well, I think that for streaming jobs (and even for long-running batch jobs) you need to query the metrics from PipelineResult periodically in a separate thread, or use MetricsPusher [1], which does this for you and pushes the metrics to a supported sink.

[1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsPusher.java
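
As a rough illustration of the "query from time to time in a separate thread" option, here is a minimal plain-JDK sketch. It is not Beam code: MetricsPoller is a hypothetical helper, and the Supplier is an assumed stand-in for whatever turns result.metrics().queryMetrics(...) into a name-to-value map in your job.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper: polls a metrics snapshot on a background thread until closed. */
final class MetricsPoller implements AutoCloseable {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "metrics-poller");
        t.setDaemon(true); // do not keep the JVM alive just for polling
        return t;
      });

  /**
   * @param snapshot assumed stand-in for a metrics query, mapped to name -> attempted value
   * @param sink receives every snapshot, for example a logger
   */
  MetricsPoller(Supplier<Map<String, Long>> snapshot,
                Consumer<Map<String, Long>> sink,
                long period, TimeUnit unit) {
    scheduler.scheduleAtFixedRate(() -> {
      try {
        sink.accept(snapshot.get());
      } catch (RuntimeException e) {
        // An uncaught exception would cancel all future runs, so report and continue.
        System.err.println("metrics poll failed: " + e);
      }
    }, 0, period, unit);
  }

  /** Stops polling; an in-flight poll is interrupted. */
  @Override
  public void close() {
    scheduler.shutdownNow();
  }
}
```

In a real job the poller would be created right after p.run() and closed once the pipeline finishes or is cancelled. MetricsPusher remains the ready-made alternative if one of its supported sinks fits.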

> On 7 Jul 2020, at 12:14, Truebody, Kyle <Tr...@DNB.com> wrote:
> 
> Thanks for the response Alexey,
>  
> I have it working now. It looks like the order in which I implemented it was causing an early call to the Metrics Container. This is working for me now:
> 
> PipelineResult result = p.run();
> result.waitUntilFinish();
> 
> for (MetricResult r : result.metrics().queryMetrics(MetricsFilter.builder().build()).getCounters()) {
>     LOGGER.info("n=" + r.getName() + " v=" + r.getAttempted().toString());
> }
> for (MetricResult r : result.metrics().queryMetrics(MetricsFilter.builder().build()).getDistributions()) {
>     LOGGER.info("n=" + r.getName() + " v=" + r.getAttempted().toString());
> }
> 
> Just to call out: I guess this only works when running in batch mode (which is our target mode for now).
> The implementation in the previous mail did not produce counters in streaming mode.


RE: Querying Metrics when using Spark Runner

Posted by "Truebody, Kyle" <Tr...@DNB.com>.
Thanks for the response Alexey,

I have it working now. It looks like the order in which I implemented it was causing an early call to the Metrics Container. This is working for me now:

PipelineResult result = p.run();
result.waitUntilFinish();

for (MetricResult r : result.metrics().queryMetrics(MetricsFilter.builder().build()).getCounters()) {
    LOGGER.info("n=" + r.getName() + " v=" + r.getAttempted().toString());
}
for (MetricResult r : result.metrics().queryMetrics(MetricsFilter.builder().build()).getDistributions()) {
    LOGGER.info("n=" + r.getName() + " v=" + r.getAttempted().toString());
}

Just to call out: I guess this only works when running in batch mode (which is our target mode for now).
The implementation in the previous mail did not produce counters in streaming mode.
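
The ordering effect described above can be reproduced with a plain-JDK analogy. This sketch is not Beam or Spark code: Job, its counter, and readBeforeAndAfter are hypothetical stand-ins, and it assumes only that run() hands back a result before the work has completed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;

/** Plain-JDK analogy: a counter is only final once the job that feeds it has finished. */
final class QueryAfterFinish {
  /** Hypothetical stand-in for a running pipeline and one of its counters. */
  static final class Job {
    final LongAdder counter = new LongAdder();
    final CompletableFuture<Void> done;

    Job(int elements) {
      // The work runs asynchronously, so the constructor returns before it completes.
      done = CompletableFuture.runAsync(() -> {
        for (int i = 0; i < elements; i++) {
          counter.increment();
        }
      });
    }

    /** Analogue of waitUntilFinish(): blocks until all work is complete. */
    void waitUntilFinish() {
      done.join();
    }
  }

  /** Returns {value read right after start, value read after completion}. */
  static long[] readBeforeAndAfter(int elements) {
    Job job = new Job(elements);
    long early = job.counter.sum(); // may be 0 or partial: the job may still be running
    job.waitUntilFinish();
    long late = job.counter.sum();  // complete: every increment has happened
    return new long[] {early, late};
  }
}
```

The early read is whatever happened to be counted at that instant, while the read after waitUntilFinish() always sees the full total. That matches the batch-mode behaviour reported here; it says nothing about streaming, where the job does not finish on its own.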



From: Alexey Romanenko <ar...@gmail.com>
Sent: Friday, July 3, 2020 5:17 PM
To: user@beam.apache.org
Subject: Re: Querying Metrics when using Spark Runner


Hmmm, the only thing that comes to my mind is that we started to use the Spark AccumulatorV2 API [1], but that change was already released in Beam 2.13.0.
There is an issue connected to this upgrade [2], but it does not seem to be related to your problem.

[1] https://issues.apache.org/jira/browse/BEAM-4552
[2] https://issues.apache.org/jira/browse/BEAM-10294




Re: Querying Metrics when using Spark Runner

Posted by Alexey Romanenko <ar...@gmail.com>.
Hmmm, the only thing that comes to my mind is that we started to use the Spark AccumulatorV2 API [1], but that change was already released in Beam 2.13.0.
There is an issue connected to this upgrade [2], but it does not seem to be related to your problem.

[1] https://issues.apache.org/jira/browse/BEAM-4552
[2] https://issues.apache.org/jira/browse/BEAM-10294
