Posted to user@flink.apache.org by Alex Vinnik <al...@gmail.com> on 2018/12/11 23:07:35 UTC

Re: Cannot configure akka.ask.timeout

Hi there,

Ran into the same problem running a batch job with Flink 1.6.1/1.6.2.

akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher#202546747]] after [10000 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

akka.ask.timeout: 600s

But it looks like it is not honored. Any suggestions on what can be done?

Thanks

On 2018/07/13 10:24:16, Lukas Kircher <l....@gmail.com> wrote:
> Hello,
>
> I have problems setting configuration parameters for Akka in Flink 1.5.0.
> When I run a job, I get the exception listed below, which states that Akka
> timed out after 10000 ms. I tried to increase the timeout by following the
> Flink configuration documentation. Specifically, I did the following:
>
> 1) Passed a configuration to the Flink execution environment with
>    `akka.ask.timeout` set to a higher value. I started this in IntelliJ.
> 2) Passed program arguments via the run configuration in IntelliJ, e.g.
>    `-Dakka.ask.timeout:100s`
> 3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a local
>    standalone cluster via start-cluster.sh. The setting is reflected in
>    Flink's web interface.
>
> However, despite the explicit configuration, the default setting seems to
> be used. In each case, the exception below states that the Akka ask timed
> out after 10000 ms.
>
> As my problem seems very basic, I do not include an SSCCE for now, but I
> can try to build one if it helps figure out the issue.
>
> ------
> [...]
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
> [...]
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
>     at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> [...]
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>     at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>     at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>     at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>     at java.lang.Thread.run(Thread.java:745)
> [...]
> ------
>
> Best regards and thanks for your help,
> Lukas
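The numbers in this thread can be cross-checked with a little arithmetic. The sketch below uses a hypothetical, simplified parser (`TimeoutValues.toMillis`, not Flink's own parsing code) that accepts values written like `600s` or `100 s`. Neither configured value converts to the 10000 ms reported in the exceptions; 10000 ms is 10 s, which is the documented default of `akka.ask.timeout`. That is consistent with the suspicion that some other timeout of the same length is in effect.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for comparing duration strings; not Flink's parser. */
final class TimeoutValues {
    // Simplified grammar (an assumption): digits, optional whitespace, unit ms|s|min.
    private static final Pattern DURATION = Pattern.compile("(\\d+)\\s*(ms|s|min)");

    private TimeoutValues() {}

    static long toMillis(String value) {
        Matcher m = DURATION.matcher(value.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + value);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "ms":
                return amount;
            case "s":
                return amount * 1_000L;
            default: // "min"
                return amount * 60_000L;
        }
    }
}
```

For example, `toMillis("600s")` gives 600000 and `toMillis("100 s")` gives 100000, while `toMillis("10 s")` gives exactly the 10000 seen in the stack traces.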

Re: Cannot configure akka.ask.timeout

Posted by qi luo <lu...@gmail.com>.
Hi Alex,

I’m not very familiar with JsonLinesInputFormat; is that your own implementation? You may want to look into its `createInputSplits()` method, which should be doing the listing work. You could rewrite it to list files concurrently.
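A minimal sketch of concurrent listing, assuming the slow part is one remote list call per directory. The `lister` function is a hypothetical stand-in for whatever returns a directory's direct children (inside a custom input format it might wrap Flink's `FileSystem#listStatus(Path)`), and the convention that directory names end in `/` is also an assumption. Each directory level is listed in parallel on a fixed thread pool; this is not the HDFSFileInput implementation mentioned in this thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class ConcurrentLister {
    private ConcurrentLister() {}

    /**
     * Returns every file below {@code root}, sorted. Hypothetical convention: {@code lister}
     * returns the direct children of a directory, and directory names end with "/".
     */
    static List<String> listRecursively(
            String root, Function<String, List<String>> lister, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<String> files = new ArrayList<>();
            List<String> level = Collections.singletonList(root);
            while (!level.isEmpty()) {
                // Submit one list call per directory on this level; they run concurrently.
                List<Future<List<String>>> pending = new ArrayList<>();
                for (String dir : level) {
                    pending.add(pool.submit(() -> lister.apply(dir)));
                }
                // Wait for the whole level; sub-directories become the next level.
                List<String> next = new ArrayList<>();
                for (Future<List<String>> listing : pending) {
                    for (String child : listing.get()) {
                        if (child.endsWith("/")) {
                            next.add(child);
                        } else {
                            files.add(child);
                        }
                    }
                }
                level = next;
            }
            Collections.sort(files);
            return files;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Listing level by level keeps every task independent, so pool threads never block waiting on one another; the trade-off is that one slow directory delays the start of the next level.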

> On Dec 13, 2018, at 11:56 PM, Alex Vinnik <al...@gmail.com> wrote:
> [...]


Re: Cannot configure akka.ask.timeout

Posted by Alex Vinnik <al...@gmail.com>.
Qi,

Thanks for the references! How do I enable concurrent S3 file listing? Here is
the code.

// Consume the JSON files
Configuration configuration = new Configuration(GlobalConfiguration.loadConfiguration());
configuration.setBoolean(JsonLinesInputFormat.ENUMERATE_NESTED_FILES_FLAG, true);

JsonLinesInputFormat jsonInputFormat = new JsonLinesInputFormat(new Path(inputPath), configuration);
jsonInputFormat.setFilesFilter(new BucketingSinkFilter());

DataSet<ObjectNode> input = env.readFile(jsonInputFormat, inputPath).withParameters(configuration);


On Wed, Dec 12, 2018 at 8:53 PM qi luo <lu...@gmail.com> wrote:

> [...]

Re: Cannot configure akka.ask.timeout

Posted by qi luo <lu...@gmail.com>.
Hi Alex,

The hard-coded values I’ve found are in [1] and [2].

We encountered an issue similar to yours (listing a lot of HDFS files). We ended up with a newer version of HDFSFileInput that lists files concurrently. Another hack we used was to list the files on the client side and pass them to the JobManager via serialization (not recommended, though, as it doesn’t follow Flink’s framework mechanisms).

You can also try listing S3 files concurrently, or paste your sample code here.

[1] https://github.com/apache/flink/blob/967b31b333e6f4b014ea3041f420bfaff2484618/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L187
[2] https://github.com/apache/flink/blob/b0496f21d70cc1af15569f3632d7a58fd53b8f95/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java#L117
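The client-side hack can be sketched with plain Java serialization. `PrelistedFiles` is a hypothetical holder, not a Flink class. Flink input formats are `Serializable` and are shipped with the job, so a file list held in a field of a custom format would travel the same way; as noted above, this sidesteps Flink's own split-creation mechanism and is not a recommended pattern.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical holder for a file list computed on the client before submission. */
final class PrelistedFiles implements Serializable {
    private static final long serialVersionUID = 1L;

    private final ArrayList<String> paths; // ArrayList is Serializable

    PrelistedFiles(List<String> paths) {
        this.paths = new ArrayList<>(paths); // defensive copy
    }

    List<String> paths() {
        return Collections.unmodifiableList(paths);
    }

    /** Client side: turn the holder into bytes, as happens when a job is shipped. */
    static byte[] toBytes(PrelistedFiles files) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(files);
        }
        return bytes.toByteArray();
    }

    /** Receiving side: restore the list without listing S3 or HDFS again. */
    static PrelistedFiles fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (PrelistedFiles) in.readObject();
        }
    }
}
```

The cost of this shortcut is that the listing still happens somewhere; it just moves out of the submission path and into the client.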

> On Dec 13, 2018, at 1:09 AM, Alex Vinnik <al...@gmail.com> wrote:
> [...]


Re: Cannot configure akka.ask.timeout

Posted by Alex Vinnik <al...@gmail.com>.
Qi,

The job submission timeout is caused by listing too many files in S3
during the env.readFile call that creates the input DataSet. Is there a way NOT to
list S3 files during job submission? It seems like that would help
mitigate the timeout problem.
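A back-of-the-envelope model of why listing can outlast a 10 s submission timeout. It assumes every list call takes the same time and that `parallelism` calls run at once with no other overhead; `ListingBudget` is a hypothetical name, and the directory count and per-call latency in the usage below are illustrative, not measurements from this thread.

```java
/** Hypothetical estimate of listing time under a uniform-latency assumption. */
final class ListingBudget {
    private ListingBudget() {}

    /** Estimated wall-clock time to list {@code directories} prefixes, {@code parallelism} at a time. */
    static long estimatedListingMillis(int directories, long millisPerListCall, int parallelism) {
        if (directories < 0 || millisPerListCall < 0 || parallelism < 1) {
            throw new IllegalArgumentException("invalid listing parameters");
        }
        // Calls go out in ceil(directories / parallelism) rounds of equal latency.
        long rounds = (directories + parallelism - 1L) / parallelism;
        return rounds * millisPerListCall;
    }

    /** True if the estimate stays below the given timeout. */
    static boolean fitsWithin(long timeoutMillis, int directories, long millisPerListCall, int parallelism) {
        return estimatedListingMillis(directories, millisPerListCall, parallelism) < timeoutMillis;
    }
}
```

With 2,000 directories at 50 ms per call, a sequential listing needs about 100,000 ms, ten times the 10000 ms in the exception, while 32 concurrent calls bring it down to about 3,150 ms.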

Which hard-coded value were you referring to?

Best,
-Alex

On Wed, Dec 12, 2018 at 7:47 AM Alex Vinnik <al...@gmail.com> wrote:

> [...]

Re: Cannot configure akka.ask.timeout

Posted by Alex Vinnik <al...@gmail.com>.
Hi Qi,

Thanks for looking into this. Here is the ticket:
https://issues.apache.org/jira/browse/FLINK-11143

Best,
-Alex

On Tue, Dec 11, 2018 at 8:47 PM qi luo <lu...@gmail.com> wrote:

> [...]

Re: Cannot configure akka.ask.timeout

Posted by qi luo <lu...@gmail.com>.
Hi Alex and Lukas,

This error is controlled by another RPC timeout (which is hard-coded and not affected by “akka.ask.timeout”). Could you open a JIRA issue so I can propose a fix for it?
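A minimal illustration of the distinction: if the code waiting for a reply uses a fixed constant, no value of `akka.ask.timeout` can change when it gives up. `AskClient` and both method names are hypothetical; the sketch uses `CompletableFuture.get(long, TimeUnit)` and a 50 ms constant in place of the 10000 ms seen in the exceptions so that it runs quickly.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical RPC client with two ways of waiting for a reply. */
final class AskClient {
    // Hypothetical constant standing in for a timeout fixed in code.
    static final Duration HARD_CODED_TIMEOUT = Duration.ofMillis(50);

    private final Duration configuredAskTimeout; // e.g. the value of akka.ask.timeout

    AskClient(Duration configuredAskTimeout) {
        this.configuredAskTimeout = configuredAskTimeout;
    }

    /** Honors the configured value. */
    <T> T awaitWithConfiguredTimeout(CompletableFuture<T> reply)
            throws InterruptedException, ExecutionException, TimeoutException {
        return reply.get(configuredAskTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    /** Never consults the configuration, so raising akka.ask.timeout cannot help here. */
    <T> T awaitWithHardCodedTimeout(CompletableFuture<T> reply)
            throws InterruptedException, ExecutionException, TimeoutException {
        return reply.get(HARD_CODED_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

A client built with `new AskClient(Duration.ofSeconds(600))` still throws `TimeoutException` after 50 ms on the hard-coded path if the reply has not arrived.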

Cheers,
Qi

> On Dec 12, 2018, at 7:07 AM, Alex Vinnik <al...@gmail.com> wrote:
> [...]