Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/08/18 17:07:26 UTC

[jira] [Commented] (BEAM-2056) Add tests for exporting Beam Metrics to Flink Metrics

    [ https://issues.apache.org/jira/browse/BEAM-2056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17179893#comment-17179893 ] 

Beam JIRA Bot commented on BEAM-2056:
-------------------------------------

This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.


> Add tests for exporting Beam Metrics to Flink Metrics
> -----------------------------------------------------
>
>                 Key: BEAM-2056
>                 URL: https://issues.apache.org/jira/browse/BEAM-2056
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Priority: P2
>              Labels: stale-P2
>
> There are currently no tests verifying that metrics reported through the Beam Metrics API are forwarded to Flink and on to a {{MetricReporter}}.
> A test for this would have to manually configure a Flink "Mini Cluster", as in
> {code}
> // also start a reusable Flink mini cluster
> flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
> flink.start();
> flinkPort = flink.getLeaderRPCPort();
> {code}
> with {{getFlinkConfiguration()}}:
> {code}
> protected static Configuration getFlinkConfiguration() {
>     Configuration flinkConfig = new Configuration();
>     flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
>     flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
>     flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
>     flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
>     flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
>     flinkConfig.setString(
>         ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
>         MyTestReporter.class.getName());
>     return flinkConfig;
> }
> {code}
> where {{MyTestReporter}} is a {{MetricReporter}} that stores metrics being reported to it so we can verify that they are there after the job finishes.
> Running a Pipeline on the mini cluster should be possible by specifying "localhost" and the port obtained from the mini cluster ({{flinkPort}} above) as the cluster endpoint.
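
The issue does not show what {{MyTestReporter}} might look like, so the following is only a rough, self-contained sketch of the idea. It assumes the reporter is instantiated by class name (as the configuration above suggests), which means the test cannot hold a reference to the instance and the recorded metrics have to live in static state. The MetricListener interface is a hypothetical stand-in for the add/remove callbacks of Flink's MetricReporter so that the sketch compiles without Flink on the classpath; the method signatures, the wasReported/reset helpers, and the ReporterSketch driver are all made up for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the add/remove callbacks of Flink's MetricReporter. */
interface MetricListener {
    void notifyOfAddedMetric(Object metric, String metricName);
    void notifyOfRemovedMetric(Object metric, String metricName);
}

/** Remembers every metric it is told about so a test can inspect them later. */
class MyTestReporter implements MetricListener {
    // Static on purpose: the reporter is assumed to be created reflectively
    // from its class name, so the test never sees the instance itself.
    private static final Map<String, Object> SEEN = new ConcurrentHashMap<>();

    @Override
    public void notifyOfAddedMetric(Object metric, String metricName) {
        SEEN.put(metricName, metric);
    }

    @Override
    public void notifyOfRemovedMetric(Object metric, String metricName) {
        // Deliberately keep the entry: metrics may be unregistered when the
        // job finishes, but the test wants to check them after that point.
    }

    /** Hypothetical helper for assertions in the test. */
    static boolean wasReported(String metricName) {
        return SEEN.containsKey(metricName);
    }

    /** Hypothetical helper to clear state between tests. */
    static void reset() {
        SEEN.clear();
    }
}

class ReporterSketch {
    public static void main(String[] args) throws Exception {
        MyTestReporter.reset();

        // Mimic instantiation by class name, as a cluster would do from config.
        MetricListener reporter = (MetricListener) Class
            .forName(MyTestReporter.class.getName())
            .getDeclaredConstructor()
            .newInstance();

        reporter.notifyOfAddedMetric(42L, "my_counter");
        reporter.notifyOfRemovedMetric(42L, "my_counter");

        // The metric is still visible after removal.
        System.out.println(MyTestReporter.wasReported("my_counter")); // prints true
    }
}
```

In a real test the class would implement Flink's MetricReporter instead of the stand-in, and the assertion would run after the Pipeline has finished on the mini cluster. Clearing the static state before each test keeps one test's metrics from leaking into the next.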



--
This message was sent by Atlassian Jira
(v8.3.4#803005)