Posted to user@flink.apache.org by Theofilos Kakantousis <tk...@kth.se> on 2016/04/19 14:37:10 UTC

Flink on Yarn - ApplicationMaster command

Hi everyone,

I'm using Flink 0.10.1 and Hadoop 2.4.0 to implement a client that 
submits a Flink application to Yarn. To keep it simple I use the 
ConnectedComponents app from the Flink examples.

I set the required properties (Resources, AM ContainerLaunchContext, 
etc.) on the YARN client interface. The JobManager and TaskManager 
processes start and, according to the logs, the containers are running, 
but the actual application never starts. I'm probably missing the proper 
way to pass parameters to the ApplicationMaster, so it cannot pick up 
the application it needs to run. Does anyone know where I could find 
some info on how to pass runtime parameters to the AppMaster?

The ApplicationMaster's launch_container.sh script includes the following:
exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx1024M 
org.apache.flink.yarn.ApplicationMaster  -c 
org.apache.flink.examples.java.graph.ConnectedComponents 1> 
/tmp/stdOut5237161854714899800 2>  /tmp/stdErr606502839107545371 "
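
For reference, my submission code follows roughly the sketch below 
(simplified: the class name, application name, memory sizes and log 
redirects are placeholders, and the LocalResources setup for the Flink 
jars is omitted):

import java.util.Collections;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class YarnSubmitSketch {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();

        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
        appContext.setApplicationName("Flink ConnectedComponents");

        // AM launch command: this is where the "-c <main class>" argument
        // shown in the launch_container.sh script above ends up.
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        amContainer.setCommands(Collections.singletonList(
            "$JAVA_HOME/bin/java -Xmx1024M org.apache.flink.yarn.ApplicationMaster"
            + " -c org.apache.flink.examples.java.graph.ConnectedComponents"
            + " 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr"));
        appContext.setAMContainerSpec(amContainer);

        // Resources requested for the AM container.
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(1024);
        capability.setVirtualCores(1);
        appContext.setResource(capability);

        yarnClient.submitApplication(appContext);
    }
}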

Thank you,
Theofilos


Re: Flink on Yarn - ApplicationMaster command

Posted by Maximilian Michels <mx...@apache.org>.
Great to hear! :)

On Sun, Apr 24, 2016 at 3:51 PM, Theofilos Kakantousis <tk...@kth.se> wrote:
> Hi,
>
> The issue was a mismatch of jar versions on my client. Seems to be working
> fine now.
> Thanks again for your help!
>
> Cheers,
> Theofilos
>
>
> On 2016-04-22 18:22, Theofilos Kakantousis wrote:
>
> Hi Max,
>
> I manage to get the jobManagerAddress from FlinkYarnCluster, however when I
> submit a job using the code below the jobID is null.
> Is there something wrong in the way I submit the job? Otherwise any ideas to
> which direction should I further investigate?
>
> The runBlocking call returns almost immediately. There is no indication the
> job reaches the JobManager as the last log entries for the jobmanager and
> taskmanager logs are that the processes have started successfully.
>
>
> String[] args = {""};
> File file = new File("/srv/flink/examples/ConnectedComponents.jar");
> int parallelism = 1;
> InetSocketAddress jobManagerAddress = cluster.getJobManagerAddress();
> org.apache.flink.configuration.Configuration clientConf = new
> org.apache.flink.configuration.Configuration();
> clientConf.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
> jobManagerAddress.getPort());
> clientConf.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
> jobManagerAddress.getHostName());
> Client client = new Client(clientConf);
> try {
>     PackagedProgram program = new PackagedProgram(file,
> "org.apache.flink.examples.java.graph.ConnectedComponents", args);
>     client.setPrintStatusDuringExecution(true);
>     JobSubmissionResult jobRes = client.runBlocking(program, parallelism);
>     JobID jobID = jobRes.getJobID();
> } catch (ProgramInvocationException ex) {
>     Logger.getLogger(YarnRunner.class.getName()).log(Level.SEVERE, null, ex);
> }
>
>
> Thanks,
> Theofilos
>
>
> On 2016-04-22 16:05, Maximilian Michels wrote:
>
> Hi Theofilos,
>
> Assuming you have the FlinkYarnCluster after the call to deploy(). You
> can get the JobManager address using the
>
> InetSocketAddress address = cluster.getJobManagerAddress();
>
> Then create a Configuration with this address:
>
> Configuration config = new Configuration();
> config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
> address.getHostName());
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
> address.getPort());
>
> Then the client:
>
> Client client = new Client(config);
>
> Then use it to submit jobs blocking/detached, e.g.
>
> client.runBlocking(...);
> client.runDetached(...);
>
> Cheers,
> Max
>
> On Tue, Apr 19, 2016 at 11:25 PM, Theofilos Kakantousis <tk...@kth.se> wrote:
>
> Hi Max,
>
> Thank you for your reply. Exactly, I want to setup the Yarn cluster and
> submit a job through code and not using cmd client.
> I had done what you suggested, I used part of the deploy method to write my
> own code that starts up the cluster which seems to be working fine.
>
> Could you point me to some examples how to use the Client you mention?
>
> Cheers,
> Theofilos
>
>
> On 2016-04-19 16:35, Maximilian Michels wrote:
>
> Hi Theofilos,
>
> I'm not sure whether I understand correctly what you are trying to do.
> I'm assuming you don't want to use the command-line client.
>
> You can setup the Yarn cluster in your code manually using the
> FlinkYarnClient class. The deploy() method will give you a
> FlinkYarnCluster which you can use to connect to the deployed cluster.
> Then get the JobManager address and use the Client class to submit
> Flink jobs to the cluster. I have to warn you that these classes are
> subject to change in Flink 1.1.0 and above.
>
> Let me know if the procedure works for you.
>
> Cheers,
> Max
>
> On Tue, Apr 19, 2016 at 2:37 PM, Theofilos Kakantousis <tk...@kth.se>
> wrote:
>
> Hi everyone,
>
> I'm using Flink 0.10.1 and hadoop 2.4.0 to implement a client that
> submits a
> flink application to Yarn. To keep it simple I use the
> ConnectedComponents
> app from flink examples.
>
> I set the required properties (Resources, AM ContainerLaunchContext etc.)
> on
> the YARN client interface. What happens is the JobManager and TaskManager
> processes start and based on the logs containers are running but the
> actual
> application does not start. I'm probably missing the proper way to pass
> parameters to the ApplicationMaster and it cannot pick up the application
> it
> needs to run. Anyone knows where I could get some info on how to pass
> runtime params to the AppMaster?
>
> The ApplicationMaster launchcontainer script includes the following:
> exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx1024M
> org.apache.flink.yarn.ApplicationMaster  -c
> org.apache.flink.examples.java.graph.ConnectedComponents 1>
> /tmp/stdOut5237161854714899800 2>  /tmp/stdErr606502839107545371 "
>
> Thank you,
> Theofilos
>
>
>

Re: Flink on Yarn - ApplicationMaster command

Posted by Theofilos Kakantousis <tk...@kth.se>.
Hi,

The issue was a mismatch of jar versions on my client. It seems to be 
working fine now.
Thanks again for your help!
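
In case it helps anyone hitting the same thing: a quick way to spot such 
a mismatch is to print the Flink version that the client code actually 
loads and compare it with the version reported in the JobManager logs on 
Yarn. A minimal sketch (assuming EnvironmentInformation from the 0.10.x 
runtime jars is on the client classpath):

import org.apache.flink.runtime.util.EnvironmentInformation;

public class ClientVersionCheck {
    public static void main(String[] args) {
        // Should match the version printed at JobManager/TaskManager startup.
        System.out.println("Flink version on client classpath: "
            + EnvironmentInformation.getVersion());
    }
}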

Cheers,
Theofilos


On 2016-04-22 18:22, Theofilos Kakantousis wrote:
> Hi Max,
>
> I manage to get the jobManagerAddress from FlinkYarnCluster, however 
> when I submit a job using the code below the jobID is null.
> Is there something wrong in the way I submit the job? Otherwise any 
> ideas to which direction should I further investigate?
>
> The runBlocking call returns almost immediately. There is no 
> indication the job reaches the JobManager as the last log entries for 
> the jobmanager and taskmanager logs are that the processes have 
> started successfully.
>
>
> String[] args = {""};
> File file = new File("/srv/flink/examples/ConnectedComponents.jar");
> int parallelism = 1;
> InetSocketAddress jobManagerAddress = cluster.getJobManagerAddress();
> org.apache.flink.configuration.Configuration clientConf = new 
> org.apache.flink.configuration.Configuration();
> clientConf.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
> jobManagerAddress.getPort());
> clientConf.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
> jobManagerAddress.getHostName());
> Client client = new Client(clientConf);
> try {
>     PackagedProgram program = new PackagedProgram(file, 
> "org.apache.flink.examples.java.graph.ConnectedComponents", args);
>     client.setPrintStatusDuringExecution(true);
>     JobSubmissionResult jobRes = client.runBlocking(program, parallelism);
>     JobID jobID = jobRes.getJobID();
> } catch (ProgramInvocationException ex) {
>     Logger.getLogger(YarnRunner.class.getName()).log(Level.SEVERE, null, ex);
> }
>
>
> Thanks,
> Theofilos
>
>
> On 2016-04-22 16:05, Maximilian Michels wrote:
>> Hi Theofilos,
>>
>> Assuming you have the FlinkYarnCluster after the call to deploy(). You
>> can get the JobManager address using the
>>
>> InetSocketAddress address = cluster.getJobManagerAddress();
>>
>> Then create a Configuration with this address:
>>
>> Configuration config = new Configuration();
>> config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
>> address.getHostName());
>> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
>>
>> Then the client:
>>
>> Client client = new Client(config);
>>
>> Then use it to submit jobs blocking/detached, e.g.
>>
>> client.runBlocking(...);
>> client.runDetached(...);
>>
>> Cheers,
>> Max
>>
>> On Tue, Apr 19, 2016 at 11:25 PM, Theofilos Kakantousis<tk...@kth.se>  wrote:
>>> Hi Max,
>>>
>>> Thank you for your reply. Exactly, I want to setup the Yarn cluster and
>>> submit a job through code and not using cmd client.
>>> I had done what you suggested, I used part of the deploy method to write my
>>> own code that starts up the cluster which seems to be working fine.
>>>
>>> Could you point me to some examples how to use the Client you mention?
>>>
>>> Cheers,
>>> Theofilos
>>>
>>>
>>> On 2016-04-19 16:35, Maximilian Michels wrote:
>>>> Hi Theofilos,
>>>>
>>>> I'm not sure whether I understand correctly what you are trying to do.
>>>> I'm assuming you don't want to use the command-line client.
>>>>
>>>> You can setup the Yarn cluster in your code manually using the
>>>> FlinkYarnClient class. The deploy() method will give you a
>>>> FlinkYarnCluster which you can use to connect to the deployed cluster.
>>>> Then get the JobManager address and use the Client class to submit
>>>> Flink jobs to the cluster. I have to warn you that these classes are
>>>> subject to change in Flink 1.1.0 and above.
>>>>
>>>> Let me know if the procedure works for you.
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On Tue, Apr 19, 2016 at 2:37 PM, Theofilos Kakantousis<tk...@kth.se>
>>>> wrote:
>>>>> Hi everyone,
>>>>>
>>>>> I'm using Flink 0.10.1 and hadoop 2.4.0 to implement a client that
>>>>> submits a
>>>>> flink application to Yarn. To keep it simple I use the
>>>>> ConnectedComponents
>>>>> app from flink examples.
>>>>>
>>>>> I set the required properties (Resources, AM ContainerLaunchContext etc.)
>>>>> on
>>>>> the YARN client interface. What happens is the JobManager and TaskManager
>>>>> processes start and based on the logs containers are running but the
>>>>> actual
>>>>> application does not start. I'm probably missing the proper way to pass
>>>>> parameters to the ApplicationMaster and it cannot pick up the application
>>>>> it
>>>>> needs to run. Anyone knows where I could get some info on how to pass
>>>>> runtime params to the AppMaster?
>>>>>
>>>>> The ApplicationMaster launchcontainer script includes the following:
>>>>> exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx1024M
>>>>> org.apache.flink.yarn.ApplicationMaster  -c
>>>>> org.apache.flink.examples.java.graph.ConnectedComponents 1>
>>>>> /tmp/stdOut5237161854714899800 2>  /tmp/stdErr606502839107545371 "
>>>>>
>>>>> Thank you,
>>>>> Theofilos
>>>>>
>


Re: Flink on Yarn - ApplicationMaster command

Posted by Theofilos Kakantousis <tk...@kth.se>.
Hi Max,

I managed to get the jobManagerAddress from the FlinkYarnCluster; 
however, when I submit a job using the code below, the jobID is null.
Is there something wrong with the way I submit the job? Otherwise, any 
ideas in which direction I should investigate further?

The runBlocking call returns almost immediately. There is no indication 
that the job ever reaches the JobManager, as the last entries in the 
jobmanager and taskmanager logs only say that the processes started 
successfully.


String[] args = {""};
File file = new File("/srv/flink/examples/ConnectedComponents.jar");
int parallelism = 1;
InetSocketAddress jobManagerAddress = cluster.getJobManagerAddress();
org.apache.flink.configuration.Configuration clientConf = new 
org.apache.flink.configuration.Configuration();
clientConf.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
jobManagerAddress.getPort());
clientConf.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
jobManagerAddress.getHostName());
Client client = new Client(clientConf);
try {
     PackagedProgram program = new PackagedProgram(file, 
"org.apache.flink.examples.java.graph.ConnectedComponents", args);
     client.setPrintStatusDuringExecution(true);
     JobSubmissionResult jobRes = client.runBlocking(program, parallelism);
     JobID jobID = jobRes.getJobID();
} catch (ProgramInvocationException ex) {
     Logger.getLogger(YarnRunner.class.getName()).log(Level.SEVERE, null, ex);
}


Thanks,
Theofilos


On 2016-04-22 16:05, Maximilian Michels wrote:
> Hi Theofilos,
>
> Assuming you have the FlinkYarnCluster after the call to deploy(). You
> can get the JobManager address using the
>
> InetSocketAddress address = cluster.getJobManagerAddress();
>
> Then create a Configuration with this address:
>
> Configuration config = new Configuration();
> config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
> address.getHostName());
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
>
> Then the client:
>
> Client client = new Client(config);
>
> Then use it to submit jobs blocking/detached, e.g.
>
> client.runBlocking(...);
> client.runDetached(...);
>
> Cheers,
> Max
>
> On Tue, Apr 19, 2016 at 11:25 PM, Theofilos Kakantousis <tk...@kth.se> wrote:
>> Hi Max,
>>
>> Thank you for your reply. Exactly, I want to setup the Yarn cluster and
>> submit a job through code and not using cmd client.
>> I had done what you suggested, I used part of the deploy method to write my
>> own code that starts up the cluster which seems to be working fine.
>>
>> Could you point me to some examples how to use the Client you mention?
>>
>> Cheers,
>> Theofilos
>>
>>
>> On 2016-04-19 16:35, Maximilian Michels wrote:
>>> Hi Theofilos,
>>>
>>> I'm not sure whether I understand correctly what you are trying to do.
>>> I'm assuming you don't want to use the command-line client.
>>>
>>> You can setup the Yarn cluster in your code manually using the
>>> FlinkYarnClient class. The deploy() method will give you a
>>> FlinkYarnCluster which you can use to connect to the deployed cluster.
>>> Then get the JobManager address and use the Client class to submit
>>> Flink jobs to the cluster. I have to warn you that these classes are
>>> subject to change in Flink 1.1.0 and above.
>>>
>>> Let me know if the procedure works for you.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Tue, Apr 19, 2016 at 2:37 PM, Theofilos Kakantousis <tk...@kth.se>
>>> wrote:
>>>> Hi everyone,
>>>>
>>>> I'm using Flink 0.10.1 and hadoop 2.4.0 to implement a client that
>>>> submits a
>>>> flink application to Yarn. To keep it simple I use the
>>>> ConnectedComponents
>>>> app from flink examples.
>>>>
>>>> I set the required properties (Resources, AM ContainerLaunchContext etc.)
>>>> on
>>>> the YARN client interface. What happens is the JobManager and TaskManager
>>>> processes start and based on the logs containers are running but the
>>>> actual
>>>> application does not start. I'm probably missing the proper way to pass
>>>> parameters to the ApplicationMaster and it cannot pick up the application
>>>> it
>>>> needs to run. Anyone knows where I could get some info on how to pass
>>>> runtime params to the AppMaster?
>>>>
>>>> The ApplicationMaster launchcontainer script includes the following:
>>>> exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx1024M
>>>> org.apache.flink.yarn.ApplicationMaster  -c
>>>> org.apache.flink.examples.java.graph.ConnectedComponents 1>
>>>> /tmp/stdOut5237161854714899800 2>  /tmp/stdErr606502839107545371 "
>>>>
>>>> Thank you,
>>>> Theofilos
>>>>


Re: Flink on Yarn - ApplicationMaster command

Posted by Maximilian Michels <mx...@apache.org>.
Hi Theofilos,

Assuming you have the FlinkYarnCluster after the call to deploy(), you
can get the JobManager address using:

InetSocketAddress address = cluster.getJobManagerAddress();

Then create a Configuration with this address:

Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
address.getHostName());
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());

Then the client:

Client client = new Client(config);

Then use it to submit jobs blocking/detached, e.g.

client.runBlocking(...);
client.runDetached(...);

Cheers,
Max

On Tue, Apr 19, 2016 at 11:25 PM, Theofilos Kakantousis <tk...@kth.se> wrote:
> Hi Max,
>
> Thank you for your reply. Exactly, I want to setup the Yarn cluster and
> submit a job through code and not using cmd client.
> I had done what you suggested, I used part of the deploy method to write my
> own code that starts up the cluster which seems to be working fine.
>
> Could you point me to some examples how to use the Client you mention?
>
> Cheers,
> Theofilos
>
>
> On 2016-04-19 16:35, Maximilian Michels wrote:
>>
>> Hi Theofilos,
>>
>> I'm not sure whether I understand correctly what you are trying to do.
>> I'm assuming you don't want to use the command-line client.
>>
>> You can setup the Yarn cluster in your code manually using the
>> FlinkYarnClient class. The deploy() method will give you a
>> FlinkYarnCluster which you can use to connect to the deployed cluster.
>> Then get the JobManager address and use the Client class to submit
>> Flink jobs to the cluster. I have to warn you that these classes are
>> subject to change in Flink 1.1.0 and above.
>>
>> Let me know if the procedure works for you.
>>
>> Cheers,
>> Max
>>
>> On Tue, Apr 19, 2016 at 2:37 PM, Theofilos Kakantousis <tk...@kth.se>
>> wrote:
>>>
>>> Hi everyone,
>>>
>>> I'm using Flink 0.10.1 and hadoop 2.4.0 to implement a client that
>>> submits a
>>> flink application to Yarn. To keep it simple I use the
>>> ConnectedComponents
>>> app from flink examples.
>>>
>>> I set the required properties (Resources, AM ContainerLaunchContext etc.)
>>> on
>>> the YARN client interface. What happens is the JobManager and TaskManager
>>> processes start and based on the logs containers are running but the
>>> actual
>>> application does not start. I'm probably missing the proper way to pass
>>> parameters to the ApplicationMaster and it cannot pick up the application
>>> it
>>> needs to run. Anyone knows where I could get some info on how to pass
>>> runtime params to the AppMaster?
>>>
>>> The ApplicationMaster launchcontainer script includes the following:
>>> exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx1024M
>>> org.apache.flink.yarn.ApplicationMaster  -c
>>> org.apache.flink.examples.java.graph.ConnectedComponents 1>
>>> /tmp/stdOut5237161854714899800 2>  /tmp/stdErr606502839107545371 "
>>>
>>> Thank you,
>>> Theofilos
>>>
>

Re: Flink on Yarn - ApplicationMaster command

Posted by Theofilos Kakantousis <tk...@kth.se>.
Hi Max,

Thank you for your reply. Exactly, I want to set up the Yarn cluster and 
submit a job through code rather than using the command-line client.
I had already done what you suggested: I used part of the deploy method 
to write my own code that starts up the cluster, and that seems to be 
working fine.

Could you point me to some examples of how to use the Client you mention?

Cheers,
Theofilos

On 2016-04-19 16:35, Maximilian Michels wrote:
> Hi Theofilos,
>
> I'm not sure whether I understand correctly what you are trying to do.
> I'm assuming you don't want to use the command-line client.
>
> You can setup the Yarn cluster in your code manually using the
> FlinkYarnClient class. The deploy() method will give you a
> FlinkYarnCluster which you can use to connect to the deployed cluster.
> Then get the JobManager address and use the Client class to submit
> Flink jobs to the cluster. I have to warn you that these classes are
> subject to change in Flink 1.1.0 and above.
>
> Let me know if the procedure works for you.
>
> Cheers,
> Max
>
> On Tue, Apr 19, 2016 at 2:37 PM, Theofilos Kakantousis <tk...@kth.se> wrote:
>> Hi everyone,
>>
>> I'm using Flink 0.10.1 and hadoop 2.4.0 to implement a client that submits a
>> flink application to Yarn. To keep it simple I use the ConnectedComponents
>> app from flink examples.
>>
>> I set the required properties (Resources, AM ContainerLaunchContext etc.) on
>> the YARN client interface. What happens is the JobManager and TaskManager
>> processes start and based on the logs containers are running but the actual
>> application does not start. I'm probably missing the proper way to pass
>> parameters to the ApplicationMaster and it cannot pick up the application it
>> needs to run. Anyone knows where I could get some info on how to pass
>> runtime params to the AppMaster?
>>
>> The ApplicationMaster launchcontainer script includes the following:
>> exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx1024M
>> org.apache.flink.yarn.ApplicationMaster  -c
>> org.apache.flink.examples.java.graph.ConnectedComponents 1>
>> /tmp/stdOut5237161854714899800 2>  /tmp/stdErr606502839107545371 "
>>
>> Thank you,
>> Theofilos
>>


Re: Flink on Yarn - ApplicationMaster command

Posted by Maximilian Michels <mx...@apache.org>.
Hi Theofilos,

I'm not sure whether I understand correctly what you are trying to do.
I'm assuming you don't want to use the command-line client.

You can set up the Yarn cluster in your code manually using the
FlinkYarnClient class. The deploy() method will give you a
FlinkYarnCluster which you can use to connect to the deployed cluster.
Then get the JobManager address and use the Client class to submit
Flink jobs to the cluster. I have to warn you that these classes are
subject to change in Flink 1.1.0 and above.
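
Roughly, it looks like the sketch below. Note this is from memory and 
simplified; the setter names and the paths are illustrative, so please 
double-check them against the FlinkYarnClient class in your Flink 
version:

import java.net.InetSocketAddress;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.yarn.FlinkYarnClient;
import org.apache.hadoop.fs.Path;

public class YarnSessionSketch {
    public static void main(String[] args) throws Exception {
        FlinkYarnClient yarnClient = new FlinkYarnClient();
        yarnClient.setTaskManagerCount(2);
        yarnClient.setJobManagerMemory(1024);
        yarnClient.setTaskManagerMemory(1024);
        // The Flink uber jar that is shipped to YARN (illustrative path).
        yarnClient.setLocalJarPath(new Path("/srv/flink/lib/flink-dist.jar"));
        yarnClient.setFlinkConfiguration(new Configuration());
        yarnClient.setConfigurationDirectory("/srv/flink/conf");
        yarnClient.setConfigurationFilePath(new Path("/srv/flink/conf/flink-conf.yaml"));

        // deploy() starts the ApplicationMaster/JobManager on YARN and returns
        // a cluster handle that knows the JobManager address.
        AbstractFlinkYarnCluster cluster = yarnClient.deploy();
        InetSocketAddress jobManager = cluster.getJobManagerAddress();
        System.out.println("JobManager running at " + jobManager);
    }
}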

Let me know if the procedure works for you.

Cheers,
Max

On Tue, Apr 19, 2016 at 2:37 PM, Theofilos Kakantousis <tk...@kth.se> wrote:
> Hi everyone,
>
> I'm using Flink 0.10.1 and hadoop 2.4.0 to implement a client that submits a
> flink application to Yarn. To keep it simple I use the ConnectedComponents
> app from flink examples.
>
> I set the required properties (Resources, AM ContainerLaunchContext etc.) on
> the YARN client interface. What happens is the JobManager and TaskManager
> processes start and based on the logs containers are running but the actual
> application does not start. I'm probably missing the proper way to pass
> parameters to the ApplicationMaster and it cannot pick up the application it
> needs to run. Anyone knows where I could get some info on how to pass
> runtime params to the AppMaster?
>
> The ApplicationMaster launchcontainer script includes the following:
> exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx1024M
> org.apache.flink.yarn.ApplicationMaster  -c
> org.apache.flink.examples.java.graph.ConnectedComponents 1>
> /tmp/stdOut5237161854714899800 2>  /tmp/stdErr606502839107545371 "
>
> Thank you,
> Theofilos
>