Posted to issues@flink.apache.org by "Nico Kruber (JIRA)" <ji...@apache.org> on 2018/02/22 17:43:00 UTC

[jira] [Commented] (FLINK-8750) InputGate may contain data after an EndOfPartitionEvent

    [ https://issues.apache.org/jira/browse/FLINK-8750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373141#comment-16373141 ] 

Nico Kruber commented on FLINK-8750:
------------------------------------

Actually, the problem may lie only in the value: {{SingleInputGate}} sets {{moreAvailable = inputChannelsWithData.size() > 0;}}, but at least for local channels, couldn't it be that a channel holds no data at all and was merely flushed, so that it received a notification that there may be something available now? (The constraints on these notifications were relaxed recently.)
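
To illustrate the suspicion, here is a minimal, self-contained sketch. It is not Flink code: {{Channel}}, {{Gate}}, {{Result}}, and {{notifyChannelNonEmpty}} are hypothetical stand-ins, and the only thing taken from {{SingleInputGate}} is the size-based {{moreAvailable}} computation quoted above. It assumes that a flushed but empty channel can end up in the queue of channels with data.

```java
import java.util.ArrayDeque;

public class MoreAvailableSketch {

    /** Hypothetical stand-in for an input channel; not Flink's InputChannel. */
    static final class Channel {
        final ArrayDeque<String> buffers = new ArrayDeque<>();
    }

    /** Hypothetical stand-in for BufferOrEvent, reduced to the two relevant fields. */
    static final class Result {
        final String element;
        final boolean moreAvailable;

        Result(String element, boolean moreAvailable) {
            this.element = element;
            this.moreAvailable = moreAvailable;
        }
    }

    /** Hypothetical gate that models only the queue of notified channels. */
    static final class Gate {
        final ArrayDeque<Channel> channelsWithData = new ArrayDeque<>();

        /** Assumption: a notification only means data MAY be there, e.g. after a flush. */
        void notifyChannelNonEmpty(Channel channel) {
            if (!channelsWithData.contains(channel)) {
                channelsWithData.add(channel);
            }
        }

        /** Takes one element from the next notified channel (the queue must not be empty). */
        Result poll() {
            Channel channel = channelsWithData.poll();
            String element = channel.buffers.poll();
            if (!channel.buffers.isEmpty()) {
                // Re-queue the channel only if it really has more buffers.
                channelsWithData.add(channel);
            }
            // Size-based flag as quoted in the comment: it counts queued channels,
            // not the data actually present in them.
            boolean moreAvailable = channelsWithData.size() > 0;
            return new Result(element, moreAvailable);
        }
    }

    /** One channel delivers its last event while an empty channel is still queued. */
    static Result lastEventWithStaleNotification() {
        Channel finishing = new Channel();
        finishing.buffers.add("EndOfPartitionEvent");
        Channel flushedButEmpty = new Channel(); // notified, but holds no data

        Gate gate = new Gate();
        gate.notifyChannelNonEmpty(finishing);
        gate.notifyChannelNonEmpty(flushedButEmpty);
        return gate.poll();
    }

    public static void main(String[] args) {
        Result r = lastEventWithStaleNotification();
        System.out.println(r.element + " moreAvailable=" + r.moreAvailable);
    }
}
```

In this model the last event is returned with {{moreAvailable}} set to true although no data is left anywhere, which is the kind of wrong value that a {{checkState}} on that flag would trip over. Whether the real gate can reach this state is exactly the open question.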

> InputGate may contain data after an EndOfPartitionEvent
> -------------------------------------------------------
>
>                 Key: FLINK-8750
>                 URL: https://issues.apache.org/jira/browse/FLINK-8750
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: Nico Kruber
>            Priority: Major
>             Fix For: 1.5.0
>
>
> The Travis run at https://travis-ci.org/apache/flink/jobs/344425772 indicates that either some data was still left after an {{EndOfPartitionEvent}} or {{BufferOrEvent#moreAvailable}} contained the wrong value:
> {code}
> testOutputWithoutPk(org.apache.flink.table.runtime.stream.table.JoinITCase)  Time elapsed: 4.611 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: null
> 	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
> 	at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:173)
> 	at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
> 	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:292)
> 	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)