You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zhangminglei <gi...@git.apache.org> on 2018/05/30 12:18:22 UTC

[GitHub] flink pull request #6103: [FLINK-9413] [distributed coordination] Tasks can ...

GitHub user zhangminglei opened a pull request:

    https://github.com/apache/flink/pull/6103

    [FLINK-9413] [distributed coordination] Tasks can fail with Partition…

    …NotFoundException if consumer deployment takes too long
    
    ## What is the purpose of the change
    Tasks can fail with PartitionNotFoundException if consumer deployment takes too long. And the producer has been assigned a slot but we do not wait until it is actually running.
    
    ## Brief change log
    Change the condition to make the producer wait until it is actually running.
    
    ## Verifying this change
    
    This change is a trivial rework / code cleanup without any test coverage.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhangminglei/flink flink-9413

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6103.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6103
    
----
commit a52434d14117fde4e911f9a8f81a2e10fdd9ba77
Author: zhangminglei <zm...@...>
Date:   2018-05-30T12:17:17Z

    [FLINK-9413] [distributed coordination] Tasks can fail with PartitionNotFoundException if consumer deployment takes too long

----


---

[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/6103
  
    @tillrohrmann Could you take a look on this PR  ? Thank you.


---

[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

Posted by RalphSu <gi...@git.apache.org>.
Github user RalphSu commented on the issue:

    https://github.com/apache/flink/pull/6103
  
    @zhangminglei  does the description of this issue indicate a workaround for this? I'm using 1.4.2. With dozens of jobs been submitted to my cluster, i'm constantly see this PartitionNotFound exception.


---

[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/6103
  
    With [FLINK-5129](https://issues.apache.org/jira/browse/FLINK-5129) which is part of Flink 1.3.0, we allow the `BlobCache` to retrieve the blobs from the HA storage location. In your case this seems to be HDFS. The idea was that this should alleviate the I/O pressure from the `JobManager` because not all blobs are served by a single node. Thus, this should rather improve the situation with this issue.


---

[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

Posted by RalphSu <gi...@git.apache.org>.
Github user RalphSu commented on the issue:

    https://github.com/apache/flink/pull/6103
  
    okey...
    
    I encounter issue when submit jobs, stack as below, looks the same with this one. Is this a regression, or are we thinking the fix will break existing code?
    
    
    
    org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 1d986178ce506d4e3e71c4f6f61439b7@36897daa4a60b37dcae0973ba48a868b not found.
    	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:270)
    	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:172)
    	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:392)
    	at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1300)
    	at org.apache.flink.runtime.taskmanager.Task.lambda$triggerPartitionProducerStateCheck$0(Task.java:1139)
    	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



---

[GitHub] flink pull request #6103: [FLINK-9413] [distributed coordination] Tasks can ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6103#discussion_r192559985
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ---
    @@ -103,9 +103,7 @@ public String toString() {
     			// The producing task needs to be RUNNING or already FINISHED
     			if (consumedPartition.isConsumable() && producerSlot != null &&
     					(producerState == ExecutionState.RUNNING ||
    -						producerState == ExecutionState.FINISHED ||
    -						producerState == ExecutionState.SCHEDULED ||
    -						producerState == ExecutionState.DEPLOYING)) {
    --- End diff --
    
    Yes Till. The flink1.5 code does not use JobManager and TaskManager class for running. Instead, we use JobMaster & TaskExecutor now. I think we should remove them and the relative class for legacy code.


---

[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/6103
  
    @RalphSu you could try increasing the `taskmanager.network.request-backoff.max` configuration parameter. Per default it is set to `10000` ms.


---

[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/6103
  
    How critical is it to change this setting? 
    I would assume this should be caught by the regular recovery, so unless this occurs very often and thus leads to confusing exceptions in the log, should we maybe leave it as it is?


---

[GitHub] flink pull request #6103: [FLINK-9413] [distributed coordination] Tasks can ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6103#discussion_r192558687
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ---
    @@ -103,9 +103,7 @@ public String toString() {
     			// The producing task needs to be RUNNING or already FINISHED
     			if (consumedPartition.isConsumable() && producerSlot != null &&
     					(producerState == ExecutionState.RUNNING ||
    -						producerState == ExecutionState.FINISHED ||
    -						producerState == ExecutionState.SCHEDULED ||
    -						producerState == ExecutionState.DEPLOYING)) {
    --- End diff --
    
    Yes, I would say that we tackle this problem after we have removed the legacy code. For the moment, there is a work around to set the max backoff time higher for the data connections.


---

[GitHub] flink pull request #6103: [FLINK-9413] [distributed coordination] Tasks can ...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6103#discussion_r191814673
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ---
    @@ -103,9 +103,7 @@ public String toString() {
     			// The producing task needs to be RUNNING or already FINISHED
     			if (consumedPartition.isConsumable() && producerSlot != null &&
     					(producerState == ExecutionState.RUNNING ||
    -						producerState == ExecutionState.FINISHED ||
    -						producerState == ExecutionState.SCHEDULED ||
    -						producerState == ExecutionState.DEPLOYING)) {
    --- End diff --
    
    Thank you till. But where is legacy code ? I found a lot of code belongs to legacy mode.........omg....I do not think I should remove all legacy mode code....... Could you tell me please ? Lol.


---

[GitHub] flink pull request #6103: [FLINK-9413] [distributed coordination] Tasks can ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6103#discussion_r191753857
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ---
    @@ -103,9 +103,7 @@ public String toString() {
     			// The producing task needs to be RUNNING or already FINISHED
     			if (consumedPartition.isConsumable() && producerSlot != null &&
     					(producerState == ExecutionState.RUNNING ||
    -						producerState == ExecutionState.FINISHED ||
    -						producerState == ExecutionState.SCHEDULED ||
    -						producerState == ExecutionState.DEPLOYING)) {
    --- End diff --
    
    This change will break deployments where `allowLazyDeployment` is set to `false`. In the Flip-6 code, this is per default set to `true`, but for the legacy mode, the value is `false`. Thus, we would have to first remove the legacy mode and `allowLazyDeployment`.


---

[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/6103
  
    @tillrohrmann I updated the code. Please review again. Thank you till.


---

[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/6103
  
    That all depends why the failure happens in the first place. It seems to happen if the receiver of a channel starts much faster than the sender. The longest part of the deployment is library distribution, which happens only once. After one failure / recovery, the library should be cached and the next attempt to start the task should be very fast.


---

[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/6103
  
    By the way, Till. I was wondering how can you find this issue ? Does this issue reported by the user ? 


---

[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/6103
  
    Hi, @tillrohrmann I found more than one user found this issue and affect their work. I suggest we can merge this PR as a temporary solution until the travis turn green.


---

[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/6103
  
    But there is another question. If the task failed for many times, Will that cause issue ?


---

[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/6103
  
    Hi, Stephan. Thank you for your response.  But I would ask, if the issue does not occur by the receiver of a channel starts much faster than the sender. Like RuntimeException cause the task failed for several times. I do not know how flink solve this, But I know spark the default if the task failed for 4 times. Then the job failed.


---

[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

Posted by RalphSu <gi...@git.apache.org>.
Github user RalphSu commented on the issue:

    https://github.com/apache/flink/pull/6103
  
    @tillrohrmann  already did that, it looks alleviate though not fix. I'm upgrade from 1.2.0 to 1.4.2. Major thing i can see is TM now connection to HDFS instead of only talk to JobManager,  could this increase the possibility of this issue?


---

[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

Posted by zhangminglei <gi...@git.apache.org>.
Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/6103
  
    Hi, @StephanEwen What you mean is that the Task will restart by the regular recovery ? So, We do not worry to much on this.


---