Posted to user@flink.apache.org by Jack Tuck <ja...@invinsec.com> on 2019/03/06 19:39:10 UTC

How to monitor Apache Flink in AWS EMR (ElasticMapReduce)?

I currently have Flink set up with a job running on EMR, and I'm now trying to add monitoring by sending metrics to Prometheus.

I have come across an issue with running Flink on EMR. I'm using Terraform to provision EMR (I run Ansible afterwards to download and run a job). Out of the box, EMR's Flink distribution does not appear to include the optional jars (flink-metrics-prometheus, flink-cep, etc.).

Looking at Flink's documentation, it says
> "In order to use this reporter you must copy `/opt/flink-metrics-prometheus-1.6.1.jar` into the `/lib` folder of your Flink distribution"
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter

But when I log into the EMR master node, neither /etc/flink nor /usr/lib/flink has a directory called `opt`, and I cannot see `flink-metrics-prometheus-1.6.1.jar` anywhere.

I know Flink has other optional libs, such as flink-cep, that you'd usually have to copy if you want to use them, but I'm not sure how to do this when using EMR.

This is the exception I get, which I believe is because it cannot find the metrics jar on its classpath.
```
java.lang.ClassNotFoundException: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
                at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
                at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
                at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
                at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
                at java.lang.Class.forName0(Native Method)
                at java.lang.Class.forName(Class.java:264)
                at org.apache.flink.runtime.metrics.MetricRegistryImpl.<init>(MetricRegistryImpl.java:144)
                at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:419)
                at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:276)
                at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:227)
                at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:191)
                at java.security.AccessController.doPrivileged(Native Method)
                at javax.security.auth.Subject.doAs(Subject.java:422)
                at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
                at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
                at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
                at org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint.main(YarnSessionClusterEntrypoint.java:137)
```
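
One way to confirm that suspicion is to scan the jars in Flink's lib directory for the reporter class. The following is only a minimal sketch in Python: the function name is made up for illustration, and `/usr/lib/flink/lib` is assumed to be the lib directory of the EMR install.

```python
import zipfile
from pathlib import Path


def jars_containing_class(lib_dir, class_name):
    """Return the names of jars in lib_dir that contain the given class."""
    # A class a.b.C is stored inside a jar as the zip entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    matches = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if entry in archive.namelist():
                    matches.append(jar.name)
        except zipfile.BadZipFile:
            # Skip files that are named *.jar but are not valid archives.
            continue
    return matches


if __name__ == "__main__":
    # Assumed location of Flink's lib directory on the EMR master node.
    found = jars_containing_class(
        "/usr/lib/flink/lib",
        "org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter",
    )
    print(found or "reporter class not found in any jar")
```

An empty result would be consistent with the `ClassNotFoundException` above, although it only covers that one directory and not the full classpath.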

EMR resource in Terraform:
```
resource "aws_emr_cluster" "emr_flink" {
  name          = "ce-emr-flink-arn"
  release_label = "emr-5.20.0" # 5.21.0 is not found, could be a region thing
  applications  = ["Flink"]

  ec2_attributes {
    key_name                          = "ce_test"
    subnet_id                         = "${aws_subnet.ce_test_subnet_public.id}"
    instance_profile                  = "${aws_iam_instance_profile.emr_profile.arn}"
    emr_managed_master_security_group = "${aws_security_group.allow_all_vpc.id}"
    emr_managed_slave_security_group  = "${aws_security_group.allow_all_vpc.id}"
    additional_master_security_groups = "${aws_security_group.external_connectivity.id}"
    additional_slave_security_groups  = "${aws_security_group.external_connectivity.id}"
  }

  ebs_root_volume_size = 100
  master_instance_type = "m4.xlarge"
  core_instance_type   = "m4.xlarge"
  core_instance_count  = 2

  service_role = "${aws_iam_role.iam_emr_service_role.arn}"

  configurations_json = <<EOF
[
  {
    "Classification": "flink-conf",
    "Properties": {
        "parallelism.default": "8",
        "state.backend": "RocksDB",
        "state.backend.async": "true",
        "state.backend.incremental": "true",
        "state.savepoints.dir": "file:///savepoints",
        "state.checkpoints.dir": "file:///checkpoints",
        "web.submit.enable": "true",
        "metrics.reporter.promgateway.class": "org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter",
        "metrics.reporter.promgateway.host": "${aws_instance.monitoring.private_ip}",
        "metrics.reporter.promgateway.port": "9091",
        "metrics.reporter.promgateway.jobName": "ce-test",
        "metrics.reporter.promgateway.randomJobNameSuffix": "true",
        "metrics.reporter.promgateway.deleteOnShutdown": "false"
    }
  }
]
EOF
}
```

I suspect I may have to download the jar in the bootstrap stage, but I wanted to check first and see if there are any examples of this being done.

Re: How to monitor Apache Flink in AWS EMR (ElasticMapReduce)?

Posted by Yun Tang <my...@live.com>.
Hi Jack

How about extracting flink-metrics-prometheus-1.6.1.jar from the distribution tarball downloaded from https://archive.apache.org/dist/flink/flink-1.6.1/ and uploading it to `/usr/lib/flink/lib` on EMR?

Otherwise, I believe setting up a customized Flink cluster on EMR [1] should work if there is no more convenient solution.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#custom-emr-installation
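
The extraction step could look roughly like the Python sketch below. It assumes the binary tarball has already been downloaded to a local path and that the optional jars sit under an `opt/` directory inside it, as the Flink documentation quoted in the question suggests. The function name and its parameters are hypothetical, not part of any Flink or EMR tooling.

```python
import shutil
import tarfile
from pathlib import Path


def copy_optional_jar(tarball, jar_prefix, lib_dir):
    """Copy the first opt/<jar_prefix>*.jar found in a Flink tarball into lib_dir."""
    with tarfile.open(tarball) as archive:
        for member in archive.getmembers():
            path = Path(member.name)
            # Assumption: optional jars live under <dist>/opt/ in the tarball.
            if (
                member.isfile()
                and path.parent.name == "opt"
                and path.name.startswith(jar_prefix)
                and path.suffix == ".jar"
            ):
                target = Path(lib_dir) / path.name
                with archive.extractfile(member) as src, open(target, "wb") as dst:
                    shutil.copyfileobj(src, dst)
                return target
    raise FileNotFoundError(f"no {jar_prefix}*.jar under opt/ in {tarball}")
```

Calling it with the downloaded tarball's path, the prefix `flink-metrics-prometheus`, and `/usr/lib/flink/lib` as the destination would place the jar where suggested above. Writing to that directory probably needs elevated permissions, and the Flink session would presumably have to be restarted to pick up the new jar.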


Best
Yun Tang