Posted to user@flink.apache.org by Antonio Martínez Carratalá <am...@alto-analytics.com> on 2020/03/12 08:43:30 UTC

Flink gelly dependency in transient EMR cluster

Hello,

I'm trying to run a Flink job that works with graphs on a transient EMR
cluster; here is my code:

----------
    HadoopJarStepConfig copyJarStepConf = new HadoopJarStepConfig()
            .withJar("command-runner.jar")
            .withArgs("bash", "-c",
                    "aws s3 cp s3://" + BUCKET_NAME + "/pugore-flink.jar /home/hadoop/pugore-flink.jar");

    StepConfig copyJarStep = new StepConfig()
            .withName("Copy Jar")
            .withHadoopJarStep(copyJarStepConf);

    HadoopJarStepConfig flinkJobConf = new HadoopJarStepConfig()
            .withJar("command-runner.jar")
            .withArgs("bash", "-c", "flink run -m yarn-cluster -yn 1"
                    + " --class es.pugore.flink.job.centrality.CentralityJob /home/hadoop/pugore-flink.jar"
                    + " --alpha 0.05"
                    + " --iterations 50"
                    + " --input s3://" + BUCKET_NAME + "/" + key + "/edges.csv"
                    + " --output s3://" + BUCKET_NAME + "/" + key + "/vertices-centrality.csv");

    StepConfig flinkRunJobStep = new StepConfig()
            .withName("Flink job")
            .withActionOnFailure("CONTINUE")
            .withHadoopJarStep(flinkJobConf);

    List<StepConfig> stepConfigs = new ArrayList<>();
    stepConfigs.add(copyJarStep);
    stepConfigs.add(flinkRunJobStep);

    Application flink = new Application().withName("Flink");

    String clusterName = "flink-job-" + key;
    RunJobFlowRequest request = new RunJobFlowRequest()
            .withName(clusterName)
            .withReleaseLabel("emr-5.26.0")
            .withApplications(flink)
            .withServiceRole("EMR_DefaultRole")
            .withJobFlowRole("EMR_EC2_DefaultRole")
            .withLogUri("s3://" + BUCKET_NAME + "/" + key + "/logs")
            .withInstances(new JobFlowInstancesConfig()
                    .withInstanceCount(2)
                    .withKeepJobFlowAliveWhenNoSteps(false)
                    .withMasterInstanceType("m4.large")
                    .withSlaveInstanceType("m4.large"))
            .withSteps(stepConfigs);

    RunJobFlowResult result = getEmrClient().runJobFlow(request);
    String clusterId = result.getJobFlowId();

    log.debug("[" + key + "] cluster created with id: " + clusterId);
-------------------------

This creates the cluster from scratch and launches my job; the job runs,
but I'm getting the following error:

Caused by: java.lang.NoClassDefFoundError:
org/apache/flink/graph/GraphAlgorithm

On my local cluster I copy the flink-gelly jar from flink/opt to flink/lib
and it works. Is there any way to do this automatically in a transient EMR
cluster before launching the job?

I know I can put the jar in S3, copy it from there as I do with my own jar
in the first step, and then add it to the classpath, but I'm wondering
whether it is possible to instruct EMR to include that dependency in some
way, maybe with some option in Application, Configuration, BootstrapAction
or any other, since it is a Flink dependency.
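[Editor's note: the copy itself is small enough to wrap in a script. The sketch below is hypothetical (directory layout taken from EMR 5.x, where Flink lives under /usr/lib/flink); the paths are parameters so the logic can run anywhere. One caveat on BootstrapAction: bootstrap actions run before EMR installs the applications, so on a real cluster the copy has to run as a step, after Flink is installed.]

```shell
# stage_gelly SRC_DIR DST_DIR -- copy any flink-gelly jar from Flink's
# "opt" directory into its "lib" directory so it lands on the classpath.
# On EMR 5.x the real arguments would be /usr/lib/flink/opt and
# /usr/lib/flink/lib, and the copy would need sudo.
stage_gelly() {
    src_dir="$1"
    dst_dir="$2"
    for jar in "$src_dir"/flink-gelly_*.jar; do
        [ -e "$jar" ] || continue   # glob matched nothing; skip
        cp "$jar" "$dst_dir"/
    done
}
```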

Thank you

Re: Flink gelly dependency in transient EMR cluster

Posted by Till Rohrmann <tr...@apache.org>.
Alternatively, you could also bundle the Gelly dependency with your user
code jar by creating an uber jar. The downside of this approach is an
increased jar size, which then needs to be uploaded to the cluster.

Cheers,
Till
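[Editor's note: with a Maven build, the uber-jar approach might look roughly like the sketch below. This fragment is not from the thread; the shade plugin bundles compile-scope dependencies such as Gelly into the job jar, while the core Flink dependencies should stay at provided scope so they are not duplicated. The Gelly version should match the cluster's Flink version (1.8.0 here, as in the jar name used later in the thread).]

```xml
<!-- Hypothetical pom.xml fragment: shade Gelly into the job jar. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly_2.11</artifactId>
    <version>1.8.0</version> <!-- match the cluster's Flink version -->
</dependency>
...
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
        </execution>
    </executions>
</plugin>
```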

On Thu, Mar 12, 2020 at 4:13 PM Antonio Martínez Carratalá <
amartinez@alto-analytics.com> wrote:

> I reply to myself with the solution in case someone else is having the
> same question
>
> It is only needed to add a copy command to copy the jar from flink/opt to
> flink/lib, in my case:
>
> StepConfig addGellyStep = new StepConfig()
>         .withName("add-gelly-step")
>         .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>                 .withArgs("bash", "-c",
>                         "sudo cp /usr/lib/flink/opt/flink-gelly_2.11-1.8.0.jar /usr/lib/flink/lib"));

Re: Flink gelly dependency in transient EMR cluster

Posted by Antonio Martínez Carratalá <am...@alto-analytics.com>.
I'm replying to myself with the solution, in case someone else runs into
the same question.

All that's needed is an extra step that copies the jar from flink/opt to
flink/lib; in my case:

StepConfig addGellyStep = new StepConfig()
        .withName("add-gelly-step")
        .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
                .withArgs("bash", "-c",
                        "sudo cp /usr/lib/flink/opt/flink-gelly_2.11-1.8.0.jar /usr/lib/flink/lib"));


