Posted to user@flink.apache.org by "Bajaj, Abhinav" <ab...@here.com> on 2019/09/13 02:29:48 UTC

Job submission fails in Flink 1.7.1 High Availability mode

Hi,

I came across an issue during job submission via the Flink CLI client with Flink 1.7.1 in high availability mode.

Setup:
Flink version: 1.7.1
Cluster: K8s
Mode: High availability with 2 jobmanagers

CLI command:
./bin/flink run -d -c MyExample /myexample.jar
The CLI runs inside a K8s job and submits the Flink job to the Flink cluster. The K8s job spec allows up to 3 attempts to submit the job.

Result:
2019-09-11 22:32:12.908 [Flink-RestClusterClient-IO-thread-4] level=DEBUG org.apache.flink.runtime.rest.RestClient  - Sending request of class class org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody to job-jm-1.job-jm-svc.job-namespace.svc.cluster.local:8081/v1/jobs
2019-09-11 22:32:14.186 [flink-rest-client-netty-thread-1] level=ERROR org.apache.flink.runtime.rest.RestClient  - Response was not valid JSON.
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
at [Source: org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream@2b88f8bb; line: 1, column: 0]
      at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:256)
      at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3851)
      at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
      at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2272)
      at org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:504)
      at org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:452)
             ………
2019-09-11 22:32:14.186 [flink-rest-client-netty-thread-1] level=ERROR org.apache.flink.runtime.rest.RestClient  - Unexpected plain-text response:
……..

The job submission fails once all retries are exhausted.

Observations:
I looked into the debug logs and the Flink code and came to the following conclusions:

  *   The CLI's REST client received an empty response body from the jobmanager (job-jm-1). I think the response was a redirect, which the RestClient class does not handle. This would explain both the Jackson exception above and the missing response body in the “org.apache.flink.runtime.rest.RestClient  - Unexpected plain-text response:” log line above.
  *   The ZooKeeperLeaderRetrievalService in the REST client logs that job-jm-1 became the leader, followed by a log that job-jm-0 became the leader. The address of job-jm-1 is an HTTP URL, whereas the address of job-jm-0 is an Akka URL. The CLI logs are at the end of this email.
  *   The RestClusterClient class does not update the leader during job submission if the leader changes.
  *   In all 3 attempts by the CLI K8s job to submit the Flink job, the ZooKeeperLeaderRetrievalService sees both events: job-jm-1 becoming the leader, followed by job-jm-0. So all 3 attempts fail to submit the job for the same reason (an empty response).
  *   The jobmanager logs from both job-jm-0 and job-jm-1 show that job-jm-0 is the leader and job-jm-1 was never the leader. This contradicts the CLI logs.

Open questions:

  *   I am not sure why the CLI’s ZooKeeperLeaderRetrievalService thinks job-jm-1 was the leader, whereas the ZooKeeperLeaderRetrievalService in both jobmanagers considers job-jm-0 the leader throughout the cluster lifetime.
  *   Even though the CLI’s ZooKeeperLeaderRetrievalService sees the leader change from job-jm-1 to job-jm-0, the client still uses job-jm-1. Is it a known issue that the Flink 1.7.1 REST client doesn’t update the leader when it changes?
  *   Why is one leader address an HTTP URL while the other is an Akka URL?

Can someone check and confirm my observations above and help answer these questions?

I highly appreciate your time and help.

~ Abhinav Bajaj


CLI Logs -

2019-09-11 22:30:31.077 [main-EventThread] level=DEBUG o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Leader node has changed.
2019-09-11 22:30:31.171 [main-EventThread] level=DEBUG o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - New leader information: Leader=http://job-jm-1.job-jm-svc.job-namespace.svc.cluster.local:8081, session ID=c1422a1b-a6b8-43b0-85d7-87b95af16932.

……
2019-09-11 22:30:31.270 [main-EventThread] level=DEBUG o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Leader node has changed
2019-09-11 22:30:31.270 [main-EventThread] level=DEBUG o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - New leader information: Leader=akka.tcp://flink@job-jm-0.job-jm-svc.job-namespace.svc.cluster.local:6126/user/dispatcher, session ID=4e4d03d5-2abe-449c-af2e-df2e0cd80e26



job-jm-0 Logs -
2019-09-11 22:29:59.781 [main-EventThread] level=DEBUG o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - New leader information: Leader=akka.tcp://flink@job-jm-0.job-jm-svc.job-namespace.svc.cluster.local:6126/user/resourcemanager, session ID=e1f026b1-e368-4524-9fab-2e031423f74f.
2019-09-11 22:29:59.876 [main-EventThread] level=DEBUG o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - New leader information: Leader=akka.tcp://flink@job-jm-0.job-jm-svc.job-namespace.svc.cluster.local:6126/user/dispatcher, session ID=4e4d03d5-2abe-449c-af2e-df2e0cd80e26.


job-jm-1 Logs -
2019-09-11 22:29:59.889 [main-EventThread] level=DEBUG o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - New leader information: Leader=akka.tcp://flink@job-jm-0.job-jm-svc.job-namespace.svc.cluster.local:6126/user/resourcemanager, session ID=e1f026b1-e368-4524-9fab-2e031423f74f.
2019-09-11 22:29:59.976 [main-EventThread] level=DEBUG o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - New leader information: Leader=akka.tcp://flink@job-jm-0.job-jm-svc.job-namespace.svc.cluster.local:6126/user/dispatcher, session ID=4e4d03d5-2abe-449c-af2e-df2e0cd80e26.






Re: Job submission fails in Flink 1.7.1 High Availability mode

Posted by Till Rohrmann <tr...@apache.org>.
Hi Abhinav,

I think the problem is the following: Flink has been designed so that the
cluster's REST endpoint does not need to run in the same process as the
JobManager. Currently, however, the REST endpoint is started in the same
process as the JobManager. Because of this design, the address of the REST
endpoint has to be announced, and for that we use leader election (a leading
REST endpoint is not strictly required; we use it mainly for service
discovery). That's why you see one leader with an HTTP address and another
with an Akka address: the former is the leading REST endpoint and the latter
is the leading JobManager. So somehow the REST endpoint of pod-1 (job-jm-1)
and the JobManager of pod-0 (job-jm-0) became leaders.
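To make the two kinds of announcement concrete, here is a minimal Java sketch that takes the two CLI log lines quoted earlier in this thread, extracts the announced leader address, and labels it by scheme. The class LeaderAnnouncements and its methods are hypothetical, not Flink API, and the mapping (http means the leading REST endpoint, akka.tcp means a leading JobManager component) is an assumption taken from the explanation above. Run on those two lines, it prints "REST_ENDPOINT leader on job-jm-1" and "JOB_MANAGER leader on job-jm-0", i.e. two separate leader entries rather than one leader that changed.

```java
import java.net.URI;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: reads "New leader information" DEBUG lines
// from ZooKeeperLeaderRetrievalService and labels each announced leader address.
final class LeaderAnnouncements {

    enum Component { REST_ENDPOINT, JOB_MANAGER, UNKNOWN }

    // Captures the address between "Leader=" and ", session ID=".
    private static final Pattern LEADER =
            Pattern.compile("Leader=(\\S+?),\\s*session ID=");

    static Optional<String> leaderAddress(String logLine) {
        Matcher m = LEADER.matcher(logLine);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // Assumption: an http(s) address is announced by the leading REST endpoint,
    // an akka.tcp address by a leading JobManager component (e.g. the dispatcher).
    static Component classify(String address) {
        if (address.startsWith("http://") || address.startsWith("https://")) {
            return Component.REST_ENDPOINT;
        }
        if (address.startsWith("akka.tcp://")) {
            return Component.JOB_MANAGER;
        }
        return Component.UNKNOWN;
    }

    // Returns the first DNS label of the host, which is the pod name in this setup.
    static String podOf(String address) {
        String host = URI.create(address).getHost();
        if (host == null) {
            return "";
        }
        int dot = host.indexOf('.');
        return dot < 0 ? host : host.substring(0, dot);
    }

    public static void main(String[] args) {
        String[] cliLines = {
            "New leader information: Leader=http://job-jm-1.job-jm-svc.job-namespace.svc.cluster.local:8081, session ID=c1422a1b-a6b8-43b0-85d7-87b95af16932.",
            "New leader information: Leader=akka.tcp://flink@job-jm-0.job-jm-svc.job-namespace.svc.cluster.local:6126/user/dispatcher, session ID=4e4d03d5-2abe-449c-af2e-df2e0cd80e26"
        };
        for (String line : cliLines) {
            leaderAddress(line).ifPresent(addr ->
                    System.out.println(classify(addr) + " leader on " + podOf(addr)));
        }
    }
}
```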

Now, what happens with Flink <= 1.7 is that the REST endpoint sends you a
redirect response if the co-located JobManager (the one in the same process)
is not the leader. The problem is that Flink's RestClusterClient does not
properly handle redirect responses.
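For illustration only, this is a minimal Java sketch of what handling such a redirect on the client side could look like: check for a 3xx status before trying to parse the (empty) body as JSON, then re-send to the resolved Location. It assumes the redirect is a standard HTTP 3xx response with a Location header; the class RedirectResolver and its Transport interface are hypothetical and are not Flink's RestClient.

```java
import java.net.URI;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch, not Flink's RestClient: detect a 3xx reply instead of
// parsing its empty body as JSON, and follow it to the Location it points at.
final class RedirectResolver {

    // Redirect status codes defined by HTTP; only 307 and 308 guarantee that the
    // method and body (e.g. a POST of the job) are preserved on the next hop.
    private static final Set<Integer> REDIRECT_CODES = Set.of(301, 302, 303, 307, 308);

    static final class Response {
        final int status;
        final String location; // value of the Location header, or null
        final String body;

        Response(int status, String location, String body) {
            this.status = status;
            this.location = location;
            this.body = body;
        }
    }

    // Abstracts the actual HTTP call so the redirect logic can be shown on its own.
    interface Transport {
        Response send(URI target);
    }

    static boolean isRedirect(int status) {
        return REDIRECT_CODES.contains(status);
    }

    // Resolves a possibly relative Location header against the requested URI.
    static Optional<URI> nextTarget(URI requested, Response response) {
        if (!isRedirect(response.status) || response.location == null
                || response.location.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(requested.resolve(response.location));
    }

    // Sends the request, following at most maxHops redirects before giving up.
    static Response sendFollowingRedirects(Transport transport, URI target, int maxHops) {
        URI current = target;
        for (int hop = 0; ; hop++) {
            Response response = transport.send(current);
            Optional<URI> next = nextTarget(current, response);
            if (!next.isPresent()) {
                return response; // not a redirect: the caller can parse the body now
            }
            if (hop >= maxHops) {
                throw new IllegalStateException("Too many redirects; last target was " + current);
            }
            current = next.get();
        }
    }
}
```

A real client would also have to replay the request body and headers on each hop, and should only replay a POST such as the job submission for 307 or 308.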

Starting with Flink 1.8, we removed the redirection logic and instead let
Flink handle this internally by proxying all requests to the actual leading
JobManager. Hence, my recommendation would be to upgrade to a newer Flink
version and see whether the problem remains.
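The difference between the two approaches can be sketched with a toy Java model. All names here (LeaderRouting, JobManagerGateway, the 307 status and the 200 reply) are illustrative assumptions, not Flink classes or the exact responses Flink sends. With the redirect strategy, a non-leading endpoint returns an empty-bodied redirect, which a client that does not follow redirects then fails to parse; with the proxy strategy, any endpoint looks up the current leader and returns the leader's answer directly.

```java
import java.util.function.Supplier;

// Toy model with hypothetical names (not Flink classes) contrasting two ways a REST
// endpoint can serve a job submission when its co-located JobManager is not leader.
final class LeaderRouting {

    // Stand-in for whatever handle the REST endpoint uses to reach a JobManager.
    interface JobManagerGateway {
        String submitJob(String jobGraph);
    }

    static final class Reply {
        final int status;
        final String location; // redirect target, or null
        final String body;

        Reply(int status, String location, String body) {
            this.status = status;
            this.location = location;
            this.body = body;
        }
    }

    // Redirect style (Flink <= 1.7 as described): if the local JobManager is not the
    // leader, answer with an empty-bodied redirect and leave the retry to the client.
    static Reply redirectStrategy(boolean localIsLeader, String leaderRestUrl,
                                  JobManagerGateway local, String jobGraph) {
        if (!localIsLeader) {
            return new Reply(307, leaderRestUrl + "/v1/jobs", "");
        }
        return new Reply(200, null, local.submitJob(jobGraph));
    }

    // Proxy style (Flink >= 1.8 as described): look up the current leader and forward
    // the call to it, so whichever endpoint the client reached can answer directly.
    static Reply proxyStrategy(Supplier<JobManagerGateway> currentLeader, String jobGraph) {
        return new Reply(200, null, currentLeader.get().submitJob(jobGraph));
    }
}
```

The design point is that the leader lookup moves from the client into the cluster, so the CLI no longer depends on following redirects.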

Cheers,
Till
