Posted to user@flink.apache.org by "Bajaj, Abhinav" <ab...@here.com> on 2016/03/08 00:25:53 UTC

Submit Flink Jobs to YARN running on AWS

Hi,

I am new to Flink and am trying to use it on AWS.
I have created a YARN cluster on AWS EC2 machines.
I am trying to submit a Flink job to the remote YARN cluster using the Flink client running on my local machine.

The JobManager starts successfully in the YARN container, but the client is not able to connect to the JobManager.

Flink client logs:

13:57:34,877 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:57:35,951 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:57:37,027 INFO  org.apache.flink.yarn.FlinkYarnClient                         - YARN application has been deployed successfully.
13:57:37,100 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start actor system.
13:57:37,532 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start application client.
YARN cluster started
JobManager web interface address http://ec2-XX-XX-XX-XX.compute-1.amazonaws.com:8088/proxy/application_1456184947990_0003/
Waiting until all TaskManagers have connected
13:57:37,540 INFO  org.apache.flink.yarn.ApplicationClient                       - Notification about new leader address akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
No status updates from the YARN cluster received so far. Waiting ...
13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Received address of new leader akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager null.
13:57:37,545 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@54.35.41.12:41292/user/jobmanager.
No status updates from the YARN cluster received so far. Waiting ...

The JobManager logs contain the following:

21:57:39,142 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]
21:57:40,782 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at ec2-54-35-41-12 (akka.tcp://flink@172.31.23.18:60565/user/taskmanager) as 72101dd2ee94caa7a5ec5a75488359aa. Current number of registered hosts is 1. Current number of alive task slots is 1.
21:57:41,162 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]

It seems the problem is a mismatch between the address the JobManager's Akka actor system is running on and the one used by the client.
172.31.23.18 is the internal (private) IP of the EC2 machine where the JobManager container is running.
54.35.41.12 is the external IP of the EC2 machine, used by the Flink client to submit the job.
Because of this mismatch, the messages are dropped by the Akka actor system.
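
To make the mismatch concrete, here is a minimal Python sketch (not Flink or Akka code) that pulls the addresses out of the "dropping message" line above and checks whether the recipient address is one of the inbound addresses. The regular expression and the helper name parse_drop_line are assumptions made for illustration only; Akka's real check happens inside its remoting layer.

```python
import re

# Message text copied from the JobManager log above.
LOG_LINE = (
    "dropping message [class akka.actor.ActorSelectionMessage] for non-local "
    "recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at "
    "[akka.tcp://flink@54.35.41.12:41292] inbound addresses are "
    "[akka.tcp://flink@172.31.23.18:41292]"
)

# Hypothetical pattern: captures (host, port) from an akka.tcp address.
ADDRESS = re.compile(r"akka\.tcp://flink@([\d.]+):(\d+)")


def parse_drop_line(line):
    """Return the recipient (host, port) and the list of inbound (host, port) pairs."""
    recipient_part, inbound_part = line.split("inbound addresses are")
    recipient = ADDRESS.search(recipient_part).groups()
    inbound = ADDRESS.findall(inbound_part)
    return recipient, inbound


recipient, inbound = parse_drop_line(LOG_LINE)
# The message is only deliverable if the recipient is one of the inbound addresses.
accepted = recipient in inbound
print("recipient:", recipient)
print("inbound:  ", inbound)
print("accepted: ", accepted)
```

The ports match, so only the host part differs: the client addresses the public IP while the actor system only knows its private one.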

Can someone please help me with this issue?
I can share the detailed logs, if required.

Thanks,
Abhi


Re: Submit Flink Jobs to YARN running on AWS

Posted by "Bajaj, Abhinav" <ab...@here.com>.
Thanks for the quick reply.

Let me describe in more detail here.
I am trying to submit a single Flink job to YARN using the client:

./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar

In my understanding, YARN allocates a container for the JobManager.
The JobManager discovers its IP and starts the actor system. At this step, the IP it uses is the internal IP address.

FYI, the YARN and HDFS clusters use the public DNS names in all the configs.

Is there a way to make the JobManager use the hostname instead of the IP address?

Or any other suggestions?

Thanks,
Abhi

From: <ew...@gmail.com> on behalf of Stephan Ewen <se...@apache.org>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Wednesday, March 9, 2016 at 6:09 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Submit Flink Jobs to YARN running on AWS

Hi Abhi!

You pretty much described it correctly: Flink binds its ports to the internal IP addresses, so you cannot send a message through the external IP addresses.

Can you check whether you can explicitly configure the external IP address as the JobManager hostname, so that the JobManager binds to that specific network interface?

Stephan





Re: Submit Flink Jobs to YARN running on AWS

Posted by Stephan Ewen <se...@apache.org>.
Hi Abhi!

You pretty much described it correctly: Flink binds its ports to the
internal IP addresses, so you cannot send a message through the external IP
addresses.

Can you check whether you can explicitly configure the external IP address as
the JobManager hostname, so that the JobManager binds to that specific network
interface?

Stephan



Re: Submit Flink Jobs to YARN running on AWS

Posted by Ashutosh Kumar <km...@gmail.com>.
If you use OpenVPN to access AWS, you can use the private IP of the EC2
machine from your laptop.


Thanks
Ashutosh


Re: Submit Flink Jobs to YARN running on AWS

Posted by Shannon Carey <sc...@expedia.com>.
We're also starting to look at automating job deployment/start to Flink running on EMR. There are a few options:

  *   Use RemoteExecutionEnvironment (per the examples). Problems: I'm not sure of the best way to upload the JAR, or how to run it detached so that the Java program that starts the job is asynchronous with the long-running cluster job.
  *   Use the CLI. Problems: it needs to run locally on the YARN node, otherwise you encounter the problems discussed below? It requires a Flink distro. Logs of the launch will remain local to the machine that executes it (e.g., if it's on a Jenkins slave).
  *   Use the HTTP API

Is using the HTTP API a reasonable approach? Is that API considered stable enough that we could rely on it continuing to be present?

Thanks,
Shannon


From: "Bajaj, Abhinav" <ab...@here.com>
Date: Monday, June 6, 2016 at 12:10 PM
To: Josh <jo...@gmail.com>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Submit Flink Jobs to YARN running on AWS

Hi Josh,

I have not yet :-( . I am working on setting up a REST service on AWS that can do it, rather than using the Flink client remotely.
This way the Akka communication stays within AWS.

However, I still need a solution for running some of the integration/system tests.

~ Abhi

From: Josh <jo...@gmail.com>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Monday, June 6, 2016 at 11:55 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Submit Flink Jobs to YARN running on AWS

Hi Abhi,

I'm also looking to deploy Flink jobs remotely to YARN, and eventually automate it - just wondering if you found a way to do it?

Thanks,
Josh

On Wed, May 25, 2016 at 12:36 AM, Bajaj, Abhinav <ab...@here.com> wrote:
Hi,

Has anyone tried to submit a Flink job remotely to YARN running in AWS?
The case I am stuck with is where the Flink client is on my laptop and YARN is running on AWS.

@Robert, did you get a chance to try this out?

Regards,
Abhi

From: "Bajaj, Abhinav" <ab...@here.com>
Date: Friday, April 29, 2016 at 3:50 PM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Submit Flink Jobs to YARN running on AWS

Hi Robert,

Thanks for your reply.

I am using the public DNS names of the EC2 machines in the YARN and HDFS configuration files. They look like "ec2-203-0-113-25.compute-1.amazonaws.com".
You should be able to connect then.

I have Hadoop installed locally, and YARN_CONF_DIR points to it.
The yarn-site.xml and core-site.xml files use the address (public DNS) of the ResourceManager running in AWS.

So, whenever I submit the job using the client on my laptop, it connects to the RM.
The RM starts the YARN application and starts the JobManager.
The JobManager starts the actor system using the internal IP of the NodeManager. In my understanding, this is where the problem lies.

The local client tries to connect to the JobManager actor system, but the actor system drops the messages because the IP address it started with (the EC2 internal IP) does not match the external IP address (the public IP) that the Flink client used to send the message. Please see my first mail below for detailed logs.
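
As a small illustration of the internal versus public distinction, the following Python sketch classifies the two addresses from the logs using the standard ipaddress module. The constant and function names are made up for this example; only the two IP addresses come from the logs in my first mail.

```python
import ipaddress

# Addresses taken from the logs in the first mail.
JOBMANAGER_BIND_IP = "172.31.23.18"  # address the actor system started with
CLIENT_TARGET_IP = "54.35.41.12"     # address the Flink client sends to


def is_private(ip):
    """True if Python's ipaddress module treats the address as private
    (this includes the RFC 1918 ranges such as 172.16.0.0/12)."""
    return ipaddress.ip_address(ip).is_private


for label, ip in [("bind", JOBMANAGER_BIND_IP), ("client target", CLIENT_TARGET_IP)]:
    print(label, ip, "private" if is_private(ip) else "public")
```

A private address like the first one is only routable from inside the EC2 network, which is why a client outside AWS has to go through the public one.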

Please keep me posted on your progress.

I plan to move the cluster to a VPC for other reasons.
I have limited knowledge of VPC, but I guess that will not resolve the difference between the internal and external IP addresses.
Please correct me if your view is different.

It would be great if you are able to reproduce the issue.

Thanks again.
Abhi



Abhinav Bajaj
Senior Engineer
HERE Predictive Analytics
Office:  +12062092767
Mobile: +17083299516

HERE Seattle
701 Pike Street, #2000, Seattle, WA 98101, USA
47° 36' 41" N. 122° 19' 57" W
HERE Maps




From: Robert Metzger <rm...@apache.org>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Tuesday, April 26, 2016 at 3:16 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Submit Flink Jobs to YARN running on AWS

I've started my own EMR cluster and tried to launch a Flink job on it from my local machine.
I have to admit that configuring the EMR-launched Hadoop for external access is quite a hassle.

I'm not even able to submit Flink to the YARN cluster because the client cannot connect to the ResourceManager. I've changed the ResourceManager hostname to the public one in the yarn-site.xml on the cluster and restarted it, but the client still cannot connect.
It seems that the RM address is being overwritten by the Hadoop code?

How did you manage to get this working?

In the VM settings, I disabled the "Source/Dest checks", but I don't think this is related.

Have you considered using Amazon's VPN service? I guess you would then have "local" access to the cluster.

On YARN, Flink is not using the flink-conf.yaml setting for the JobManager's hostname. It's using YARN's "yarn.nodemanager.hostname" from the yarn-site.xml.
I haven't tried it, but it could work if you set the public hostname of each NodeManager in the yarn-site.xml.
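
For reference, a minimal Python sketch of what such a yarn-site.xml entry could look like. The property name yarn.nodemanager.hostname is the one mentioned above; the hostname value is only a placeholder in the style of the public DNS names used in this thread, the helper name is made up, and this is untested as a fix for the problem.

```python
import xml.etree.ElementTree as ET


def hostname_property(public_hostname):
    """Render a <property> element for yarn-site.xml as a string.

    The value is whatever public hostname the NodeManager should report;
    whether this resolves the address mismatch is an untested assumption.
    """
    prop = ET.Element("property")
    ET.SubElement(prop, "name").text = "yarn.nodemanager.hostname"
    ET.SubElement(prop, "value").text = public_hostname
    return ET.tostring(prop, encoding="unicode")


# Placeholder hostname; each NodeManager would need its own public name.
snippet = hostname_property("ec2-203-0-113-25.compute-1.amazonaws.com")
print(snippet)
```

The resulting element would go inside the <configuration> section of the yarn-site.xml on each NodeManager, followed by a restart of that NodeManager.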

Also, maybe the product forum / customer support of Amazon can help you here. Other systems like Spark or Storm have very similar architectures and will face the same issues. I guess they have some recipes for such situations.

Regards,
Robert




On Tue, Apr 26, 2016 at 10:47 AM, Robert Metzger <rm...@apache.org> wrote:
Hi Abhi,

I'll try to reproduce the issue and come up with a solution.

On Tue, Apr 26, 2016 at 1:13 AM, Bajaj, Abhinav <ab...@here.com> wrote:
Hi Fabian,

Thanks for your reply and the pointers to documentation.

In these steps, I think the Flink client is installed on the master node, as in the steps described in the Flink docs here <https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html>.
However, my scenario is to run the client on my local machine and submit jobs remotely to the YARN cluster (running on EMR or independently).

Let me describe this in more detail.
I am trying to submit a single Flink job to YARN using the client, running on my dev machine:

./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar

In my understanding, YARN (running in AWS) allocates a container for the Jobmanager.
The JobManager discovers its IP and starts the actor system. At this step, the IP it uses is the internal IP address of the EC2 instance.

The client, running on my dev machine, is not able to connect to the Jobmanager for reasons explained in my mail below.

Is there a way to make the JobManager use the hostname instead of the IP address?

Or any other suggestions?

Thanks,
Abhi

Abhinav Bajaj
Senior Engineer
HERE Predictive Analytics
Office:  +12062092767
Mobile: +17083299516

HERE Seattle
701 Pike Street, #2000, Seattle, WA 98101, USA
47° 36' 41" N. 122° 19' 57" W
HERE Maps




From: Fabian Hueske <fh...@gmail.com>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Wednesday, March 9, 2016 at 12:51 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Submit Flink Jobs to YARN running on AWS

Hi Abhi,

I have used Flink on EMR via YARN a couple of times without problems.
I started a Flink YARN session like this:

./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096

This will start five YARN containers (1 JobManager with 1024 MB, 4 TaskManagers with 4096 MB each). See more config options in the documentation [1].
In one of the last lines of the standard output you should find a line that tells you the IP and port of the JobManager.

With the IP and port, you can submit a job as follows:

./bin/flink run -m jmIP:jmPort -p 4 jobJarFile.jar <arguments>

This will send the job to the JobManager specified by IP and port and execute the program with a parallelism of 4. See more config options in the documentation [2].
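
As a rough sketch of the second step, the following assembles that command line from its parts. Only the -m and -p flags come from the command above; the helper name is hypothetical, and the host, port and jar path are illustrative values taken from elsewhere in this thread.

```python
import shlex


def flink_run_command(jm_host, jm_port, parallelism, jar, *args):
    """Assemble the argument list for `flink run` against a running JobManager.

    Hypothetical helper: it only builds the command, it does not execute it.
    """
    return [
        "./bin/flink", "run",
        "-m", f"{jm_host}:{jm_port}",   # JobManager address printed by yarn-session.sh
        "-p", str(parallelism),         # job parallelism
        jar, *args,
    ]


# Illustrative values only; use the IP and port your own session prints.
cmd = flink_run_command("172.31.23.18", 41292, 4, "./examples/batch/WordCount.jar")
print(shlex.join(cmd))
```

The resulting list could be handed to subprocess.run on a machine that can reach the JobManager address.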

If this does not help, could you share the exact command that you use to start the YARN session and submit the job?

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html

2016-03-08 0:25 GMT+01:00 Bajaj, Abhinav <ab...@here.com>:
Hi,

I am a newbie to Flink and trying to use it in AWS.
I have created a YARN cluster on AWS EC2 machines.
I am trying to submit a Flink job to the remote YARN cluster using the Flink client running on my local machine.

The JobManager starts successfully in the YARN container, but the client is not able to connect to it.

Flink Client Logs -

13:57:34,877 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:57:35,951 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:57:37,027 INFO  org.apache.flink.yarn.FlinkYarnClient                         - YARN application has been deployed successfully.
13:57:37,100 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start actor system.
13:57:37,532 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start application client.
YARN cluster started
JobManager web interface address http://ec2-XX-XX-XX-XX.compute-1.amazonaws.com:8088/proxy/application_1456184947990_0003/
Waiting until all TaskManagers have connected
13:57:37,540 INFO  org.apache.flink.yarn.ApplicationClient                       - Notification about new leader address akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
No status updates from the YARN cluster received so far. Waiting ...
13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Received address of new leader akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager null.
13:57:37,545 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@54.35.41.12:41292/user/jobmanager.
No status updates from the YARN cluster received so far. Waiting ...

The JobManager logs contain the following -

21:57:39,142 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]
21:57:40,782 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at ec2-54-35-41-12 (akka.tcp://flink@172.31.23.18:60565/user/taskmanager) as 72101dd2ee94caa7a5ec5a75488359aa. Current number of registered hosts is 1. Current number of alive task slots is 1.
21:57:41,162 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]

The problem seems to be a mismatch between the address the JobManager's Akka actor system is running on and the address used by the client.
172.31.23.18 – is the internal private IP of the EC2 machine where the Jobmanager container is running.
54.35.41.12 – is the external IP of the EC2 machine, used by Flink client to submit the Job.
Because of this mismatch the messages are ignored by the Akka actor System.
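
The mismatch can be read directly off the EndpointWriter error. The sketch below pulls both addresses out of such a line and compares them; the regular expression is an assumption derived from the two error lines quoted above, not a documented Akka log format, and the function name is made up for illustration.

```python
import re

# Pattern inferred from the "dropping message" lines above (an assumption).
DROPPED = re.compile(
    r"non-local recipient \[Actor\[akka\.tcp://[^@]+@(?P<sent_to>[^:\]]+):\d+/\]\]"
    r".*inbound addresses are \[(?P<inbound>[^\]]+)\]"
)


def explain_dropped_message(line):
    """Return (host the sender addressed, hosts the actor system accepts), or None."""
    match = DROPPED.search(line)
    if match is None:
        return None
    inbound_hosts = re.findall(r"akka\.tcp://[^@]+@([^:]+):\d+", match.group("inbound"))
    return match.group("sent_to"), inbound_hosts


line = (
    "21:57:39,142 ERROR akka.remote.EndpointWriter - dropping message "
    "[class akka.actor.ActorSelectionMessage] for non-local recipient "
    "[Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at "
    "[akka.tcp://flink@54.35.41.12:41292] inbound addresses are "
    "[akka.tcp://flink@172.31.23.18:41292]"
)

sent_to, inbound = explain_dropped_message(line)
print(f"client addressed {sent_to}, actor system accepts {inbound}")
if sent_to not in inbound:
    print("address mismatch: the message is dropped")
```

Here the client addressed the public IP, while the actor system only accepts messages for the private one, which matches the behaviour described above.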

Can someone please help me with this issue?
I can share the detailed logs, if required.

Thanks,
Abhi






Re: Submit Flink Jobs to YARN running on AWS

Posted by "Bajaj, Abhinav" <ab...@here.com>.
Hi Josh,

I have not yet :-(. I am working on setting up a REST service on AWS that submits the jobs, rather than using the Flink client remotely.
This way, the Akka communication stays within AWS.

However, I still need a solution for running some of the integration/system tests.

~ Abhi

From: Josh <jo...@gmail.com>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Monday, June 6, 2016 at 11:55 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Submit Flink Jobs to YARN running on AWS

Hi Abhi,

I'm also looking to deploy Flink jobs remotely to YARN, and eventually automate it - just wondering if you found a way to do it?

Thanks,
Josh

On Wed, May 25, 2016 at 12:36 AM, Bajaj, Abhinav <ab...@here.com> wrote:
Hi,

Has anyone tried to submit a Flink job remotely to YARN running in AWS?
The case I am stuck with is where the Flink client is on my laptop and YARN is running on AWS.

@Robert, Did you get a chance to try this out?

Regards,
Abhi

From: "Bajaj, Abhinav" <ab...@here.com>
Date: Friday, April 29, 2016 at 3:50 PM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Submit Flink Jobs to YARN running on AWS

Hi Robert,

Thanks for your reply.

I am using the public DNS names of the EC2 machines in the YARN and HDFS configuration files. They look like "ec2-203-0-113-25.compute-1.amazonaws.com".
You should be able to connect then.

I have Hadoop installed locally and YARN_CONF_DIR points to it.
The yarn-site.xml and core-site.xml files use the address (public DNS) of the ResourceManager running in AWS.

So, whenever I submit the job using the client on my laptop, it connects to the RM.
The RM starts the YARN application and starts the JobManager.
The JobManager starts the actor system using the internal IP of the NodeManager. In my understanding, this is where the problem lies.

The local client tries to connect to the JobManager actor system, but the actor system drops the messages because the IP address it started with (the EC2 internal IP) does not match the external IP address (the public IP) that the Flink client used to send the message. Please see my first mail for detailed logs.

Please keep me posted with your progress.

I plan to move the cluster to VPC for other reasons.
I have limited knowledge of VPC, but I guess it will not resolve the difference between the internal and external IP addresses.
Please correct me if you see it differently.

It will be great if you are able to reproduce the issue.

Thanks again.
Abhi









Re: Submit Flink Jobs to YARN running on AWS

Posted by Josh <jo...@gmail.com>.
Hi Abhi,

I'm also looking to deploy Flink jobs remotely to YARN, and eventually
automate it - just wondering if you found a way to do it?

Thanks,
Josh


Re: Submit Flink Jobs to YARN running on AWS

Posted by "Bajaj, Abhinav" <ab...@here.com>.
Hi,

Has anyone tried to submit a Flink job remotely to YARN running in AWS?
The case I am stuck with is where the Flink client is on my laptop and YARN is running on AWS.

@Robert, Did you get a chance to try this out?

Regards,
Abhi

From: "Bajaj, Abhinav" <ab...@here.com>>
Date: Friday, April 29, 2016 at 3:50 PM
To: "user@flink.apache.org<ma...@flink.apache.org>" <us...@flink.apache.org>>
Subject: Re: Submit Flink Jobs to YARN running on AWS

Hi Robert,

Thanks for your reply.

I am using the Public DNS for the EC2 machines in the yarn and hdfs configuration files. It looks like "ec2-203-0-113-25.compute-1.amazonaws.com”
You should be able to connect then.

I have hadoop installed locally and the YARN_CONF_DIR is pointing to it.
The yarn-site.xml and core-site.xml files use the resource manager address(Public DNS) running in AWS.

So, whenever I submit the job using the client on my laptop, it connects to RM.
The RM starts the YARN application and starts the Job manager.
The job manager starts the actor system using the internal IP of the nodemanager. In my understanding, this is where the problem lies.

The local client tries to connect to the Job manager actor system but the messages are dropped by the actor system as the IP address(EC2 internal IP) that actor system started with does not match the external IP address(Public IP) that was used by Flink client to send the message. Please see my first mail below for detailed logs.

Please keep me posted with your progress.

I plan to move the cluster to VPC for other reasons.
I have limited knowledge of VPC but I guess the difference in internal and external IP address will not be resolved.
Please correct if your views are different.

It will be great if you are able to reproduce the issue.

Thanks again.
Abhi



[cid:DACBF116-FD8C-48DB-B91D-D54510B306E8]

Abhinav Bajaj
Senior Engineer
HERE Predictive Analytics
Office:  +12062092767
Mobile: +17083299516

HERE Seattle
701 Pike Street, #2000, Seattle, WA 98101, USA
47° 36' 41" N. 122° 19' 57" W
HERE Maps




From: Robert Metzger <rm...@apache.org>>
Reply-To: "user@flink.apache.org<ma...@flink.apache.org>" <us...@flink.apache.org>>
Date: Tuesday, April 26, 2016 at 3:16 AM
To: "user@flink.apache.org<ma...@flink.apache.org>" <us...@flink.apache.org>>
Subject: Re: Submit Flink Jobs to YARN running on AWS

I've started my own EMR cluster and tried to launch a Flink job from my local machine on it.
I have to admin that configuring the EMR launched Hadoop for external access is quite a hassle.

I'm not even able to submit Flink to the YARN cluster because the client can not connect to the ResourceManager. I've change the resource manager hostname to the public one in the yarn-site.xml on the cluster and restarted it, but the client still can not connect.
It seems that the RM address is being overwritten by the Hadoop code?
[Inline image 1]

How did you manage to get this working?

In the VM settings, I disabled the "Source/Dest checks", but I don't think this is related.

Have you considered using Amazon's VPN service, I guess then you would have "local" access to the cluster?

On YARN, Flink is not using the flink-conf.yaml setting for the jobmanager's hostname. Its using YARN's "yarn.nodemanager.hostname" from the yarn-site.xml.
I haven't tried it, but it could work if you set the public hostname of each NodeManager in the yarn-site.xml.

Also, maybe the product forum / customer support of Amazon can help you here. Other systems like Spark or Storm have very similar architectures and will face the same issues. I guess they have some recipes for such situations.

Regards,
Robert




On Tue, Apr 26, 2016 at 10:47 AM, Robert Metzger <rm...@apache.org> wrote:
Hi Abhi,

I'll try to reproduce the issue and come up with a solution.

On Tue, Apr 26, 2016 at 1:13 AM, Bajaj, Abhinav <ab...@here.com> wrote:
Hi Fabian,

Thanks for your reply and the pointers to documentation.

In these steps, I think the Flink client is installed on the master node, referring to steps mentioned in Flink docs here<https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html>.
However, the scenario I have is to run the client on my local machine and submit jobs remotely to the YARN Cluster (running on EMR or independently).

Let me describe in more detail here.
I am trying to submit a single Flink Job to YARN using the client, running on my dev machine -

./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar

In my understanding, YARN (running in AWS) allocates a container for the Jobmanager.
The Jobmanager discovers its IP and starts the actor system. At this step, the IP it uses is the internal IP address of the EC2 instance.

The client, running on my dev machine, is not able to connect to the Jobmanager for reasons explained in my mail below.

Is there a way to set the Jobmanager to use the hostname and not the IP address?

Or any other suggestions?

Thanks,
Abhi


From: Fabian Hueske <fh...@gmail.com>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Wednesday, March 9, 2016 at 12:51 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Submit Flink Jobs to YARN running on AWS

Hi Abhi,

I have used Flink on EMR via YARN a couple of times without problems.
I started a Flink YARN session like this:

./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096

This will start five YARN containers (1 JobManager with 1024MB, 4 Taskmanagers with 4096MB). See more config options in the documentation [1].
In one of the last lines of the std-out output you should find a line that tells you the IP and port of the JobManager.

With the IP and port, you can submit a job as follows:

./bin/flink run -m jmIP:jmPort -p 4 jobJarFile.jar <arguments>

This will send the job to the JobManager specified by IP and port and execute the program with a parallelism of 4. See more config options in the documentation [2].

If this does not help, could you share the exact command that you use to start the YARN session and submit the job?

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html

2016-03-08 0:25 GMT+01:00 Bajaj, Abhinav <ab...@here.com>:
Hi,

I am a newbie to Flink and trying to use it in AWS.
I have created a YARN cluster on AWS EC2 machines.
I am trying to submit a Flink job to the remote YARN cluster using the Flink Client running on my local machine.

The Jobmanager starts successfully on the YARN container but the client is not able to connect to the Jobmanager.

Flink Client Logs -

13:57:34,877 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:57:35,951 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:57:37,027 INFO  org.apache.flink.yarn.FlinkYarnClient                         - YARN application has been deployed successfully.
13:57:37,100 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start actor system.
13:57:37,532 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start application client.
YARN cluster started
JobManager web interface address http://ec2-XX-XX-XX-XX.compute-1.amazonaws.com:8088/proxy/application_1456184947990_0003/
Waiting until all TaskManagers have connected
13:57:37,540 INFO  org.apache.flink.yarn.ApplicationClient                       - Notification about new leader address akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
No status updates from the YARN cluster received so far. Waiting ...
13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Received address of new leader akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager null.
13:57:37,545 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@54.35.41.12:41292/user/jobmanager.
No status updates from the YARN cluster received so far. Waiting ...

The logs of the Jobmanager contain the following -

21:57:39,142 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]
21:57:40,782 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at ec2-54-35-41-12 (akka.tcp://flink@172.31.23.18:60565/user/taskmanager) as 72101dd2ee94caa7a5ec5a75488359aa. Current number of registered hosts is 1. Current number of alive task slots is 1.
21:57:41,162 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]

It seems the problem is a mismatch between the address the Jobmanager's Akka actor system is running on and the one used by the client.
172.31.23.18 is the internal private IP of the EC2 machine where the Jobmanager container is running.
54.35.41.12 is the external IP of the EC2 machine, used by the Flink client to submit the job.
Because of this mismatch, the messages are ignored by the Akka actor system.

Can someone please help me with this issue?
I can share the detailed logs, if required.

Thanks,
Abhi





Re: Submit Flink Jobs to YARN running on AWS

Posted by "Bajaj, Abhinav" <ab...@here.com>.
Hi Robert,

Thanks for your reply.

I am using the Public DNS names of the EC2 machines in the YARN and HDFS configuration files. A name looks like "ec2-203-0-113-25.compute-1.amazonaws.com".
With those in place, you should be able to connect.

I have Hadoop installed locally and YARN_CONF_DIR points to it.
The yarn-site.xml and core-site.xml files there use the Public DNS address of the resource manager running in AWS.
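
As a minimal sketch of how the client side picks up that address, the snippet below parses a yarn-site.xml and prints the ResourceManager host it names. The XML content is a hypothetical, cut-down example (the hostname is the sample Public DNS name from above), and yarn.resourcemanager.hostname is assumed to be the property in use; a real file may set yarn.resourcemanager.address instead.

```python
import xml.etree.ElementTree as ET

# Hypothetical, cut-down yarn-site.xml as it might sit in the local YARN_CONF_DIR.
YARN_SITE = """<configuration>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>ec2-203-0-113-25.compute-1.amazonaws.com</value>
  </property>
</configuration>
"""


def read_properties(xml_text):
    """Return the Hadoop-style <property> entries as a name -> value dict."""
    root = ET.fromstring(xml_text)
    return {p.findtext("name"): p.findtext("value") for p in root.iter("property")}


props = read_properties(YARN_SITE)
print(props["yarn.resourcemanager.hostname"])
```

Running the same helper over the real file is a quick way to confirm which address the local client will contact.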

So, whenever I submit the job using the client on my laptop, it connects to the RM.
The RM starts the YARN application and starts the job manager.
The job manager starts the actor system using the internal IP of the nodemanager. In my understanding, this is where the problem lies.

The local client tries to connect to the job manager actor system, but the actor system drops the messages because the address it started with (the EC2 internal IP) does not match the address the Flink client used to send the message (the public IP). Please see my first mail below for detailed logs.
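
The mismatch can be read straight off the JobManager log. The following is only an illustrative sketch, not part of Flink or Akka: it parses one of the "dropping message" lines from the first mail and compares the recipient host with the inbound addresses the actor system reports. The function name and the regular expression are made up for this example.

```python
import re

# Log line copied from the JobManager log in the first mail of this thread.
LOG_LINE = (
    "dropping message [class akka.actor.ActorSelectionMessage] for non-local "
    "recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at "
    "[akka.tcp://flink@54.35.41.12:41292] inbound addresses are "
    "[akka.tcp://flink@172.31.23.18:41292]"
)

# Matches akka.tcp://<system>@<host>:<port>
AKKA_ADDRESS = re.compile(
    r"akka\.tcp://(?P<system>[^@]+)@(?P<host>[^:/\]]+):(?P<port>\d+)"
)


def diagnose(line):
    """Return (recipient_host, inbound_hosts, mismatch) for a 'dropping message' line."""
    head, _, tail = line.partition("inbound addresses are")
    recipient = AKKA_ADDRESS.search(head)
    inbound = [m.group("host") for m in AKKA_ADDRESS.finditer(tail)]
    if recipient is None or not inbound:
        raise ValueError("not a recognisable 'dropping message' log line")
    host = recipient.group("host")
    return host, inbound, host not in inbound


recipient_host, inbound_hosts, mismatch = diagnose(LOG_LINE)
print(recipient_host, inbound_hosts, mismatch)
```

For the line above this reports recipient 54.35.41.12, inbound address 172.31.23.18, and a mismatch, which is the situation described here.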

Please keep me posted with your progress.

I plan to move the cluster to a VPC for other reasons.
I have limited knowledge of VPC, but I guess that will not resolve the difference between internal and external IP addresses.
Please correct me if your view is different.

It will be great if you are able to reproduce the issue.

Thanks again.
Abhi




From: Robert Metzger <rm...@apache.org>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Tuesday, April 26, 2016 at 3:16 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Submit Flink Jobs to YARN running on AWS

I've started my own EMR cluster and tried to launch a Flink job from my local machine on it.
I have to admit that configuring the EMR-launched Hadoop for external access is quite a hassle.

I'm not even able to submit Flink to the YARN cluster because the client cannot connect to the ResourceManager. I've changed the ResourceManager hostname to the public one in the yarn-site.xml on the cluster and restarted it, but the client still cannot connect.
It seems that the RM address is being overwritten by the Hadoop code?
[Inline image 1]

How did you manage to get this working?

In the VM settings, I disabled the "Source/Dest checks", but I don't think this is related.

Have you considered using Amazon's VPN service? I guess you would then have "local" access to the cluster.

On YARN, Flink is not using the flink-conf.yaml setting for the jobmanager's hostname. It's using YARN's "yarn.nodemanager.hostname" from the yarn-site.xml.
I haven't tried it, but it could work if you set the public hostname of each NodeManager in the yarn-site.xml.

Also, maybe the product forum / customer support of Amazon can help you here. Other systems like Spark or Storm have very similar architectures and will face the same issues. I guess they have some recipes for such situations.

Regards,
Robert




On Tue, Apr 26, 2016 at 10:47 AM, Robert Metzger <rm...@apache.org> wrote:
Hi Abhi,

I'll try to reproduce the issue and come up with a solution.

On Tue, Apr 26, 2016 at 1:13 AM, Bajaj, Abhinav <ab...@here.com> wrote:
Hi Fabian,

Thanks for your reply and the pointers to documentation.

In these steps, I think the Flink client is installed on the master node, referring to steps mentioned in Flink docs here<https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html>.
However, the scenario I have is to run the client on my local machine and submit jobs remotely to the YARN Cluster (running on EMR or independently).

Let me describe in more detail here.
I am trying to submit a single Flink Job to YARN using the client, running on my dev machine -

./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar

In my understanding, YARN (running in AWS) allocates a container for the Jobmanager.
The Jobmanager discovers its IP and starts the actor system. At this step, the IP it uses is the internal IP address of the EC2 instance.

The client, running on my dev machine, is not able to connect to the Jobmanager for reasons explained in my mail below.

Is there a way to set the Jobmanager to use the hostname and not the IP address?

Or any other suggestions?

Thanks,
Abhi


From: Fabian Hueske <fh...@gmail.com>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Wednesday, March 9, 2016 at 12:51 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Submit Flink Jobs to YARN running on AWS

Hi Abhi,

I have used Flink on EMR via YARN a couple of times without problems.
I started a Flink YARN session like this:

./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096

This will start five YARN containers (1 JobManager with 1024MB, 4 Taskmanagers with 4096MB). See more config options in the documentation [1].
In one of the last lines of the std-out output you should find a line that tells you the IP and port of the JobManager.

With the IP and port, you can submit a job as follows:

./bin/flink run -m jmIP:jmPort -p 4 jobJarFile.jar <arguments>

This will send the job to the JobManager specified by IP and port and execute the program with a parallelism of 4. See more config options in the documentation [2].

If this does not help, could you share the exact command that you use to start the YARN session and submit the job?

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html

2016-03-08 0:25 GMT+01:00 Bajaj, Abhinav <ab...@here.com>:
Hi,

I am a newbie to Flink and trying to use it in AWS.
I have created a YARN cluster on AWS EC2 machines.
I am trying to submit a Flink job to the remote YARN cluster using the Flink Client running on my local machine.

The Jobmanager starts successfully on the YARN container but the client is not able to connect to the Jobmanager.

Flink Client Logs -

13:57:34,877 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:57:35,951 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:57:37,027 INFO  org.apache.flink.yarn.FlinkYarnClient                         - YARN application has been deployed successfully.
13:57:37,100 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start actor system.
13:57:37,532 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start application client.
YARN cluster started
JobManager web interface address http://ec2-XX-XX-XX-XX.compute-1.amazonaws.com:8088/proxy/application_1456184947990_0003/
Waiting until all TaskManagers have connected
13:57:37,540 INFO  org.apache.flink.yarn.ApplicationClient                       - Notification about new leader address akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
No status updates from the YARN cluster received so far. Waiting ...
13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Received address of new leader akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager null.
13:57:37,545 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@54.35.41.12:41292/user/jobmanager.
No status updates from the YARN cluster received so far. Waiting ...

The logs of the Jobmanager contain the following -

21:57:39,142 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]
21:57:40,782 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at ec2-54-35-41-12 (akka.tcp://flink@172.31.23.18:60565/user/taskmanager) as 72101dd2ee94caa7a5ec5a75488359aa. Current number of registered hosts is 1. Current number of alive task slots is 1.
21:57:41,162 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]

It seems the problem is a mismatch between the address the Jobmanager's Akka actor system is running on and the one used by the client.
172.31.23.18 is the internal private IP of the EC2 machine where the Jobmanager container is running.
54.35.41.12 is the external IP of the EC2 machine, used by the Flink client to submit the job.
Because of this mismatch, the messages are ignored by the Akka actor system.

Can someone please help me with this issue?
I can share the detailed logs, if required.

Thanks,
Abhi





Re: Submit Flink Jobs to YARN running on AWS

Posted by Robert Metzger <rm...@apache.org>.
I've started my own EMR cluster and tried to launch a Flink job from my
local machine on it.
I have to admit that configuring the EMR-launched Hadoop for external
access is quite a hassle.

I'm not even able to submit Flink to the YARN cluster because the client
cannot connect to the ResourceManager. I've changed the ResourceManager
hostname to the public one in the yarn-site.xml on the cluster and
restarted it, but the client still cannot connect.
It seems that the RM address is being overwritten by the Hadoop code?
[image: Inline image 1]

How did you manage to get this working?

In the VM settings, I disabled the "Source/Dest checks", but I don't think
this is related.

Have you considered using Amazon's VPN service? I guess you would then have
"local" access to the cluster.

On YARN, Flink is not using the flink-conf.yaml setting for the
jobmanager's hostname. It's using YARN's "yarn.nodemanager.hostname" from
the yarn-site.xml.
I haven't tried it, but it could work if you set the public hostname of
each NodeManager in the yarn-site.xml.
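
A minimal sketch of what such a yarn-site.xml entry could look like,
generated with Python's standard library. Like the suggestion itself, this
is untested against a real cluster; the hostname is a placeholder in the
style of an EC2 Public DNS name, the helper name is made up, and each
NodeManager would need its own value.

```python
import xml.etree.ElementTree as ET


def nodemanager_hostname_property(public_hostname):
    """Build the <property> element that sets yarn.nodemanager.hostname."""
    prop = ET.Element("property")
    ET.SubElement(prop, "name").text = "yarn.nodemanager.hostname"
    ET.SubElement(prop, "value").text = public_hostname
    return prop


# Placeholder public DNS name; substitute the real one for each NodeManager.
prop = nodemanager_hostname_property("ec2-203-0-113-25.compute-1.amazonaws.com")
fragment = ET.tostring(prop, encoding="unicode")
print(fragment)
```

The printed element would go inside the <configuration> block of the
yarn-site.xml on the corresponding node.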

Also, maybe the product forum / customer support of Amazon can help you
here. Other systems like Spark or Storm have very similar architectures and
will face the same issues. I guess they have some recipes for such
situations.

Regards,
Robert




On Tue, Apr 26, 2016 at 10:47 AM, Robert Metzger <rm...@apache.org>
wrote:

> Hi Abhi,
>
> I'll try to reproduce the issue and come up with a solution.
>
> On Tue, Apr 26, 2016 at 1:13 AM, Bajaj, Abhinav <ab...@here.com>
> wrote:
>
>> Hi Fabian,
>>
>> Thanks for your reply and the pointers to documentation.
>>
>> In these steps, I think the Flink client is installed on the master node,
>> referring to steps mentioned in Flink docs here
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html>
>> .
>> However, the scenario I have is to run the client on my local machine and
>> submit jobs remotely to the YARN Cluster (running on EMR or independently).
>>
>> Let me describe in more detail here.
>> I am trying to submit a single Flink Job to YARN using the client,
>> running on my dev machine -
>>
>> ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096
>>  ./examples/batch/WordCount.jar
>>
>> In my understanding, YARN (running in AWS) allocates a container for the
>> Jobmanager.
>> The Jobmanager discovers its IP and starts the actor system. At this step,
>> the IP it uses is the internal IP address of the EC2 instance.
>>
>> The client, running on my dev machine, is not able to connect to the
>> Jobmanager for reasons explained in my mail below.
>>
>> Is there a way to set the Jobmanager to use the hostname and not
>> the IP address?
>>
>> Or any other suggestions?
>>
>> Thanks,
>> Abhi
>>
>>
>> From: Fabian Hueske <fh...@gmail.com>
>> Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
>> Date: Wednesday, March 9, 2016 at 12:51 AM
>> To: "user@flink.apache.org" <us...@flink.apache.org>
>> Subject: Re: Submit Flink Jobs to YARN running on AWS
>>
>> Hi Abhi,
>>
>> I have used Flink on EMR via YARN a couple of times without problems.
>> I started a Flink YARN session like this:
>>
>> ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
>>
>> This will start five YARN containers (1 JobManager with 1024MB, 4
>> Taskmanagers with 4096MB). See more config options in the documentation [1].
>> In one of the last lines of the std-out output you should find a line
>> that tells you the IP and port of the JobManager.
>>
>> With the IP and port, you can submit a job as follows:
>>
>> ./bin/flink run -m jmIP:jmPort -p 4 jobJarFile.jar <arguments>
>>
>> This will send the job to the JobManager specified by IP and port and
>> execute the program with a parallelism of 4. See more config options in the
>> documentation [2].
>>
>> If this does not help, could you share the exact command that you use to
>> start the YARN session and submit the job?
>>
>> Best, Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html
>>
>> 2016-03-08 0:25 GMT+01:00 Bajaj, Abhinav <ab...@here.com>:
>>
>>> Hi,
>>>
>>> I am a newbie to Flink and trying to use it in AWS.
>>> I have created a YARN cluster on AWS EC2 machines.
>>> I am trying to submit a Flink job to the remote YARN cluster using the Flink
>>> Client running on my local machine.
>>>
>>> The Jobmanager starts successfully on the YARN container but the client
>>> is not able to connect to the Jobmanager.
>>>
>>> Flink Client Logs -
>>>
>>> 13:57:34,877 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
>>> 13:57:35,951 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
>>> 13:57:37,027 INFO  org.apache.flink.yarn.FlinkYarnClient                         - YARN application has been deployed successfully.
>>> 13:57:37,100 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start actor system.
>>> 13:57:37,532 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start application client.
>>> YARN cluster started
>>> JobManager web interface address http://ec2-XX-XX-XX-XX.compute-1.amazonaws.com:8088/proxy/application_1456184947990_0003/
>>> Waiting until all TaskManagers have connected
>>> 13:57:37,540 INFO  org.apache.flink.yarn.ApplicationClient                       - Notification about new leader address akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
>>> No status updates from the YARN cluster received so far. Waiting ...
>>> 13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Received address of new leader akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
>>> 13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager null.
>>> 13:57:37,545 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@54.35.41.12:41292/user/jobmanager.
>>> No status updates from the YARN cluster received so far. Waiting ...
>>>
>>> The logs of the Jobmanager contain the following -
>>>
>>> 21:57:39,142 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]
>>> 21:57:40,782 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at ec2-54-35-41-12 (akka.tcp://flink@172.31.23.18:60565/user/taskmanager) as 72101dd2ee94caa7a5ec5a75488359aa. Current number of registered hosts is 1. Current number of alive task slots is 1.
>>> 21:57:41,162 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]
>>>
>>> It seems the problem is a mismatch between the address the Jobmanager's Akka
>>> actor system is running on and the one used by the client.
>>> 172.31.23.18 is the internal private IP of the EC2 machine where the
>>> Jobmanager container is running.
>>> 54.35.41.12 is the external IP of the EC2 machine, used by the Flink
>>> client to submit the job.
>>> Because of this mismatch, the messages are ignored by the Akka actor
>>> system.
>>>
>>> Can someone please help me with this issue?
>>> I can share the detailed logs, if required.
>>>
>>> Thanks,
>>> Abhi
>>>
>>>
>>
>

Re: Submit Flink Jobs to YARN running on AWS

Posted by Robert Metzger <rm...@apache.org>.
Hi Abhi,

I'll try to reproduce the issue and come up with a solution.

On Tue, Apr 26, 2016 at 1:13 AM, Bajaj, Abhinav <ab...@here.com>
wrote:

> Hi Fabian,
>
> Thanks for your reply and the pointers to documentation.
>
> In these steps, I think the Flink client is installed on the master node,
> referring to steps mentioned in Flink docs here
> <https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html>
> .
> However, the scenario I have is to run the client on my local machine and
> submit jobs remotely to the YARN Cluster (running on EMR or independently).
>
> Let me describe in more detail here.
> I am trying to submit a single Flink Job to YARN using the client, running
> on my dev machine -
>
> ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096
>  ./examples/batch/WordCount.jar
>
> In my understanding, YARN (running in AWS) allocates a container for the
> Jobmanager.
> The Jobmanager discovers its IP and starts the actor system. At this step,
> the IP it uses is the internal IP address of the EC2 instance.
>
> The client, running on my dev machine, is not able to connect to the
> Jobmanager for reasons explained in my mail below.
>
> Is there a way to set the Jobmanager to use the hostname and not the
> IP address?
>
> Or any other suggestions?
>
> Thanks,
> Abhi
>
>
> From: Fabian Hueske <fh...@gmail.com>
> Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
> Date: Wednesday, March 9, 2016 at 12:51 AM
> To: "user@flink.apache.org" <us...@flink.apache.org>
> Subject: Re: Submit Flink Jobs to YARN running on AWS
>
> Hi Abhi,
>
> I have used Flink on EMR via YARN a couple of times without problems.
> I started a Flink YARN session like this:
>
> ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
>
> This will start five YARN containers (1 JobManager with 1024MB, 4
> Taskmanagers with 4096MB). See more config options in the documentation [1].
> In one of the last lines of the std-out output you should find a line that
> tells you the IP and port of the JobManager.
>
> With the IP and port, you can submit a job as follows:
>
> ./bin/flink run -m jmIP:jmPort -p 4 jobJarFile.jar <arguments>
>
> This will send the job to the JobManager specified by IP and port and
> execute the program with a parallelism of 4. See more config options in the
> documentation [2].
>
> If this does not help, could you share the exact command that you use to
> start the YARN session and submit the job?
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html
>
> 2016-03-08 0:25 GMT+01:00 Bajaj, Abhinav <ab...@here.com>:
>
>> Hi,
>>
>> I am a newbie to Flink and trying to use it in AWS.
>> I have created a YARN cluster on AWS EC2 machines.
>> I am trying to submit a Flink job to the remote YARN cluster using the Flink
>> Client running on my local machine.
>>
>> The Jobmanager starts successfully on the YARN container but the client is
>> not able to connect to the Jobmanager.
>>
>> Flink Client Logs -
>>
>> 13:57:34,877 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
>> 13:57:35,951 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
>> 13:57:37,027 INFO  org.apache.flink.yarn.FlinkYarnClient                         - YARN application has been deployed successfully.
>> 13:57:37,100 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start actor system.
>> 13:57:37,532 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start application client.
>> YARN cluster started
>> JobManager web interface address http://ec2-XX-XX-XX-XX.compute-1.amazonaws.com:8088/proxy/application_1456184947990_0003/
>> Waiting until all TaskManagers have connected
>> 13:57:37,540 INFO  org.apache.flink.yarn.ApplicationClient                       - Notification about new leader address akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
>> No status updates from the YARN cluster received so far. Waiting ...
>> 13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Received address of new leader akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
>> 13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager null.
>> 13:57:37,545 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@54.35.41.12:41292/user/jobmanager.
>> No status updates from the YARN cluster received so far. Waiting ...
>>
>> The logs of the Jobmanager contain the following -
>>
>> 21:57:39,142 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]
>> 21:57:40,782 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at ec2-54-35-41-12 (akka.tcp://flink@172.31.23.18:60565/user/taskmanager) as 72101dd2ee94caa7a5ec5a75488359aa. Current number of registered hosts is 1. Current number of alive task slots is 1.
>> 21:57:41,162 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]
>>
>> It seems the problem is a mismatch between the address the Jobmanager's Akka
>> actor system is running on and the one used by the client.
>> 172.31.23.18 is the internal private IP of the EC2 machine where the
>> Jobmanager container is running.
>> 54.35.41.12 is the external IP of the EC2 machine, used by the Flink client
>> to submit the job.
>> Because of this mismatch, the messages are ignored by the Akka actor
>> system.
>>
>> Can someone please help me with this issue?
>> I can share the detailed logs, if required.
>>
>> Thanks,
>> Abhi
>>
>>
>

Re: Submit Flink Jobs to YARN running on AWS

Posted by "Bajaj, Abhinav" <ab...@here.com>.
Hi Fabian,

Thanks for your reply and the pointers to documentation.

In these steps, I think the Flink client is installed on the master node, referring to steps mentioned in Flink docs here<https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html>.
However, the scenario I have is to run the client on my local machine and submit jobs remotely to the YARN Cluster (running on EMR or independently).

Let me describe in more detail here.
I am trying to submit a single Flink Job to YARN using the client, running on my dev machine -

./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar
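
For reference, the resources this command asks YARN for can be tallied from its flags. The sketch below assumes -yn is the number of TaskManager containers, -yjm the JobManager memory in MB, and -ytm the memory per TaskManager in MB, which matches the description of the equivalent yarn-session.sh options quoted below. The helper name is made up for illustration.

```python
import shlex

# The submission command from this mail.
COMMAND = (
    "./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 "
    "./examples/batch/WordCount.jar"
)


def requested_resources(command):
    """Tally the containers and memory (MB) requested by the -y* flags."""
    args = shlex.split(command)

    def value_of(flag):
        # Each flag is followed by its integer value.
        return int(args[args.index(flag) + 1])

    task_managers = value_of("-yn")
    jm_mb = value_of("-yjm")
    tm_mb = value_of("-ytm")
    return {
        "containers": task_managers + 1,  # TaskManagers plus one JobManager
        "total_mb": jm_mb + task_managers * tm_mb,
    }


resources = requested_resources(COMMAND)
print(resources)
```

This gives 5 containers and 17408 MB in total. YARN may round container sizes up to its configured minimum allocation, so the actual reservation can be larger.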

In my understanding, YARN (running in AWS) allocates a container for the Jobmanager.
Jobmanager discovers the IP and started the Actor system. At this step the IP it uses is the internal IP address, of the EC2 instance.

The client, running on my dev machine, is not able to connect to the Jobmanager for reasons explained in my mail below.

Is there a way to make the JobManager use the hostname instead of the IP address?

Or any other suggestions?

Thanks,
Abhi

Abhinav Bajaj
Senior Engineer
HERE Predictive Analytics
Office:  +12062092767
Mobile: +17083299516

HERE Seattle
701 Pike Street, #2000, Seattle, WA 98101, USA
47° 36' 41" N. 122° 19' 57" W
HERE Maps




From: Fabian Hueske <fh...@gmail.com>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Wednesday, March 9, 2016 at 12:51 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Submit Flink Jobs to YARN running on AWS

Hi Abhi,

I have used Flink on EMR via YARN a couple of times without problems.
I started a Flink YARN session like this:

./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096

This will start five YARN containers (1 JobManager with 1024 MB and 4 TaskManagers with 4096 MB each). See more config options in the documentation [1].
One of the last lines of the stdout output should tell you the IP and port of the JobManager.

With the IP and port, you can submit a job as follows:

./bin/flink run -m jmIP:jmPort -p 4 jobJarFile.jar <arguments>

This will send the job to the JobManager specified by IP and port and execute the program with a parallelism of 4. See more config options in the documentation [2].

If this does not help, could you share the exact command that you use to start the YARN session and submit the job?

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html

2016-03-08 0:25 GMT+01:00 Bajaj, Abhinav <ab...@here.com>:
Hi,

I am a newbie to Flink and trying to use it in AWS.
I have created a YARN cluster on AWS EC2 machines.
I am trying to submit a Flink job to the remote YARN cluster using the Flink client running on my local machine.

The JobManager starts successfully in the YARN container, but the client is not able to connect to it.

Flink Client Logs -

13:57:34,877 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:57:35,951 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
13:57:37,027 INFO  org.apache.flink.yarn.FlinkYarnClient                         - YARN application has been deployed successfully.
13:57:37,100 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start actor system.
13:57:37,532 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start application client.
YARN cluster started
JobManager web interface address http://ec2-XX-XX-XX-XX.compute-1.amazonaws.com:8088/proxy/application_1456184947990_0003/
Waiting until all TaskManagers have connected
13:57:37,540 INFO  org.apache.flink.yarn.ApplicationClient                       - Notification about new leader address akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
No status updates from the YARN cluster received so far. Waiting ...
13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Received address of new leader akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager null.
13:57:37,545 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@54.35.41.12:41292/user/jobmanager.
No status updates from the YARN cluster received so far. Waiting ...

The logs of the JobManager contain the following -

21:57:39,142 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]
21:57:40,782 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at ec2-54-35-41-12 (akka.tcp://flink@172.31.23.18:60565/user/taskmanager) as 72101dd2ee94caa7a5ec5a75488359aa. Current number of registered hosts is 1. Current number of alive task slots is 1.
21:57:41,162 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]

It seems the problem is a mismatch between the address the JobManager's Akka actor system is bound to and the one used by the client.
172.31.23.18 is the internal private IP of the EC2 machine where the JobManager container is running.
54.35.41.12 is the external IP of the EC2 machine, used by the Flink client to submit the job.
Because of this mismatch, the messages are dropped by the Akka actor system.

Can someone please help me with this issue?
I can share the detailed logs, if required.

Thanks,
Abhi



Re: Submit Flink Jobs to YARN running on AWS

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Abhi,

I have used Flink on EMR via YARN a couple of times without problems.
I started a Flink YARN session like this:

./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096

This will start five YARN containers (1 JobManager with 1024 MB and 4
TaskManagers with 4096 MB each). See more config options in the documentation [1].
One of the last lines of the stdout output should tell you the IP and port
of the JobManager.
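
As a quick sanity check on what that session asks YARN for, the arithmetic can be sketched in shell. The variable names are illustrative only, and the total is just the sum of the requested sizes; YARN may round each container request up to its own allocation increments, so the real footprint can be larger.

```shell
#!/bin/sh
# Values taken from the yarn-session.sh command above.
task_managers=4   # -n  : number of TaskManager containers
jm_mb=1024        # -jm : JobManager memory in MB
tm_mb=4096        # -tm : memory per TaskManager in MB

# One extra container hosts the JobManager.
containers=$((task_managers + 1))

# Sum of the requested memory across all containers.
total_mb=$((jm_mb + task_managers * tm_mb))

echo "$containers containers, $total_mb MB requested in total"
```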

With the IP and port, you can submit a job as follows:

./bin/flink run -m jmIP:jmPort -p 4 jobJarFile.jar <arguments>

This will send the job to the JobManager specified by IP and port and
execute the program with a parallelism of 4. See more config options in the
documentation [2].
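
If you submit jobs repeatedly, a small helper that assembles the command line can save some typing. This is only a sketch: `flink_submit_cmd` is a hypothetical function name, and the host and port in the usage line are placeholders, to be replaced with the values printed by your own YARN session.

```shell
#!/bin/sh
# Hypothetical helper: prints the "flink run" command for a given JobManager.
# Arguments: host, port, parallelism, path to the job jar.
flink_submit_cmd() {
    host=$1
    port=$2
    parallelism=$3
    jar=$4

    # Reject an empty or non-numeric port before building the command.
    case $port in
        ''|*[!0-9]*)
            echo "port must be numeric: $port" >&2
            return 1
            ;;
    esac

    printf './bin/flink run -m %s:%s -p %s %s\n' "$host" "$port" "$parallelism" "$jar"
}

# Placeholder values; use the IP and port reported by yarn-session.sh.
flink_submit_cmd 10.0.0.5 41292 4 jobJarFile.jar
```

The function only prints the command, so you can inspect it before running it yourself.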

If this does not help, could you share the exact command that you use to
start the YARN session and submit the job?

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html

2016-03-08 0:25 GMT+01:00 Bajaj, Abhinav <ab...@here.com>:

> Hi,
>
> I am a newbie to Flink and trying to use it in AWS.
> I have created a YARN cluster on AWS EC2 machines.
> I am trying to submit a Flink job to the remote YARN cluster using the Flink
> client running on my local machine.
>
> The JobManager starts successfully in the YARN container, but the client is
> not able to connect to it.
>
> Flink Client Logs -
>
> 13:57:34,877 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
> 13:57:35,951 INFO  org.apache.flink.yarn.FlinkYarnClient                         - Deploying cluster, current state ACCEPTED
> 13:57:37,027 INFO  org.apache.flink.yarn.FlinkYarnClient                         - YARN application has been deployed successfully.
> 13:57:37,100 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start actor system.
> 13:57:37,532 INFO  org.apache.flink.yarn.FlinkYarnCluster                        - Start application client.
> YARN cluster started
> JobManager web interface address http://ec2-XX-XX-XX-XX.compute-1.amazonaws.com:8088/proxy/application_1456184947990_0003/
> Waiting until all TaskManagers have connected
> 13:57:37,540 INFO  org.apache.flink.yarn.ApplicationClient                       - Notification about new leader address akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
> No status updates from the YARN cluster received so far. Waiting ...
> 13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Received address of new leader akka.tcp://flink@54.35.41.12:41292/user/jobmanager with session ID null.
> 13:57:37,543 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager null.
> 13:57:37,545 INFO  org.apache.flink.yarn.ApplicationClient                       - Trying to register at JobManager akka.tcp://flink@54.35.41.12:41292/user/jobmanager.
> No status updates from the YARN cluster received so far. Waiting ...
>
> The logs of the JobManager contain the following -
>
> 21:57:39,142 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]
> 21:57:40,782 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at ec2-54-35-41-12 (akka.tcp://flink@172.31.23.18:60565/user/taskmanager) as 72101dd2ee94caa7a5ec5a75488359aa. Current number of registered hosts is 1. Current number of alive task slots is 1.
> 21:57:41,162 ERROR akka.remote.EndpointWriter                                    - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@54.35.41.12:41292/]] arriving at [akka.tcp://flink@54.35.41.12:41292] inbound addresses are [akka.tcp://flink@172.31.23.18:41292]
>
> It seems the problem is a mismatch between the address the JobManager's
> Akka actor system is bound to and the one used by the client.
> 172.31.23.18 is the internal private IP of the EC2 machine where the
> JobManager container is running.
> 54.35.41.12 is the external IP of the EC2 machine, used by the Flink client
> to submit the job.
> Because of this mismatch, the messages are dropped by the Akka actor system.
>
> Can someone please help me with this issue?
> I can share the detailed logs, if required.
>
> Thanks,
> Abhi
>
>