You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "thinktothings (JIRA)" <ji...@apache.org> on 2019/02/25 03:20:00 UTC
[jira] [Updated] (FLINK-11738) Caused by:
akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/dispatcher15e85f5d-a55d-4773-8197-f0db5658f55b#1335897563]]
after [10000 ms]. Sender[null] sent
[ https://issues.apache.org/jira/browse/FLINK-11738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
thinktothings updated FLINK-11738:
----------------------------------
Description:
Akka.ask.timeout 10 seconds, this miniCluster environment is written dead, can not be changed?
---------------------------------------------------------------------------------------------------------------------------------
public class ConfigOptions
{ /** * Starts building a new \\{@link ConfigOption}
.
*
* @param key The key for the config option.
* @return The builder for the config option with the given key.
*/
public static OptionBuilder key(String key) \{ checkNotNull(key); return new OptionBuilder(key); }
---------------------------------------------------------------------------------------------------------------------------------
public class AkkaOptions {
/**
* Timeout for akka ask calls.
*/
public static final ConfigOption<String> ASK_TIMEOUT = ConfigOptions
.key("akka.ask.timeout")
.defaultValue("10 s")
.withDescription("Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you" +
" should try to increase this value. Timeouts can be caused by slow machines or a congested network. The" +
" timeout value requires a time-unit specifier (ms/s/min/h/d).");
---------------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------------------------------------------
package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
* nc -lk 1234 输入数据
*/
object SocketWindowWordCount {
def main(args: Array[String]): Unit = {
val port = 1234
// get the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to the socket
val dataStream = env.socketTextStream("localhost", port, '\n')
import org.apache.flink.streaming.api.scala._
val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w,1))
.keyBy("word")
/**
* 每5秒刷新一次,相当于重新开始计数,
* 好处,不需要一直拿所有的数据统计
* 只需要在指定时间间隔内的增量数据,减少了数据规模
*/
.timeWindow(Time.seconds(5))
.sum("count" )
textResult.print().setParallelism(1)
if(args == null || args.size ==0){
env.execute("默认作业")
}else{
env.execute(args(0))
}
println("结束")
}
// Data type for words with count
case class WordWithCount(word: String, count: Long)
}
---------------------------------------------------------------------------------------------------------------------------------
was:
Akka.ask.timeout 10 seconds, this miniCluster environment is written dead, can not be changed?
---------------------------------------------------------------------------------------------------------------------------------
public class ConfigOptions {
/**
* Starts building a new \{@link ConfigOption}.
*
* @param key The key for the config option.
* @return The builder for the config option with the given key.
*/
public static OptionBuilder key(String key) {
checkNotNull(key);
return new OptionBuilder(key);
}
---------------------------------------------------------------------------------------------------------------------------------
public class AkkaOptions {
/**
* Timeout for akka ask calls.
*/
public static final ConfigOption<String> ASK_TIMEOUT = ConfigOptions
.key("akka.ask.timeout")
.defaultValue("10 s")
.withDescription("Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you" +
" should try to increase this value. Timeouts can be caused by slow machines or a congested network. The" +
" timeout value requires a time-unit specifier (ms/s/min/h/d).");
---------------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------------------------------------------
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher15e85f5d-a55d-4773-8197-f0db5658f55b#1335897563]] after [10000 ms]. Sender[null] sent
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-11738
> URL: https://issues.apache.org/jira/browse/FLINK-11738
> Project: Flink
> Issue Type: Bug
> Components: Client
> Affects Versions: 1.7.2
> Environment: flink 1.7.2 client
> !image-2019-02-25-10-57-20-106.png!
>
> !image-2019-02-25-10-57-32-876.png!
>
>
> !image-2019-02-25-10-57-39-753.png!
>
>
>
>
>
> Reporter: thinktothings
> Priority: Major
> Attachments: image-2019-02-25-10-57-20-106.png, image-2019-02-25-10-57-32-876.png, image-2019-02-25-10-57-39-753.png
>
>
> Akka.ask.timeout 10 seconds, this miniCluster environment is written dead, can not be changed?
> ---------------------------------------------------------------------------------------------------------------------------------
> public class ConfigOptions
> { /** * Starts building a new \\{@link ConfigOption}
> .
> *
> * @param key The key for the config option.
> * @return The builder for the config option with the given key.
> */
> public static OptionBuilder key(String key) \{ checkNotNull(key); return new OptionBuilder(key); }
> ---------------------------------------------------------------------------------------------------------------------------------
>
> public class AkkaOptions {
> /**
> * Timeout for akka ask calls.
> */
> public static final ConfigOption<String> ASK_TIMEOUT = ConfigOptions
> .key("akka.ask.timeout")
> .defaultValue("10 s")
> .withDescription("Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you" +
> " should try to increase this value. Timeouts can be caused by slow machines or a congested network. The" +
> " timeout value requires a time-unit specifier (ms/s/min/h/d).");
>
>
> ---------------------------------------------------------------------------------------------------------------------------------
>
>
>
> ---------------------------------------------------------------------------------------------------------------------------------
>
> package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.windowing.time.Time
> /**
> * nc -lk 1234 输入数据
> */
> object SocketWindowWordCount {
> def main(args: Array[String]): Unit = {
> val port = 1234
> // get the execution environment
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
> // get input data by connecting to the socket
> val dataStream = env.socketTextStream("localhost", port, '\n')
> import org.apache.flink.streaming.api.scala._
> val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w,1))
> .keyBy("word")
> /**
> * 每5秒刷新一次,相当于重新开始计数,
> * 好处,不需要一直拿所有的数据统计
> * 只需要在指定时间间隔内的增量数据,减少了数据规模
> */
> .timeWindow(Time.seconds(5))
> .sum("count" )
> textResult.print().setParallelism(1)
> if(args == null || args.size ==0){
> env.execute("默认作业")
> }else{
> env.execute(args(0))
> }
> println("结束")
> }
> // Data type for words with count
> case class WordWithCount(word: String, count: Long)
> }
>
> ---------------------------------------------------------------------------------------------------------------------------------
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)