You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "thinktothings (JIRA)" <ji...@apache.org> on 2019/02/25 03:20:00 UTC

[jira] [Updated] (FLINK-11738) Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher15e85f5d-a55d-4773-8197-f0db5658f55b#1335897563]] after [10000 ms]. Sender[null] sent

     [ https://issues.apache.org/jira/browse/FLINK-11738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

thinktothings updated FLINK-11738:
----------------------------------
    Description: 
The `akka.ask.timeout` defaults to 10 seconds. Is this value hard-coded in the MiniCluster environment, or can it be changed?

---------------------------------------------------------------------------------------------------------------------------------

public class ConfigOptions {

 /**
 * Starts building a new {@link ConfigOption}.
 *
 * @param key The key for the config option.
 * @return The builder for the config option with the given key.
 */
 public static OptionBuilder key(String key) {
 checkNotNull(key);
 return new OptionBuilder(key);
 }

---------------------------------------------------------------------------------------------------------------------------------

 

public class AkkaOptions {

/**
 * Timeout for akka ask calls.
 */
 public static final ConfigOption<String> ASK_TIMEOUT = ConfigOptions
 .key("akka.ask.timeout")
 .defaultValue("10 s")
 .withDescription("Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you" +
 " should try to increase this value. Timeouts can be caused by slow machines or a congested network. The" +
 " timeout value requires a time-unit specifier (ms/s/min/h/d).");

 

 

---------------------------------------------------------------------------------------------------------------------------------

 

 

 

---------------------------------------------------------------------------------------------------------------------------------

 

package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Streaming word count over text read from a local socket.
 *
 * Feed input data with `nc -lk 1234`.
 */
object SocketWindowWordCount {

  /**
   * Builds the word-count pipeline and runs it.
   *
   * @param args optional command-line arguments; when present, the first one is used as the
   *             job name, otherwise a default job name is used.
   */
  def main(args: Array[String]): Unit = {
    // Port on localhost that the socket source connects to.
    val socketPort = 1234

    // Get the execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Get input data by connecting to the socket, passing '\n' as the delimiter argument.
    val lines = env.socketTextStream("localhost", socketPort, '\n')

    import org.apache.flink.streaming.api.scala._

    // Split every line on single whitespace characters and pair each token with a count of 1.
    val tokens = lines.flatMap(line => line.split("\\s"))
    val pairs = tokens.map(token => WordWithCount(token, 1))

    // Refresh every 5 seconds, which is equivalent to restarting the count.
    // Benefit: there is no need to keep aggregating over all of the data; only the
    // incremental data inside the given time interval is needed, which reduces the data size.
    val windowedCounts = pairs
      .keyBy("word")
      .timeWindow(Time.seconds(5))
      .sum("count")

    windowedCounts.print().setParallelism(1)

    // Use the first argument as the job name when one is supplied; otherwise use the default.
    val jobName = if (args != null && args.size != 0) args(0) else "默认作业"
    env.execute(jobName)

    println("结束")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)

}

 

---------------------------------------------------------------------------------------------------------------------------------

  was:
Akka.ask.timeout 10 seconds, this miniCluster environment is written dead, can not be changed?

---------------------------------------------------------------------------------------------------------------------------------

public class ConfigOptions {

 /**
 * Starts building a new \{@link ConfigOption}.
 *
 * @param key The key for the config option.
 * @return The builder for the config option with the given key.
 */
 public static OptionBuilder key(String key) {
 checkNotNull(key);
 return new OptionBuilder(key);
 }

---------------------------------------------------------------------------------------------------------------------------------

 

public class AkkaOptions {

 /**
 * Timeout for akka ask calls.
 */
 public static final ConfigOption<String> ASK_TIMEOUT = ConfigOptions
 .key("akka.ask.timeout")
 .defaultValue("10 s")
 .withDescription("Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you" +
 " should try to increase this value. Timeouts can be caused by slow machines or a congested network. The" +
 " timeout value requires a time-unit specifier (ms/s/min/h/d).");

 

 

---------------------------------------------------------------------------------------------------------------------------------

 

 

 

---------------------------------------------------------------------------------------------------------------------------------

 

 

---------------------------------------------------------------------------------------------------------------------------------


> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher15e85f5d-a55d-4773-8197-f0db5658f55b#1335897563]] after [10000 ms]. Sender[null] sent
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11738
>                 URL: https://issues.apache.org/jira/browse/FLINK-11738
>             Project: Flink
>          Issue Type: Bug
>          Components: Client
>    Affects Versions: 1.7.2
>         Environment: flink 1.7.2 client
> !image-2019-02-25-10-57-20-106.png!
>  
> !image-2019-02-25-10-57-32-876.png!
>  
>  
> !image-2019-02-25-10-57-39-753.png!
>  
>  
>  
>  
>  
>            Reporter: thinktothings
>            Priority: Major
>         Attachments: image-2019-02-25-10-57-20-106.png, image-2019-02-25-10-57-32-876.png, image-2019-02-25-10-57-39-753.png
>
>
> The `akka.ask.timeout` defaults to 10 seconds. Is this value hard-coded in the MiniCluster environment, or can it be changed?
> ---------------------------------------------------------------------------------------------------------------------------------
> public class ConfigOptions {
>  /**
>  * Starts building a new {@link ConfigOption}.
>  *
>  * @param key The key for the config option.
>  * @return The builder for the config option with the given key.
>  */
>  public static OptionBuilder key(String key) {
>  checkNotNull(key);
>  return new OptionBuilder(key);
>  }
> ---------------------------------------------------------------------------------------------------------------------------------
>  
> public class AkkaOptions {
> /**
>  * Timeout for akka ask calls.
>  */
>  public static final ConfigOption<String> ASK_TIMEOUT = ConfigOptions
>  .key("akka.ask.timeout")
>  .defaultValue("10 s")
>  .withDescription("Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you" +
>  " should try to increase this value. Timeouts can be caused by slow machines or a congested network. The" +
>  " timeout value requires a time-unit specifier (ms/s/min/h/d).");
>  
>  
> ---------------------------------------------------------------------------------------------------------------------------------
>  
>  
>  
> ---------------------------------------------------------------------------------------------------------------------------------
>  
> package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.windowing.time.Time
> /**
>  * nc -lk 1234 输入数据
>  */
> object SocketWindowWordCount {
>  def main(args: Array[String]): Unit = {
>  val port = 1234
>  // get the execution environment
>  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>  // get input data by connecting to the socket
>  val dataStream = env.socketTextStream("localhost", port, '\n')
>  import org.apache.flink.streaming.api.scala._
>  val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w,1))
>  .keyBy("word")
>  /**
>  * 每5秒刷新一次,相当于重新开始计数,
>  * 好处,不需要一直拿所有的数据统计
>  * 只需要在指定时间间隔内的增量数据,减少了数据规模
>  */
>  .timeWindow(Time.seconds(5))
>  .sum("count" )
>  textResult.print().setParallelism(1)
>  if(args == null || args.size ==0){
>  env.execute("默认作业")
>  }else{
>  env.execute(args(0))
>  }
>  println("结束")
>  }
>  // Data type for words with count
>  case class WordWithCount(word: String, count: Long)
> }
>  
> ---------------------------------------------------------------------------------------------------------------------------------



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)