Posted to user@flink.apache.org by Matthias Pohl <ma...@ververica.com> on 2021/09/07 09:05:45 UTC

Re: hdfs lease issues on flink retry

Just for documentation purposes: I created FLINK-24147 [1] to cover this
issue.

[1] https://issues.apache.org/jira/browse/FLINK-24147

On Thu, Aug 26, 2021 at 6:14 PM Matthias Pohl <ma...@ververica.com>
wrote:

> I see - I should have checked my mailbox before answering. I received the
> email and was able to login.
>
> On Thu, Aug 26, 2021 at 6:12 PM Matthias Pohl <ma...@ververica.com>
> wrote:
>
>> The link doesn't work, i.e. I'm redirected to a login page. It would also
>> be good to include the Flink logs and make them accessible to everyone.
>> This way others could share their perspective as well...
>>
>> On Thu, Aug 26, 2021 at 5:40 PM Shah, Siddharth [Engineering] <
>> Siddharth.x.Shah@gs.com> wrote:
>>
>>> Hi Matthias,
>>>
>>>
>>>
>>> Thank you for responding and taking time to look at the issue.
>>>
>>>
>>>
>>> Uploaded the YARN logs here:
>>> https://lockbox.gs.com/lockbox/folders/963b0f29-85ad-4580-b420-8c66d9c07a84/
>>> and have also requested read permissions for you. Please let us know if
>>> you’re not able to see the files.
>>>
>>>
>>>
>>>
>>>
>>> *From:* Matthias Pohl <ma...@ververica.com>
>>> *Sent:* Thursday, August 26, 2021 9:47 AM
>>> *To:* Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
>>> *Cc:* user@flink.apache.org; Hailu, Andreas [Engineering] <
>>> Andreas.Hailu@ny.email.gs.com>
>>> *Subject:* Re: hdfs lease issues on flink retry
>>>
>>>
>>>
>>> Hi Siddharth,
>>>
>>> thanks for reaching out to the community. This might be a bug. Could you
>>> share your Flink and YARN logs? This way we could get a better
>>> understanding of what's going on.
>>>
>>>
>>>
>>> Best,
>>> Matthias
>>>
>>>
>>>
>>> On Tue, Aug 24, 2021 at 10:19 PM Shah, Siddharth [Engineering] <
>>> Siddharth.x.Shah@gs.com> wrote:
>>>
>>> Hi  Team,
>>>
>>>
>>>
>>> We are seeing transient failures, mostly in jobs that require more
>>> resources and use Flink RestartStrategies [1]. Checking the YARN logs, we
>>> observed HDFS lease issues when Flink retries. The job originally fails on
>>> the first try with PartitionNotFoundException or
>>> NoResourceAvailableException, but on retry the YARN logs suggest that the
>>> lease for the temp sink directory has not yet been released by the node
>>> from the previous try.
>>>
>>>
>>>
>>> Initial Failure log message:
>>>
>>>
>>>
>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate enough slots to run the job. Please make sure that the cluster has enough resources.
>>>         at org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:461)
>>>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>         at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>         at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:190)
>>>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>         at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>
>>>
>>>
>>>
>>>
>>> Retry failure log message:
>>>
>>>
>>>
>>> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException): /user/p2epda/lake/delp_prod/PROD/APPROVED/data/TECHRISK_SENTINEL/INFORMATION_REPORT/4377/temp/data/_temporary/0/_temporary/attempt__0000_r_000003_0/partMapper-r-00003.snappy.parquet for client 10.51.63.226 already exists
>>>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2815)
>>>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2702)
>>>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2586)
>>>         at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:736)
>>>         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:409)
>>>         at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>         at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> I could verify that the nodes from the previous try still own the lease,
>>> and confirmed this for multiple jobs by matching IP addresses. Ideally, we
>>> want an internal retry to happen, since thousands of jobs run at a time
>>> and retrying them manually is impractical.
>>>
>>>
>>>
>>> This is our current restart config:
>>>
>>> executionEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
>>> Time.of(10, TimeUnit.SECONDS)));
>>>
>>>
>>>
>>> Is it possible to resolve leases before a retry? Or is it possible to
>>> have different sink directories (increment attempt id somewhere) for every
>>> retry, that way we have no lease issues? Or do you have any other
>>> suggestion on resolving this?
>>>
>>>
>>>
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/#fixed-delay-restart-strategy
>>>
>>>
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Siddharth
>>>
>>>
>>>
>>>
>>> ------------------------------
>>>
>>>
>>> Your Personal Data: We may collect and process information about you
>>> that may be subject to data protection laws. For more information about how
>>> we use and disclose your personal data, how we protect your information,
>>> our legal basis to use your information, your rights and who you can
>>> contact, please refer to: www.gs.com/privacy-notices
>>>
>>>
>>
>>
>
> --
>
> Matthias Pohl | Engineer
>
> Follow us @VervericaData Ververica <https://www.ververica.com/>
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
> Wehner
>

RE: hdfs lease issues on flink retry

Posted by "Shah, Siddharth" <Si...@gs.com>.
Hi David/Matthias,

Thank you for your suggestion, it seems to be working fine. One quick question: do the _temporary directories created by the DataSink task on retry require manual cleanup, or does Flink take care of that internally?

From: David Morávek <dm...@apache.org>
Sent: Monday, September 20, 2021 5:12 PM
To: Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
Cc: Hailu, Andreas [Engineering] <An...@ny.email.gs.com>; Matthias Pohl <ma...@ververica.com>; user@flink.apache.org
Subject: Re: hdfs lease issues on flink retry

Hi, AttemptId needs to be an integer (take a look at TaskAttemptID class for more details). As for your prior question, any random id should solve the issue.

Best,
D.
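
A small sketch of why the millisecond suffix in the error quoted below is rejected, assuming (as David's reply suggests) that TaskAttemptID.forName parses the trailing attempt field as a Java int. The class and method names here (AttemptSuffix, fitsInInt, randomSuffix) are hypothetical and are not part of Hadoop or Flink; the sketch only uses the JDK.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of Hadoop or Flink.
final class AttemptSuffix {

    // Returns true if the text can be parsed as a Java int,
    // which is the assumed requirement for the attempt field.
    static boolean fitsInInt(String text) {
        try {
            Integer.parseInt(text);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // A non-negative random suffix in [0, Integer.MAX_VALUE), so it always fits in an int.
    static int randomSuffix() {
        return ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        // The suffix from the stack trace is a 13-digit epoch-millis value,
        // larger than Integer.MAX_VALUE (2147483647).
        System.out.println(fitsInInt("1632168799349")); // false
        System.out.println(fitsInInt(Integer.toString(randomSuffix()))); // true
    }
}
```

Under that assumption, any suffix kept within the int range (a bounded random number, or the timestamp reduced modulo some bound) should be accepted, while a raw System.currentTimeMillis() value will not.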

On Mon 20. 9. 2021 at 22:32, Shah, Siddharth <Si...@gs.com> wrote:
Hi Matthias,

Thanks for suggesting a workaround, but our jobs fail with the error below when I apply that change:

Caused by: java.lang.IllegalArgumentException: TaskAttemptId string : attempt__0000_r_000001_1632168799349 is not properly formed
        at org.apache.hadoop.mapreduce.TaskAttemptID.forName(TaskAttemptID.java:201)
        at com.gs.ep.lake.flinkbasics.GRHadoopOutputFormat.open(GRHadoopOutputFormat.java:59)
        at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:745)


Am I missing something here?

Thanks,
Siddharth

From: Matthias Pohl <ma...@ververica.com>
Sent: Monday, September 20, 2021 4:54 AM
To: Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
Cc: user@flink.apache.org; Hailu, Andreas [Engineering] <An...@ny.email.gs.com>
Subject: Re: hdfs lease issues on flink retry

I don't know of any side effects of your approach. But another workaround I saw was replacing the _0 suffix with something like "_" + System.currentTimeMillis()

On Fri, Sep 17, 2021 at 8:38 PM Shah, Siddharth <Si...@gs.com> wrote:
Hi Matthias,

Thanks for looking into the issue and creating a ticket. I am thinking of having a workaround until the issue is fixed.

What if I create the attempt directories with a random int by patching HadoopOutputFormatBase’s open() method?

Original:

TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
      + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
      + Integer.toString(taskNumber + 1)
      + "_0");


Patched:

int attemptRandomPrefix = new Random().nextInt(999);

TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__"
      + String.format("%" + (4 - Integer.toString(attemptRandomPrefix).length()) + "s", " ").replace(" ", "0")
      + Integer.toString(attemptRandomPrefix) + "_r_"
      + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
      + Integer.toString(taskNumber + 1)
      + "_0");


So basically I am creating a directory named attempt__0123_r_000001_0 instead of attempt__0000_r_000001_0. I have tested this on a handful of our jobs and it seems to be working fine. Are there any downsides to this change that I may not be aware of?
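
The same attempt string can be built more compactly with zero-padded format specifiers. This is only a sketch of the string construction: AttemptIds and build are hypothetical names, the bound of 10,000 on the random prefix is an assumption chosen to keep it at four digits (the patch above uses nextInt(999)), and the call to Hadoop's TaskAttemptID.forName is left out so the example runs with the JDK alone.

```java
import java.util.Locale;
import java.util.Random;

// Hypothetical helper, not part of Flink's HadoopOutputFormatBase.
final class AttemptIds {

    // Builds "attempt__<4-digit prefix>_r_<6-digit task number>_0".
    // taskNumber is zero-based, matching the "taskNumber + 1" in the patch above.
    static String build(int randomPrefix, int taskNumber) {
        return String.format(Locale.ROOT, "attempt__%04d_r_%06d_0", randomPrefix, taskNumber + 1);
    }

    public static void main(String[] args) {
        int prefix = new Random().nextInt(10_000); // 0..9999 (assumed bound)
        System.out.println(build(prefix, 0));
    }
}
```

Because the prefix is random rather than incremented, two attempts of the same task can still draw the same value; with only a few retries that is unlikely, but it is not ruled out.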

Thanks,
Siddharth




Re: hdfs lease issues on flink retry

Posted by David Morávek <dm...@apache.org>.
I’m away from my computer so I can’t verify this, but as far as I remember,
HadoopOutputFormat needs to finish the output lifecycle by calling #commitJob /
#abortJob. These methods move committed task outputs to a top-level
directory and clean up all the remaining files in the temp directory. You
should check your sink’s source code to see under which conditions these
methods get called.

Best,
D.
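
To make the lifecycle described above concrete, here is a local-filesystem analogy written against java.nio only. It is not Hadoop's committer and does not call any Hadoop or Flink API; LocalCommitSketch and its methods are hypothetical names, and the directory layout is an assumption modelled on the _temporary paths seen in the logs earlier in this thread.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical local analogy of a commit/abort step, not Hadoop's implementation.
final class LocalCommitSketch {

    // Moves every file in committedDir up into outputDir, then deletes tempRoot
    // together with any leftover attempt directories beneath it.
    static void commitJob(Path outputDir, Path committedDir, Path tempRoot) throws IOException {
        List<Path> files;
        try (Stream<Path> listing = Files.list(committedDir)) {
            files = listing.collect(Collectors.toList());
        }
        for (Path file : files) {
            Files.move(file, outputDir.resolve(file.getFileName()), StandardCopyOption.REPLACE_EXISTING);
        }
        deleteRecursively(tempRoot);
    }

    // Discards all temporary output without promoting anything.
    static void abortJob(Path tempRoot) throws IOException {
        deleteRecursively(tempRoot);
    }

    // Deletes a directory tree, children before parents; does nothing if root is missing.
    static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(root)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : paths) {
            Files.delete(path);
        }
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("sink-output");
        Path tempRoot = out.resolve("_temporary");
        Path committed = Files.createDirectories(tempRoot.resolve("0").resolve("task_000001"));
        Files.write(committed.resolve("part-00001"), new byte[] {1});
        commitJob(out, committed, tempRoot);
        System.out.println(Files.exists(out.resolve("part-00001"))); // true
        System.out.println(Files.exists(tempRoot)); // false
        deleteRecursively(out);
    }
}
```

If a sink never reaches its equivalent of the commit or abort step, for example because the job fails before finalization, the temporary tree stays behind; that is the case worth checking in the sink's source code.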

On Fri 24. 9. 2021 at 16:00, Shah, Siddharth <Si...@gs.com>
wrote:

> Hi David/Matthias,
>
>
>
> Just checking again on my previous question.
>
>
>
> *From:* Shah, Siddharth [Engineering]
> *Sent:* Thursday, September 23, 2021 9:46 AM
> *To:* 'David Morávek' <dm...@apache.org>; Matthias Pohl <
> matthias@ververica.com>
> *Cc:* Hailu, Andreas [Engineering] <An...@ny.email.gs.com>;
> user@flink.apache.org; Erai, Rahul [Engineering] <
> Rahul.Erai@ny.email.gs.com>; Chittajallu, Rajiv [Engineering] <
> Rajiv.Chittajallu@ny.email.gs.com>
> *Subject:* RE: hdfs lease issues on flink retry
>
>
>
> Hi David/Matthias,
>
>
>
> Thank you for your suggestion, it seems to be working fine. One quick
> question: do the *_temporary* directories created by the DataSink task on
> retry require manual cleanup, or does Flink take care of that internally?
>
>
>
> *From:* David Morávek <dm...@apache.org>
>
> *Sent:* Monday, September 20, 2021 5:12 PM
> *To:* Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
> *Cc:* Hailu, Andreas [Engineering] <An...@ny.email.gs.com>;
> Matthias Pohl <ma...@ververica.com>; user@flink.apache.org
> *Subject:* Re: hdfs lease issues on flink retry
>
>
>
> Hi, AttemptId needs to be an integer (take a look at TaskAttemptID class
> for more details). As for your prior question, any random id should solve
> the issue.
>
>
>
> Best,
>
> D.
>
>
>
> On Mon 20. 9. 2021 at 22:32, Shah, Siddharth <Si...@gs.com>
> wrote:
>
> Hi Matthias,
>
>
>
> Thanks for suggesting a workaround, but our jobs fail with below error
> when I apply that change
>
>
>
> *Caused by: java.lang.IllegalArgumentException: TaskAttemptId string :
> attempt__0000_r_000001_1632168799349 is not properly formed*
>
>         at
> org.apache.hadoop.mapreduce.TaskAttemptID.forName(TaskAttemptID.java:201)
>
>         at
> com.gs.ep.lake.flinkbasics.GRHadoopOutputFormat.open(GRHadoopOutputFormat.java:59)
>
>         at
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
>
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
>         at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> Am I missing something here?
>
>
>
> Thanks,
>
> Siddharth
>
>
>
> *From:* Matthias Pohl <ma...@ververica.com>
> *Sent:* Monday, September 20, 2021 4:54 AM
> *To:* Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
> *Cc:* user@flink.apache.org; Hailu, Andreas [Engineering] <
> Andreas.Hailu@ny.email.gs.com>
> *Subject:* Re: hdfs lease issues on flink retry
>
>
>
> I don't know of any side effects of your approach. But another workaround
> I saw was replacing the _0 suffix by something like "_" +
> System.currentMillis()
>
>
>
> On Fri, Sep 17, 2021 at 8:38 PM Shah, Siddharth <Si...@gs.com>
> wrote:
>
> Hi Matthias,
>
>
>
> Thanks for looking into the issue and creating a ticket. I am thinking of
> having a workaround until the issue is fixed.
>
>
>
> What if I create the attempt directories with a random int by patching
> *HadoopOutputFormatBase*’s open() method?
>
>
>
> Original:
>
>
>
> TaskAttemptID taskAttemptID = TaskAttemptID.*forName*(
> *"attempt__0000_r_"       *+ String.*format*(*"%" *+ (6 - Integer.
> *toString*(taskNumber + 1).length()) + *"s"*, *" "*).replace(*" "*, *"0"*)
>       + Integer.*toString*(taskNumber + 1)
>       + *"_0"*);
>
>
>
>
>
> Patched:
>
>
>
> *int *attemptRandomPrefix = *new *Random().nextInt(999);
>
> TaskAttemptID taskAttemptID = TaskAttemptID.*forName*(
> *"attempt__"       *+ String.*format*(*"%" *+ (4 - Integer.*toString*(attemptRandomPrefix).length())
> + *"s"*, *" "*).replace(*" "*, *"0"*)
>       + Integer.*toString*(attemptRandomPrefix) +
> *"_r_"       *+ String.*format*(*"%" *+ (6 - Integer.*toString*(taskNumber
> + 1).length()) + *"s"*, *" "*).replace(*" "*, *"0"*)
>       + Integer.*toString*(taskNumber + 1)
>       + *"_0"*);
>
>
>
>
>
> So basically I am creating a directory named *attempt__0123_r_0001_0 *instead
> of *attempt__0000_r_0001_0*. I have tested on a handful of our jobs and
> seems to  be working fine. Just wanted to check any downside of this
> changes that I may not be aware of?
>
>
>
> Thanks,
>
> Siddharth
>
>
>
>
>
>
>
> *From:* Matthias Pohl <ma...@ververica.com>
> *Sent:* Tuesday, September 07, 2021 5:06 AM
> *To:* Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
> *Cc:* user@flink.apache.org; Hailu, Andreas [Engineering] <
> Andreas.Hailu@ny.email.gs.com>
> *Subject:* Re: hdfs lease issues on flink retry
>
>
>
> Just for documentation purposes: I created FLINK-24147 [1] to cover this
> issue.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-24147
> <https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D24147&d=DwMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=eLqB-T65EFJVVpR6QlSfRHIga7DPK3o8yJvw_OhnMvk&m=jTswrQDq0l9TalcFRAb297cz4EfsU-LznMJyB2uXDl0&s=LgUitz7kzpyweO3xqm7f19qxwbHh_LbQ-_M1zOxutpM&e=>
>
>
>
> On Thu, Aug 26, 2021 at 6:14 PM Matthias Pohl <ma...@ververica.com>
> wrote:
>
> I see - I should have checked my mailbox before answering. I received the
> email and was able to login.
>
>
>
> On Thu, Aug 26, 2021 at 6:12 PM Matthias Pohl <ma...@ververica.com>
> wrote:
>
> The link doesn't work, i.e. I'm redirected to a login page. It would be
> also good to include the Flink logs and make them accessible for everyone.
> This way others could share their perspective as well...
>
>
>
> On Thu, Aug 26, 2021 at 5:40 PM Shah, Siddharth [Engineering] <
> Siddharth.x.Shah@gs.com> wrote:
>
> Hi Matthias,
>
>
>
> Thank you for responding and taking time to look at the issue.
>
>
>
> Uploaded the yarn lags here:
> https://lockbox.gs.com/lockbox/folders/963b0f29-85ad-4580-b420-8c66d9c07a84/
> and have also requested read permissions for you. Please let us know if
> you’re not able to see the files.
>
>
>
>
>
> *From:* Matthias Pohl <ma...@ververica.com>
> *Sent:* Thursday, August 26, 2021 9:47 AM
> *To:* Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
> *Cc:* user@flink.apache.org; Hailu, Andreas [Engineering] <
> Andreas.Hailu@ny.email.gs.com>
> *Subject:* Re: hdfs lease issues on flink retry
>
>
>
> Hi Siddharth,
>
> thanks for reaching out to the community. This might be a bug. Could you
> share your Flink and YARN logs? This way we could get a better
> understanding of what's going on.
>
>
>
> Best,
> Matthias
>
>
>
> On Tue, Aug 24, 2021 at 10:19 PM Shah, Siddharth [Engineering] <
> Siddharth.x.Shah@gs.com> wrote:
>
> Hi  Team,
>
>
>
> We are seeing transient failures in the jobs mostly requiring higher
> resources and using flink RestartStrategies
> <https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.13_docs_dev_execution_task-5Ffailure-5Frecovery_-23fixed-2Ddelay-2Drestart-2Dstrategy&d=DwMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=eLqB-T65EFJVVpR6QlSfRHIga7DPK3o8yJvw_OhnMvk&m=qIClgDVq00Jp0qluJfWV-aGM7Sg7tAnr_I2yy4TtNaM&s=wL6-8B4mnGofyRWetXrTSw9FBSV-XTDnoHsPtzU7h7c&e=>
> [1]. Upon checking the yarn logs we have observed hdfs lease issues when
> flink retry happens. The job originally fails for the first try with PartitionNotFoundException
> or NoResourceAvailableException., but on retry it seems form the yarn logs
> is that the lease for the temp sink directory is not yet released by the
> node from previous try.
>
>
>
> Initial Failure log message:
>
>
>
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate enough slots to run the job. Please make sure that the
> cluster has enough resources.
>
>         at
> org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:461)
>
>         at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>
>         at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>
>         at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>
>         at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>
>         at
> org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:190)
>
>         at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>
>         at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>
>         at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>
>         at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>
>
>
>
>
> Retry failure log message:
>
>
>
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException): /user/p2epda/lake/delp_prod/PROD/APPROVED/data/TECHRISK_SENTINEL/INFORMATION_REPORT/4377/temp/data/_temporary/0/_temporary/attempt__0000_r_000003_0/partMapper-r-00003.snappy.parquet for client 10.51.63.226 already exists
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2815)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2702)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2586)
>
>         at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:736)
>
>         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:409)
>
>         at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>
>         at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
>
>
>
>
>
>
>
> I could verify that it’s the same nodes from previous try owning the
> lease, and checked for multiple jobs by matching IP addresses. Ideally, we
> want an internal retry to happen since there will be thousands of jobs
> running at a time and hard to manually retry them.
>
>
>
> This is our current restart config:
>
> executionEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> Time.of(10, TimeUnit.SECONDS)));
>
>
>
> Is it possible to resolve leases before a retry? Or is it possible to have
> different sink directories (increment attempt id somewhere) for every
> retry, that way we have no lease issues? Or do you have any other
> suggestion on resolving this?
>
>
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/#fixed-delay-restart-strategy
>
>
>
>
>
> Thanks,
>
> Siddharth
>
>
>
>
> ------------------------------
>
>
> Your Personal Data: We may collect and process information about you that
> may be subject to data protection laws. For more information about how we
> use and disclose your personal data, how we protect your information, our
> legal basis to use your information, your rights and who you can contact,
> please refer to: www.gs.com/privacy-notices
>
>
>
>
>
>
>
> --
>
> *Matthias Pohl* | Engineer
>
>
>
> Follow us @VervericaData Ververica <https://www.ververica.com/>
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
> Wehner
>
>
>

RE: hdfs lease issues on flink retry

Posted by "Shah, Siddharth" <Si...@gs.com>.
Hi David/Matthias,

Just checking again on my previous question.

From: Shah, Siddharth [Engineering]
Sent: Thursday, September 23, 2021 9:46 AM
To: 'David Morávek' <dm...@apache.org>; Matthias Pohl <ma...@ververica.com>
Cc: Hailu, Andreas [Engineering] <An...@ny.email.gs.com>; user@flink.apache.org; Erai, Rahul [Engineering] <Ra...@ny.email.gs.com>; Chittajallu, Rajiv [Engineering] <Ra...@ny.email.gs.com>
Subject: RE: hdfs lease issues on flink retry

Hi David/Matthias,

Thank you for your suggestion; it seems to be working fine. One quick question: do the _temporary directories created by the DataSink task on retry require manual cleanup, or does Flink take care of the cleanup internally?

From: David Morávek <dm...@apache.org>
Sent: Monday, September 20, 2021 5:12 PM
To: Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
Cc: Hailu, Andreas [Engineering] <An...@ny.email.gs.com>; Matthias Pohl <ma...@ververica.com>; user@flink.apache.org
Subject: Re: hdfs lease issues on flink retry

Hi, AttemptId needs to be an integer (take a look at TaskAttemptID class for more details). As for your prior question, any random id should solve the issue.

Best,
D.

On Mon 20. 9. 2021 at 22:32, Shah, Siddharth <Si...@gs.com> wrote:
Hi Matthias,

Thanks for suggesting a workaround, but our jobs fail with the error below when I apply that change:

Caused by: java.lang.IllegalArgumentException: TaskAttemptId string : attempt__0000_r_000001_1632168799349 is not properly formed
        at org.apache.hadoop.mapreduce.TaskAttemptID.forName(TaskAttemptID.java:201)
        at com.gs.ep.lake.flinkbasics.GRHadoopOutputFormat.open(GRHadoopOutputFormat.java:59)
        at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:745)


Am I missing something here?

Thanks,
Siddharth

From: Matthias Pohl <ma...@ververica.com>
Sent: Monday, September 20, 2021 4:54 AM
To: Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
Cc: user@flink.apache.org; Hailu, Andreas [Engineering] <An...@ny.email.gs.com>
Subject: Re: hdfs lease issues on flink retry

I don't know of any side effects of your approach. But another workaround I saw was replacing the _0 suffix with something like "_" + System.currentTimeMillis()

On Fri, Sep 17, 2021 at 8:38 PM Shah, Siddharth <Si...@gs.com> wrote:
Hi Matthias,

Thanks for looking into the issue and creating a ticket. I am thinking of having a workaround until the issue is fixed.

What if I create the attempt directories with a random int by patching HadoopOutputFormatBase’s open() method?

Original:

TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
      + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
      + Integer.toString(taskNumber + 1)
      + "_0");


Patched:

int attemptRandomPrefix = new Random().nextInt(999);

TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__"
      + String.format("%" + (4 - Integer.toString(attemptRandomPrefix).length()) + "s", " ").replace(" ", "0")
      + Integer.toString(attemptRandomPrefix) + "_r_"
      + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
      + Integer.toString(taskNumber + 1)
      + "_0");


So basically I am creating a directory named, for example, attempt__0123_r_000001_0 instead of attempt__0000_r_000001_0. I have tested this on a handful of our jobs and it seems to be working fine. Are there any downsides to this change that I may not be aware of?
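
The name construction in the patch above can be sketched more compactly. This is only an illustration under assumptions, not the Flink or Hadoop implementation: the class AttemptIdSketch and its methods are hypothetical, only the JDK is used, and the resulting string would still have to be passed to TaskAttemptID.forName as in the patch.

```java
import java.util.Locale;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of Flink or Hadoop.
public class AttemptIdSketch {

    // Builds "attempt__<4-digit job id>_r_<6-digit task id>_0".
    // %04d and %06d zero-pad the numbers, replacing the manual
    // String.format("%Ns", " ").replace(" ", "0") padding.
    static String attemptName(int jobId, int taskNumber) {
        return String.format(Locale.ROOT, "attempt__%04d_r_%06d_0", jobId, taskNumber + 1);
    }

    // Random value in 0..9999 so that it always fits the four-digit field.
    static int randomJobId() {
        return ThreadLocalRandom.current().nextInt(10_000);
    }

    public static void main(String[] args) {
        System.out.println(attemptName(123, 0)); // attempt__0123_r_000001_0
        System.out.println(attemptName(randomJobId(), 2));
    }
}
```

Note that new Random().nextInt(999) in the patch yields values from 0 to 998, so two attempts can still draw the same prefix; a wider range only makes that less likely.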

Thanks,
Siddharth



From: Matthias Pohl <ma...@ververica.com>
Sent: Tuesday, September 07, 2021 5:06 AM
To: Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
Cc: user@flink.apache.org; Hailu, Andreas [Engineering] <An...@ny.email.gs.com>
Subject: Re: hdfs lease issues on flink retry

Just for documentation purposes: I created FLINK-24147 [1] to cover this issue.

[1] https://issues.apache.org/jira/browse/FLINK-24147

On Thu, Aug 26, 2021 at 6:14 PM Matthias Pohl <ma...@ververica.com> wrote:
I see - I should have checked my mailbox before answering. I received the email and was able to login.

On Thu, Aug 26, 2021 at 6:12 PM Matthias Pohl <ma...@ververica.com> wrote:
The link doesn't work, i.e. I'm redirected to a login page. It would also be good to include the Flink logs and make them accessible for everyone. This way others could share their perspective as well...

On Thu, Aug 26, 2021 at 5:40 PM Shah, Siddharth [Engineering] <Si...@gs.com> wrote:
Hi Matthias,

Thank you for responding and taking time to look at the issue.

Uploaded the YARN logs here: https://lockbox.gs.com/lockbox/folders/963b0f29-85ad-4580-b420-8c66d9c07a84/ and have also requested read permissions for you. Please let us know if you’re not able to see the files.


From: Matthias Pohl <ma...@ververica.com>
Sent: Thursday, August 26, 2021 9:47 AM
To: Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
Cc: user@flink.apache.org; Hailu, Andreas [Engineering] <An...@ny.email.gs.com>
Subject: Re: hdfs lease issues on flink retry

Hi Siddharth,
thanks for reaching out to the community. This might be a bug. Could you share your Flink and YARN logs? This way we could get a better understanding of what's going on.

Best,
Matthias

On Tue, Aug 24, 2021 at 10:19 PM Shah, Siddharth [Engineering] <Si...@gs.com> wrote:
Hi Team,

We are seeing transient failures in jobs that mostly require higher resources and use Flink RestartStrategies [1]. Upon checking the YARN logs, we have observed HDFS lease issues when a Flink retry happens. The job originally fails on the first try with PartitionNotFoundException or NoResourceAvailableException, but on retry it seems from the YARN logs that the lease for the temp sink directory has not yet been released by the node from the previous try.

Initial Failure log message:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate enough slots to run the job. Please make sure that the cluster has enough resources.
        at org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:461)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:190)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)


Retry failure log message:


Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException): /user/p2epda/lake/delp_prod/PROD/APPROVED/data/TECHRISK_SENTINEL/INFORMATION_REPORT/4377/temp/data/_temporary/0/_temporary/attempt__0000_r_000003_0/partMapper-r-00003.snappy.parquet for client 10.51.63.226 already exists

        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2815)

        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2702)

        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2586)

        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:736)

        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:409)

        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)



I could verify that the same nodes from the previous try still own the lease, and I checked this for multiple jobs by matching IP addresses. Ideally, we want an internal retry to happen, since there will be thousands of jobs running at a time and retrying them manually is hard.

This is our current restart config:
executionEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

Is it possible to resolve leases before a retry? Or is it possible to have different sink directories (increment attempt id somewhere) for every retry, that way we have no lease issues? Or do you have any other suggestion on resolving this?


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/#fixed-delay-restart-strategy


Thanks,
Siddharth





--

Matthias Pohl | Engineer


Follow us @VervericaData Ververica <https://www.ververica.com/>

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner


Re: hdfs lease issues on flink retry

Posted by David Morávek <dm...@apache.org>.
Hi, AttemptId needs to be an integer (take a look at TaskAttemptID class
for more details). As for your prior question, any random id should solve
the issue.
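
A minimal sketch of why the millisecond suffix is rejected, assuming (as the stack trace suggests) that TaskAttemptID.forName parses the last field as a Java int. The class AttemptSuffixCheck and its methods are hypothetical and use only the JDK; folding the timestamp into the int range is one possible way to keep a time-based suffix, not something confirmed in this thread.

```java
// Hypothetical helper, not part of Flink or Hadoop.
public class AttemptSuffixCheck {

    // Returns true if the text parses as a Java int.
    static boolean fitsInInt(String text) {
        try {
            Integer.parseInt(text);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Folds a millisecond timestamp into the range 0..Integer.MAX_VALUE - 1.
    static int boundedSuffix(long millis) {
        return (int) Math.floorMod(millis, (long) Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(fitsInInt("1632168799349")); // false: larger than Integer.MAX_VALUE
        System.out.println(fitsInInt("0"));             // true
        System.out.println(boundedSuffix(System.currentTimeMillis()));
    }
}
```

The suffix 1632168799349 from the failing attempt name exceeds Integer.MAX_VALUE (2147483647), which would explain the "is not properly formed" error.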

Best,
D.

On Mon 20. 9. 2021 at 22:32, Shah, Siddharth <Si...@gs.com>
wrote:

> Hi Matthias,
>
>
>
> Thanks for suggesting a workaround, but our jobs fail with the error below
> when I apply that change:
>
>
>
> Caused by: java.lang.IllegalArgumentException: TaskAttemptId string :
> attempt__0000_r_000001_1632168799349 is not properly formed
>
>         at
> org.apache.hadoop.mapreduce.TaskAttemptID.forName(TaskAttemptID.java:201)
>
>         at
> com.gs.ep.lake.flinkbasics.GRHadoopOutputFormat.open(GRHadoopOutputFormat.java:59)
>
>         at
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
>
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>
>         at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> Am I missing something here?
>
>
>
> Thanks,
>
> Siddharth
>
>
>
> *From:* Matthias Pohl <ma...@ververica.com>
> *Sent:* Monday, September 20, 2021 4:54 AM
> *To:* Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
> *Cc:* user@flink.apache.org; Hailu, Andreas [Engineering] <
> Andreas.Hailu@ny.email.gs.com>
> *Subject:* Re: hdfs lease issues on flink retry
>
>
>
> I don't know of any side effects of your approach. But another workaround
> I saw was replacing the _0 suffix with something like "_" +
> System.currentTimeMillis()
>
>
>
> On Fri, Sep 17, 2021 at 8:38 PM Shah, Siddharth <Si...@gs.com>
> wrote:
>
> Hi Matthias,
>
>
>
> Thanks for looking into the issue and creating a ticket. I am thinking of
> having a workaround until the issue is fixed.
>
>
>
> What if I create the attempt directories with a random int by patching
> *HadoopOutputFormatBase*’s open() method?
>
>
>
> Original:
>
>
>
> TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
>       + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
>       + Integer.toString(taskNumber + 1)
>       + "_0");
>
>
>
>
>
> Patched:
>
>
>
> int attemptRandomPrefix = new Random().nextInt(999);
>
> TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__"
>       + String.format("%" + (4 - Integer.toString(attemptRandomPrefix).length()) + "s", " ").replace(" ", "0")
>       + Integer.toString(attemptRandomPrefix) + "_r_"
>       + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
>       + Integer.toString(taskNumber + 1)
>       + "_0");
>
>
>
>
>
> So basically I am creating a directory named, for example,
> attempt__0123_r_000001_0 instead of attempt__0000_r_000001_0. I have tested
> this on a handful of our jobs and it seems to be working fine. Are there any
> downsides to this change that I may not be aware of?
>
>
>
> Thanks,
>
> Siddharth
>
>
>
>
>
>
>
> *From:* Matthias Pohl <ma...@ververica.com>
> *Sent:* Tuesday, September 07, 2021 5:06 AM
> *To:* Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
> *Cc:* user@flink.apache.org; Hailu, Andreas [Engineering] <
> Andreas.Hailu@ny.email.gs.com>
> *Subject:* Re: hdfs lease issues on flink retry
>
>
>
> Just for documentation purposes: I created FLINK-24147 [1] to cover this
> issue.
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-24147
>
>
>
> On Thu, Aug 26, 2021 at 6:14 PM Matthias Pohl <ma...@ververica.com>
> wrote:
>
> I see - I should have checked my mailbox before answering. I received the
> email and was able to login.
>
>
>
> On Thu, Aug 26, 2021 at 6:12 PM Matthias Pohl <ma...@ververica.com>
> wrote:
>
> The link doesn't work, i.e. I'm redirected to a login page. It would also
> be good to include the Flink logs and make them accessible for everyone.
> This way others could share their perspective as well...
>
>
>
> On Thu, Aug 26, 2021 at 5:40 PM Shah, Siddharth [Engineering] <
> Siddharth.x.Shah@gs.com> wrote:
>
> Hi Matthias,
>
>
>
> Thank you for responding and taking time to look at the issue.
>
>
>
> Uploaded the YARN logs here:
> https://lockbox.gs.com/lockbox/folders/963b0f29-85ad-4580-b420-8c66d9c07a84/
> and have also requested read permissions for you. Please let us know if
> you’re not able to see the files.
>
>
>
>
>
> *From:* Matthias Pohl <ma...@ververica.com>
> *Sent:* Thursday, August 26, 2021 9:47 AM
> *To:* Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
> *Cc:* user@flink.apache.org; Hailu, Andreas [Engineering] <
> Andreas.Hailu@ny.email.gs.com>
> *Subject:* Re: hdfs lease issues on flink retry
>
>
>
> Hi Siddharth,
>
> thanks for reaching out to the community. This might be a bug. Could you
> share your Flink and YARN logs? This way we could get a better
> understanding of what's going on.
>
>
>
> Best,
> Matthias
>
>
>
> On Tue, Aug 24, 2021 at 10:19 PM Shah, Siddharth [Engineering] <
> Siddharth.x.Shah@gs.com> wrote:
>
> Hi  Team,
>
>
>
> We are seeing transient failures in jobs that mostly require higher
> resources and use Flink RestartStrategies [1]. Upon checking the YARN logs,
> we have observed HDFS lease issues when a Flink retry happens. The job
> originally fails on the first try with PartitionNotFoundException or
> NoResourceAvailableException, but on retry it seems from the YARN logs that
> the lease for the temp sink directory has not yet been released by the node
> from the previous try.
>
>
>
> Initial Failure log message:
>
>
>
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate enough slots to run the job. Please make sure that the
> cluster has enough resources.
>
>         at
> org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:461)
>
>         at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>
>         at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>
>         at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>
>         at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>
>         at
> org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:190)
>
>         at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>
>         at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>
>         at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>
>         at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>
>
>
>
>
> Retry failure log message:
>
>
>
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException): /user/p2epda/lake/delp_prod/PROD/APPROVED/data/TECHRISK_SENTINEL/INFORMATION_REPORT/4377/temp/data/_temporary/0/_temporary/attempt__0000_r_000003_0/partMapper-r-00003.snappy.parquet for client 10.51.63.226 already exists
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2815)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2702)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2586)
>
>         at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:736)
>
>         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:409)
>
>         at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>
>         at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
>
>
>
>
>
>
>
> I could verify that the same nodes from the previous try still own the
> lease, and I checked this for multiple jobs by matching IP addresses.
> Ideally, we want an internal retry to happen, since there will be thousands
> of jobs running at a time and retrying them manually is hard.
>
>
>
> This is our current restart config:
>
> executionEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> Time.of(10, TimeUnit.SECONDS)));
>
>
>
> Is it possible to resolve leases before a retry? Or is it possible to have
> different sink directories (increment attempt id somewhere) for every
> retry, that way we have no lease issues? Or do you have any other
> suggestion on resolving this?
>
>
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/#fixed-delay-restart-strategy
>
>
>
>
>
> Thanks,
>
> Siddharth
>
>
>
>
>
>
>
>
>
>
> --
>
> *Matthias Pohl* | Engineer
>
>
>
> Follow us @VervericaData Ververica <https://www.ververica.com/>
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
> Wehner
>
>
>

RE: hdfs lease issues on flink retry

Posted by "Shah, Siddharth" <Si...@gs.com>.
Hi Matthias,

Thanks for suggesting a workaround, but our jobs fail with the error below when I apply that change:

Caused by: java.lang.IllegalArgumentException: TaskAttemptId string : attempt__0000_r_000001_1632168799349 is not properly formed
        at org.apache.hadoop.mapreduce.TaskAttemptID.forName(TaskAttemptID.java:201)
        at com.gs.ep.lake.flinkbasics.GRHadoopOutputFormat.open(GRHadoopOutputFormat.java:59)
        at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:745)


Am I missing something here?

Thanks,
Siddharth

From: Matthias Pohl <ma...@ververica.com>
Sent: Monday, September 20, 2021 4:54 AM
To: Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
Cc: user@flink.apache.org; Hailu, Andreas [Engineering] <An...@ny.email.gs.com>
Subject: Re: hdfs lease issues on flink retry

I don't know of any side effects of your approach. But another workaround I saw was replacing the _0 suffix with something like "_" + System.currentTimeMillis()

On Fri, Sep 17, 2021 at 8:38 PM Shah, Siddharth <Si...@gs.com> wrote:
Hi Matthias,

Thanks for looking into the issue and creating a ticket. I am thinking of having a workaround until the issue is fixed.

What if I create the attempt directories with a random int by patching HadoopOutputFormatBase’s open() method?

Original:

TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
      + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
      + Integer.toString(taskNumber + 1)
      + "_0");


Patched:

int attemptRandomPrefix = new Random().nextInt(999);

TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__"
      + String.format("%" + (4 - Integer.toString(attemptRandomPrefix).length()) + "s", " ").replace(" ", "0")
      + Integer.toString(attemptRandomPrefix) + "_r_"
      + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
      + Integer.toString(taskNumber + 1)
      + "_0");


So basically I am creating a directory named attempt__0123_r_0001_0 instead of attempt__0000_r_0001_0. I have tested on a handful of our jobs and seems to  be working fine. Just wanted to check any downside of this changes that I may not be aware of?

Thanks,
Siddharth



From: Matthias Pohl <ma...@ververica.com>>
Sent: Tuesday, September 07, 2021 5:06 AM
To: Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>>
Cc: user@flink.apache.org<ma...@flink.apache.org>; Hailu, Andreas [Engineering] <An...@ny.email.gs.com>>
Subject: Re: hdfs lease issues on flink retry

Just for documentation purposes: I created FLINK-24147 [1] to cover this issue.

[1] https://issues.apache.org/jira/browse/FLINK-24147<https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D24147&d=DwMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=eLqB-T65EFJVVpR6QlSfRHIga7DPK3o8yJvw_OhnMvk&m=jTswrQDq0l9TalcFRAb297cz4EfsU-LznMJyB2uXDl0&s=LgUitz7kzpyweO3xqm7f19qxwbHh_LbQ-_M1zOxutpM&e=>

On Thu, Aug 26, 2021 at 6:14 PM Matthias Pohl <ma...@ververica.com>> wrote:
I see - I should have checked my mailbox before answering. I received the email and was able to login.

On Thu, Aug 26, 2021 at 6:12 PM Matthias Pohl <ma...@ververica.com>> wrote:
The link doesn't work, i.e. I'm redirected to a login page. It would be also good to include the Flink logs and make them accessible for everyone. This way others could share their perspective as well...

On Thu, Aug 26, 2021 at 5:40 PM Shah, Siddharth [Engineering] <Si...@gs.com>> wrote:
Hi Matthias,

Thank you for responding and taking time to look at the issue.

Uploaded the yarn lags here: https://lockbox.gs.com/lockbox/folders/963b0f29-85ad-4580-b420-8c66d9c07a84/ and have also requested read permissions for you. Please let us know if you’re not able to see the files.


From: Matthias Pohl <ma...@ververica.com>>
Sent: Thursday, August 26, 2021 9:47 AM
To: Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>>
Cc: user@flink.apache.org<ma...@flink.apache.org>; Hailu, Andreas [Engineering] <An...@ny.email.gs.com>>
Subject: Re: hdfs lease issues on flink retry

Hi Siddharth,
thanks for reaching out to the community. This might be a bug. Could you share your Flink and YARN logs? This way we could get a better understanding of what's going on.

Best,
Matthias

On Tue, Aug 24, 2021 at 10:19 PM Shah, Siddharth [Engineering] <Si...@gs.com>> wrote:
Hi  Team,

We are seeing transient failures in the jobs mostly requiring higher resources and using flink RestartStrategies<https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.13_docs_dev_execution_task-5Ffailure-5Frecovery_-23fixed-2Ddelay-2Drestart-2Dstrategy&d=DwMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=eLqB-T65EFJVVpR6QlSfRHIga7DPK3o8yJvw_OhnMvk&m=qIClgDVq00Jp0qluJfWV-aGM7Sg7tAnr_I2yy4TtNaM&s=wL6-8B4mnGofyRWetXrTSw9FBSV-XTDnoHsPtzU7h7c&e=> [1]. Upon checking the yarn logs we have observed hdfs lease issues when flink retry happens. The job originally fails for the first try with PartitionNotFoundException or NoResourceAvailableException., but on retry it seems form the yarn logs is that the lease for the temp sink directory is not yet released by the node from previous try.

Initial Failure log message:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate enough slots to run the job. Please make sure that the cluster has enough resources.
        at org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:461)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:190)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)


Retry failure log message:


Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException): /user/p2epda/lake/delp_prod/PROD/APPROVED/data/TECHRISK_SENTINEL/INFORMATION_REPORT/4377/temp/data/_temporary/0/_temporary/attempt__0000_r_000003_0/partMapper-r-00003.snappy.parquet for client 10.51.63.226 already exists

        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2815)

        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2702)

        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2586)

        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:736)

        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:409)

        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)



I could verify that it’s the same nodes from previous try owning the lease, and checked for multiple jobs by matching IP addresses. Ideally, we want an internal retry to happen since there will be thousands of jobs running at a time and hard to manually retry them.

This is our current restart config:
executionEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

Is it possible to resolve leases before a retry? Or is it possible to have different sink directories (increment attempt id somewhere) for every retry, that way we have no lease issues? Or do you have any other suggestion on resolving this?


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/#fixed-delay-restart-strategy<https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.13_docs_dev_execution_task-5Ffailure-5Frecovery_-23fixed-2Ddelay-2Drestart-2Dstrategy&d=DwMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=eLqB-T65EFJVVpR6QlSfRHIga7DPK3o8yJvw_OhnMvk&m=qIClgDVq00Jp0qluJfWV-aGM7Sg7tAnr_I2yy4TtNaM&s=wL6-8B4mnGofyRWetXrTSw9FBSV-XTDnoHsPtzU7h7c&e=>


Thanks,
Siddharth


________________________________

Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices<http://www.gs.com/privacy-notices>

________________________________

Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices<http://www.gs.com/privacy-notices>



--

Matthias Pohl | Engineer


Follow us @VervericaData Ververica<https://urldefense.proofpoint.com/v2/url?u=https-3A__www.ververica.com_&d=DwMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=eLqB-T65EFJVVpR6QlSfRHIga7DPK3o8yJvw_OhnMvk&m=jTswrQDq0l9TalcFRAb297cz4EfsU-LznMJyB2uXDl0&s=yD5mFS5pqguoGU4aRj0pSWC55EeAGjSn-GOfK26ZYk8&e=>

--

Join Flink Forward<https://urldefense.proofpoint.com/v2/url?u=https-3A__flink-2Dforward.org_&d=DwMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=eLqB-T65EFJVVpR6QlSfRHIga7DPK3o8yJvw_OhnMvk&m=jTswrQDq0l9TalcFRAb297cz4EfsU-LznMJyB2uXDl0&s=E2wLfyIeTdz7eBxapl_pWf3hmDuxjkVK8N0xMc4o0PE&e=> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner

________________________________

Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices<http://www.gs.com/privacy-notices>


________________________________

Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices<http://www.gs.com/privacy-notices>

Re: hdfs lease issues on flink retry

Posted by Matthias Pohl <ma...@ververica.com>.
I don't know of any side effects of your approach. But another workaround I
saw was replacing the _0 suffix with something like "_" +
System.currentTimeMillis()


RE: hdfs lease issues on flink retry

Posted by "Shah, Siddharth" <Si...@gs.com>.
Hi Matthias,

Thanks for looking into the issue and creating a ticket. I am thinking of having a workaround until the issue is fixed.

What if I create the attempt directories with a random int by patching HadoopOutputFormatBase’s open() method?

Original:

TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
      + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
      + Integer.toString(taskNumber + 1)
      + "_0");


Patched:

int attemptRandomPrefix = new Random().nextInt(999);

TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__"
      + String.format("%" + (4 - Integer.toString(attemptRandomPrefix).length()) + "s", " ").replace(" ", "0")
      + Integer.toString(attemptRandomPrefix) + "_r_"
      + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
      + Integer.toString(taskNumber + 1)
      + "_0");


So basically I am creating a directory named attempt__0123_r_000001_0 instead of attempt__0000_r_000001_0. I have tested this on a handful of our jobs and it seems to be working fine. Are there any downsides to this change that I may not be aware of?

Thanks,
Siddharth
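
For comparison, here is a shorter way to build the same kind of string as the patched snippet above, as a sketch only. The class name RandomAttemptPrefix and the method attemptName are hypothetical. The snippet uses plain String.format zero-padding instead of the replace-based padding and does not call Hadoop or Flink. One possible downside to keep in mind: nextInt(999) yields only 999 distinct prefixes (0 to 998), so a retry draws the same prefix as the previous attempt roughly once in 999 times.

```java
import java.util.Locale;
import java.util.Random;

final class RandomAttemptPrefix {

    // Zero-pads the prefix to 4 digits and the 1-based task number to 6 digits.
    public static String attemptName(int prefix, int taskNumber) {
        return String.format(Locale.ROOT, "attempt__%04d_r_%06d_0", prefix, taskNumber + 1);
    }

    public static void main(String[] args) {
        int prefix = new Random().nextInt(999); // 0..998, as in the patch above
        System.out.println(attemptName(prefix, 0));
        System.out.println(attemptName(123, 0)); // attempt__0123_r_000001_0
    }
}
```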



From: Matthias Pohl <ma...@ververica.com>
Sent: Tuesday, September 07, 2021 5:06 AM
To: Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
Cc: user@flink.apache.org; Hailu, Andreas [Engineering] <An...@ny.email.gs.com>
Subject: Re: hdfs lease issues on flink retry

Just for documentation purposes: I created FLINK-24147 [1] to cover this issue.

[1] https://issues.apache.org/jira/browse/FLINK-24147

On Thu, Aug 26, 2021 at 6:14 PM Matthias Pohl <ma...@ververica.com> wrote:
I see - I should have checked my mailbox before answering. I received the email and was able to log in.

On Thu, Aug 26, 2021 at 6:12 PM Matthias Pohl <ma...@ververica.com> wrote:
The link doesn't work, i.e. I'm redirected to a login page. It would also be good to include the Flink logs and make them accessible to everyone. This way others could share their perspective as well...

On Thu, Aug 26, 2021 at 5:40 PM Shah, Siddharth [Engineering] <Si...@gs.com> wrote:
Hi Matthias,

Thank you for responding and taking time to look at the issue.

Uploaded the YARN logs here: https://lockbox.gs.com/lockbox/folders/963b0f29-85ad-4580-b420-8c66d9c07a84/ and have also requested read permissions for you. Please let us know if you’re not able to see the files.


From: Matthias Pohl <ma...@ververica.com>
Sent: Thursday, August 26, 2021 9:47 AM
To: Shah, Siddharth [Engineering] <Si...@ny.email.gs.com>
Cc: user@flink.apache.org; Hailu, Andreas [Engineering] <An...@ny.email.gs.com>
Subject: Re: hdfs lease issues on flink retry

Hi Siddharth,
thanks for reaching out to the community. This might be a bug. Could you share your Flink and YARN logs? This way we could get a better understanding of what's going on.

Best,
Matthias

On Tue, Aug 24, 2021 at 10:19 PM Shah, Siddharth [Engineering] <Si...@gs.com> wrote:
Hi Team,

We are seeing transient failures in jobs, mostly those requiring higher resources and using Flink RestartStrategies [1]. Upon checking the YARN logs, we have observed HDFS lease issues when a Flink retry happens. The job originally fails on the first try with PartitionNotFoundException or NoResourceAvailableException, but on retry it seems from the YARN logs that the lease for the temp sink directory has not yet been released by the node from the previous try.

Initial Failure log message:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate enough slots to run the job. Please make sure that the cluster has enough resources.
        at org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:461)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:190)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)


Retry failure log message:


Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException): /user/p2epda/lake/delp_prod/PROD/APPROVED/data/TECHRISK_SENTINEL/INFORMATION_REPORT/4377/temp/data/_temporary/0/_temporary/attempt__0000_r_000003_0/partMapper-r-00003.snappy.parquet for client 10.51.63.226 already exists
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2815)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2702)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2586)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:736)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:409)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)



I could verify that the same nodes from the previous try own the lease, and I checked this for multiple jobs by matching IP addresses. Ideally, we want an internal retry to happen, since there will be thousands of jobs running at a time and it is hard to retry them manually.

This is our current restart config:
executionEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

Is it possible to resolve leases before a retry? Or is it possible to have different sink directories (increment attempt id somewhere) for every retry, so that we have no lease issues? Or do you have any other suggestions for resolving this?


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/#fixed-delay-restart-strategy


Thanks,
Siddharth
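
The second option raised in the question above, a different attempt directory for every retry, could look roughly like the sketch below. It assumes the caller can supply a retry counter. Flink's RuntimeContext exposes getAttemptNumber(), but wiring that into the output format is an assumption here, not something confirmed in this thread. RetryAwareAttemptName and attemptName are hypothetical names, and the snippet is standalone Java that only builds the string.

```java
import java.util.Locale;

final class RetryAwareAttemptName {

    // taskNumber is 0-based; attemptNumber is 0 for the first try and grows per retry.
    public static String attemptName(int taskNumber, int attemptNumber) {
        if (taskNumber < 0 || attemptNumber < 0) {
            throw new IllegalArgumentException("taskNumber and attemptNumber must be non-negative");
        }
        return String.format(Locale.ROOT, "attempt__0000_r_%06d_%d", taskNumber + 1, attemptNumber);
    }

    public static void main(String[] args) {
        // With fixedDelayRestart(3, ...) there is one initial try plus up to 3 restarts.
        for (int attempt = 0; attempt <= 3; attempt++) {
            System.out.println(attemptName(2, attempt));
        }
    }
}
```

The first name printed matches the directory in the retry failure log above; later retries would use the suffixes _1, _2 and _3, which stay well within int range.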


________________________________

Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices<http://www.gs.com/privacy-notices>




