Posted to issues@flink.apache.org by "Chesnay Schepler (JIRA)" <ji...@apache.org> on 2018/12/06 12:37:00 UTC

[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

    [ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711387#comment-16711387 ] 

Chesnay Schepler commented on FLINK-10832:
------------------------------------------

The project you attached works for me locally in the IDE, and also when I submit the built project to a 1.6.2, a 1.7.0 (taken from [https://dist.apache.org/repos/dist/release/flink]) and a 1.8-SNAPSHOT (locally built) cluster.

I did not make _any_ modifications to the project.

The logs you provided show that the JobMaster is not shutting down.

The following line is present in my logs, but missing in the attached logs:
{code:java}
13:25:56.709 [flink-akka.actor.default-dispatcher-3] INFO  o.a.f.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Simple Test(e8a889dc0394b0c664dcce46f36ed0cc).{code}
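Beyond that, the main difference between the logs is the reason given when the JobMaster closes its ResourceManager connection (the first excerpt is from the attached logs, the second from mine):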
{code:java}
DEBUG o.a.f.runtime.jobmaster.JobMaster - Close ResourceManager connection 6ac59614b23655af1e13f08e16b96ea4.
org.apache.flink.util.FlinkException: ResourceManager leader changed to new address null{code}
{code:java}
DEBUG o.a.f.runtime.jobmaster.JobMaster - Close ResourceManager connection 966422750006990ed1f304766e210161.
org.apache.flink.util.FlinkException: JobManager is shutting down.{code}
[~till.rohrmann] Do you have any idea what could cause this?
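To check other logs for the same symptom, you can scan them for the two markers compared above. This is a minimal sketch and is not part of Flink. The class `JobMasterShutdownCheck`, its methods, and the returned labels are hypothetical. The sketch assumes only that the log lines contain the message texts quoted in this comment.

```java
import java.util.List;

/**
 * Hypothetical helper (not part of Flink): scans log lines for the markers
 * that differ between a run that terminates and the run that hangs.
 */
public class JobMasterShutdownCheck {

    // Present in the log of the run that terminates, missing from the attached log.
    static final String STOP_MARKER = "Stopping the JobMaster for job";

    // Reasons logged when the JobMaster closes its ResourceManager connection.
    static final String NORMAL_REASON = "JobManager is shutting down.";
    static final String LEADER_LOST_REASON = "ResourceManager leader changed to new address null";

    /** Returns true if any line shows the JobMaster being stopped. */
    static boolean jobMasterStopped(List<String> lines) {
        return lines.stream().anyMatch(line -> line.contains(STOP_MARKER));
    }

    /** Returns a label for the first close reason found, or "none". */
    static String closeReason(List<String> lines) {
        for (String line : lines) {
            if (line.contains(NORMAL_REASON)) {
                return "jobmanager-shutdown";
            }
            if (line.contains(LEADER_LOST_REASON)) {
                return "leader-lost";
            }
        }
        return "none";
    }
}
```

Matching on single lines works whether the reason appears on the same line as "Close ResourceManager connection" (as in the INFO output) or on the following exception line (as in the DEBUG output).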

> StreamExecutionEnvironment.execute() does not return when all sources end
> -------------------------------------------------------------------------
>
>                 Key: FLINK-10832
>                 URL: https://issues.apache.org/jira/browse/FLINK-10832
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.5.5, 1.6.2
>            Reporter: Arnaud Linz
>            Priority: Critical
>         Attachments: flink-10832.zip, log.txt
>
>
> In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends:
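> {code:java}
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> public void testFlink() throws Exception {
>     // get the execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // get input data: a bounded source that emits "0".."4" and then returns
>     final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
>         @Override
>         public void run(final SourceContext<String> ctx) throws Exception {
>             for (int count = 0; count < 5; count++) {
>                 ctx.collect(String.valueOf(count));
>             }
>             // returning from run() ends the source, so the job should finish
>         }
>         @Override
>         public void cancel() {
>         }
>     });
>     text.print().setParallelism(1);
>     env.execute("Simple Test");
>     // Never ends!
> }
> {code}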
>  
> This is critical for us because we rely heavily on this "source exhaustion stop" mechanism to stop streaming applications cleanly from their own code, so it prevents us from using the latest Flink versions.
>  
> The log extract shows that the local cluster tried to shut down but, for no apparent reason, could not:
>  
> {code:java}
> [2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)
> [2018-11-07 11:11:13,840] INFO No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) (org.apache.flink.streaming.runtime.tasks.StreamTask:230)
> 0
> 1
> 2
> 3
> 4
> [2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)
> [2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). (org.apache.flink.runtime.taskmanager.Task:818)
> [2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] (org.apache.flink.runtime.taskmanager.Task:845)
> [2018-11-07 11:11:13,899] INFO Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)
> [2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)
> [2018-11-07 11:11:13,907] INFO Job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)
> [2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)
> [2018-11-07 11:11:13,908] INFO Shutting down (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)
> [2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster (org.apache.flink.runtime.minicluster.MiniCluster:427)
> [2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)
> [2018-11-07 11:11:23,582] INFO Stopping TaskExecutor akka://flink/user/taskmanager_0. (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)
> [2018-11-07 11:11:23,583] INFO Shutting down TaskExecutorLocalStateStoresManager. (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)
> [2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 (org.apache.flink.runtime.io.disk.iomanager.IOManager:110)
> [2018-11-07 11:11:23,591] INFO Shutting down the network environment and its components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)
> [2018-11-07 11:11:23,591] INFO Removing cache directory C:\Users\alinz\AppData\Local\Temp\flink-web-ui (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)
> [2018-11-07 11:11:23,593] INFO Closing the SlotManager. (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)
> [2018-11-07 11:11:23,593] INFO Suspending the SlotManager. (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)
> [2018-11-07 11:11:23,596] INFO Close ResourceManager connection cd021102669258aad77c20645ed08aae: ResourceManager leader changed to new address null. (org.apache.flink.runtime.jobmaster.JobMaster:1355)
> [2018-11-07 11:11:23,607] INFO Stop job leader service. (org.apache.flink.runtime.taskexecutor.JobLeaderService:135)
> [2018-11-07 11:11:23,608] INFO Stopped TaskExecutor akka://flink/user/taskmanager_0. (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)
> {code}
>  
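When you reproduce a hang like the one in the quoted report, a test that calls `env.execute(...)` directly blocks indefinitely. The following minimal sketch uses only `java.util.concurrent` to turn such a hang into a test failure. The class `ExecuteWithTimeout` and its method are hypothetical and are not part of Flink.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical test helper (not part of Flink): runs a blocking call on a
 * worker thread and fails after a timeout instead of hanging forever.
 */
public class ExecuteWithTimeout {

    static <T> T runWithTimeout(Callable<T> call, long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "execute-with-timeout");
            t.setDaemon(true); // a hung call must not keep the JVM alive
            return t;
        });
        try {
            Future<T> future = executor.submit(call);
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupts the worker; the call may ignore it
                throw new IllegalStateException(
                        "call did not return within " + timeoutMillis + " ms", e);
            } catch (ExecutionException e) {
                // surface the call's own failure rather than the wrapper
                Throwable cause = e.getCause();
                if (cause instanceof Exception) {
                    throw (Exception) cause;
                }
                throw e;
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In the reproducer, you would then write `ExecuteWithTimeout.runWithTimeout(() -> env.execute("Simple Test"), 60_000);`. The 60-second limit is an arbitrary choice. The worker is a daemon thread, so a job that never finishes still lets the test JVM exit after the failure is reported.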


