Posted to user@flink.apache.org by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com> on 2017/04/14 06:45:19 UTC

Re: Re: Changing timeout for cancel command

Hi Jürgen,
I see your point from the log, but I don't think anything can be done about it on the Flink side. The task receives the cancel command from the master, and it disposes the operator after the task thread is interrupted.
Maybe you can check whether HDFS has some parameters that make the client wait longer for the ack.
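The cancellation mechanism described above can be illustrated with a simplified, self-contained Java sketch. This is not Flink's or HDFS's actual code: the class and method names (CancelByInterruptSketch, waitForAck) are hypothetical, and the CountDownLatch only stands in for the HDFS client waiting on the pipeline ack. A "task" thread blocks in a flush, and "cancel" interrupts that thread; in this sketch no timeout is consulted, so the flush is aborted right away, which matches the millisecond gap between cancellation and the InterruptedIOException in the log quoted below.

```java
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CancelByInterruptSketch {

    // Hypothetical stand-in for a client call that blocks until the data
    // pipeline acknowledges a flush. In this sketch the ack never arrives.
    static void waitForAck(CountDownLatch ack) throws InterruptedIOException {
        try {
            ack.await(); // throws at once if the thread is (or becomes) interrupted
        } catch (InterruptedException e) {
            // Restore the interrupt flag and surface it as an I/O error,
            // similar in shape to the exception seen in the log.
            Thread.currentThread().interrupt();
            InterruptedIOException ioe = new InterruptedIOException(
                "Interrupted while waiting for data to be acknowledged by pipeline");
            ioe.initCause(e);
            throw ioe;
        }
    }

    public static String run() throws InterruptedException {
        CountDownLatch ack = new CountDownLatch(1);     // never counted down
        CountDownLatch started = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("flushed");

        // "Task thread": closing the sink triggers a blocking flush.
        Thread task = new Thread(() -> {
            started.countDown();
            try {
                waitForAck(ack);
            } catch (InterruptedIOException e) {
                outcome.set("flush aborted: " + e.getMessage());
            }
        });
        task.start();
        started.await();

        // "Cancel": interrupt the task thread directly; nothing waits for the flush.
        task.interrupt();
        task.join(TimeUnit.SECONDS.toMillis(5));
        return outcome.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "flush aborted: Interrupted while waiting for data to be acknowledged by pipeline"
        System.out.println(run());
    }
}
```

In this model, giving the flush more time would require the canceller to wait before interrupting; raising an unrelated RPC timeout changes nothing.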

cheers,
zhijiang

------------------------------------------------------------------
From: Jürgen Thomann <ju...@innogames.com>
Sent: Thursday, April 13, 2017 15:32
To: user <us...@flink.apache.org>
Subject: Re: Re: Changing timeout for cancel command
Hi zhijiang,

I checked this value. I haven't configured it, so it should be the default of 10 s. I timed the flink cancel command with the time command, and it finished after 6 seconds.
After filtering the log for the messages of one sink, it looks like the sink is interrupted within milliseconds (about 19 ms between the cancellation being triggered at 06:53:16,484 and the error at 06:53:16,503). Here are the logs from one TaskManager (standalone cluster):
    06:53:16,484 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Sink: /tmp/flink/events_invalid HDFS sink (2/2) (477db6e41932ad9b60c72e14de4488ed).
    06:53:16,484 INFO  org.apache.flink.runtime.taskmanager.Task - Sink: /tmp/flink/events_invalid HDFS sink (2/2) (477db6e41932ad9b60c72e14de4488ed) switched from RUNNING to CANCELING.
    06:53:16,484 INFO  org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Sink: /tmp/flink/events_invalid HDFS sink (2/2) (477db6e41932ad9b60c72e14de4488ed).
    06:53:16,503 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
    06:53:16,503 ERROR org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - Error while trying to hflushOrSync! java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
    java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
        at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
        at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2038)
        at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
        at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
        at sun.reflect.GeneratedMethodAccessor156.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.hflushOrSync(StreamWriterBase.java:72)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:131)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:146)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:554)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:423)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
    06:53:16,504 INFO  org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Sink: /tmp/flink/events_invalid HDFS sink (2/2)
    06:53:16,504 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state CANCELED to JobManager for task Sink: /tmp/flink/events_invalid HDFS sink (477db6e41932ad9b60c72e14de4488ed)
Best,
Jürgen

On 13.04.2017 07:05, Zhijiang(wangzhijiang999) wrote:
Hi Jürgen,
You can set the timeout in the configuration with the key "akka.ask.timeout"; the current default value is 10 s. I hope it helps.

cheers,
zhijiang

------------------------------------------------------------------
From: Jürgen Thomann <ju...@innogames.com>
Sent: Wednesday, April 12, 2017 19:04
To: user <us...@flink.apache.org>
Subject: Changing timeout for cancel command
Hi,

We currently get the following exception when we cancel a job that writes to Hadoop:

ERROR org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - Error while trying to hflushOrSync! java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline

This causes a problem when we cancel a job with a savepoint and resubmit it, because the part file sometimes ends up smaller than the file size specified in the valid-length file.
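The condition described above can be checked after a restore by comparing each part file's size with the length recorded in its valid-length file. Below is a minimal sketch in plain Java (java.nio, local file system only; on HDFS the same logic would go through Hadoop's FileSystem API). It assumes the BucketingSink defaults of an "_" prefix and a ".valid-length" suffix (so part-0-0 pairs with _part-0-0.valid-length) and that the length is stored as a string written with DataOutputStream.writeUTF; verify both against the Flink version in use. The class and method names are hypothetical. Bytes past the valid length are expected and should be ignored by readers; a file shorter than the valid length means data the savepoint counted as written is missing.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ValidLengthCheck {

    // Assumed naming convention (BucketingSink defaults): "_" + part name + ".valid-length".
    static Path validLengthPathFor(Path partFile) {
        return partFile.resolveSibling("_" + partFile.getFileName() + ".valid-length");
    }

    // Returns how many bytes the part file is short of its recorded valid length
    // (0 if it is at least that long), or -1 if no valid-length file exists.
    static long missingBytes(Path partFile) throws IOException {
        Path lengthFile = validLengthPathFor(partFile);
        if (!Files.exists(lengthFile)) {
            return -1;
        }
        long validLength;
        // Assumes the length was written as a string with DataOutputStream.writeUTF.
        try (DataInputStream in = new DataInputStream(Files.newInputStream(lengthFile))) {
            validLength = Long.parseLong(in.readUTF());
        }
        long actual = Files.size(partFile);
        return Math.max(0L, validLength - actual);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("bucket");
        Path part = dir.resolve("part-0-0");
        Files.write(part, new byte[80]); // 80 bytes actually on disk
        try (DataOutputStream out =
                 new DataOutputStream(Files.newOutputStream(validLengthPathFor(part)))) {
            out.writeUTF(Long.toString(100)); // 100 bytes recorded as valid
        }
        System.out.println(missingBytes(part)); // prints 20
    }
}
```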
          
Is there a way to increase the timeout during cancel to give the flush a bit more time? We currently lose events when this happens.

Best,
Jürgen