You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by sachingoel0101 <gi...@git.apache.org> on 2015/08/04 06:13:10 UTC

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

GitHub user sachingoel0101 opened a pull request:

    https://github.com/apache/flink/pull/979

    [FLINK-2472]Make JobClientActor poll JobManager continuously for updates on submitted job

    This PR adds functionality for the `JobClientActor` to poll the `JobManager` after a set interval `JOB_CLIENT_HEARTBEAT_INTERVAL`(5s) about the status of submitted job.
    Furthermore, there is a inherent timeout on the `JobClientActor` for not receiving any messages(only `JobManager` is likely to send any messages) for `JOB_CLIENT_JOB_MANAGER_TIMEOUT`(10s).
    
    If the job status comes as `CREATED` or `RESTARTING` for more than `JOB_CLIENT_JOB_STATUS_TIMEOUT`(30s), the `JobClient` is sent a `Status.Failure` message, which to equivalent to the idea mentioned in the jira ticket.
    This work isn't complete, and I'd love feedback about whether different job status messages are handled properly, and what is the best place to place the time interval configuration. It can be placed in the usual place, i.e., `ConfigConstants` but there is no `Configuration` accessible in the `JobClientActor`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sachingoel0101/flink flink-2472

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/979.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #979
    
----
commit 9cf23c5e397bc7a8a8faf7427d2ae6dbd0d73143
Author: Sachin Goel <sa...@gmail.com>
Date:   2015-08-03T23:14:29Z

    [FLINK-2472]Make JobClientActor poll JobManager continuously for updates and fail if job manager is dead or task hasn't been running

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r36181570
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -59,11 +87,50 @@ public JobClientActor(
     		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
     
     		this.sysoutUpdates = sysoutUpdates;
    +		// set this to -1 to indicate the job hasn't been created yet.
    +		this.currentJobCreatedAt = -1;
     	}
     	
     	@Override
     	protected void handleMessage(Object message) {
     		
    +		// ======= Job status messages on regular intervals ==============
    +		if(message instanceof JobManagerMessages.CurrentJobStatus){
    +			JobStatus statusReport = ((JobManagerMessages.CurrentJobStatus) message).status();
    +			long timeDiff;
    +			switch(statusReport){
    +				case RUNNING:
    +					// Vincent, we happy?
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case FINISHED:
    +					// Yeah! We happy!
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case CREATED:
    +					// we're still at Job CREATED. Let's see if we're over the limit.
    +					timeDiff = (System.currentTimeMillis() - this.currentJobCreatedAt);
    +					if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){
    +						failWithTimeout(timeDiff);
    +					} // otherwise just wait a bit longer.
    +					break;
    +				case RESTARTING:
    +					if(this.currentJobCreatedAt == -1){
    +						// we effectively have re-created the job
    +						this.currentJobCreatedAt = System.currentTimeMillis();
    +					} else{
    +						// it was already at either CREATED or RESTARTING. See if we're over the limit.
    +						timeDiff = (System.currentTimeMillis() - this.currentJobCreatedAt);
    +						if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){
    +							failWithTimeout(timeDiff);
    +						} // otherwise let's wait a bit more.
    +					}
    +					break;
    +				default:
    +					// well, canceled or failed. We'll find out with an exact message in a while.
    --- End diff --
    
    What do you mean by `BLOCKING`? That the `JobManager` will fail to respond and is blocked? In that case we'll automatically get a timeout for not receiving any messages from `JobManager`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r37763757
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -156,6 +294,16 @@ else if (message instanceof Terminated) {
     		}
     	}
     
    +	private void failWithTimeout(long timeDiff) {
    +		String msg = "Job hasn't been running for more than " + JOB_CLIENT_JOB_STATUS_TIMEOUT/1.0e+3 + " seconds";
    +
    +		logger.debug(msg);
    +		if (submitter != null) {
    +			submitter.tell(decorateMessage(new Status.Failure(new Exception(msg))), sender());
    --- End diff --
    
    Can we specify a more specific exception. E.g. `JobExecutionException`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r37768306
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -49,27 +57,127 @@
     	// Actor which submits a job to the JobManager via this actor
     	private ActorRef submitter;
     
    +	// timeout for a message from the job manager
    +	private static FiniteDuration JOB_CLIENT_JOB_MANAGER_TIMEOUT;
    +
    +	// heartbeat interval for pinging the job manager for job status
    +	private static FiniteDuration JOB_CLIENT_HEARTBEAT_INTERVAL;
    +
    +	// initial time delay before starting pinging job manager over regular intervals
    +	private static FiniteDuration JOB_CLIENT_INITIAL_PING_DELAY;
    +
    +	// maximum waiting time for a job to go to running status (milliseconds)
    +	private static long JOB_CLIENT_JOB_STATUS_TIMEOUT;
    +
    +	// time at which the current job was created
    +	private long currentJobCreatedAt;
    +
    +	// current job id
    +	private JobID currentJobId;
    +
    +	// scheduler to ping JobManager after a time interval
    +	private Cancellable scheduler;
    +
    +	// maintain when we got our last ping from the Job Manager.
    +	private long jobManagerPinged = 0;
    +
    +	// maintain the last time we got a terminal state message
    +	private long terminalStateAt = 0;
    +
     	public JobClientActor(
     			ActorRef jobManager,
     			Logger logger,
     			boolean sysoutUpdates,
    -			Option<UUID> leaderSessionID) {
    +			Option<UUID> leaderSessionID,
    +			Configuration config) {
     		this.jobManager = Preconditions.checkNotNull(jobManager, "The JobManager ActorRef must not be null.");
     		this.logger = Preconditions.checkNotNull(logger, "The logger must not be null.");
     		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
     
     		this.sysoutUpdates = sysoutUpdates;
    +		// set this to 0 to indicate the job hasn't been created yet.
    +		this.currentJobCreatedAt = 0;
    +		this.terminalStateAt = 0;
    +		parseTimes(config);
     	}
     	
     	@Override
     	protected void handleMessage(Object message) {
    -		
    +
    +		// first see if the message was from the Job Manager
    +		if(getContext().sender() == jobManager){
    +			this.jobManagerPinged = System.currentTimeMillis();
    +		}
    +
    +		// ======= Job status messages on regular intervals ==============
    +		if(message instanceof JobManagerMessages.CurrentJobStatus){
    +			JobStatus statusReport = ((JobManagerMessages.CurrentJobStatus) message).status();
    +			long timeDiff;
    +			switch(statusReport){
    +				case RUNNING:
    +					// Vincent, we happy?
    +					resetTimeouts();
    +					break;
    +				case FINISHED:
    +					// Yeah! We happy!
    +					resetTimeouts();
    +					break;
    +				case CREATED:
    +					// we're still at Job CREATED. Let's see if we're over the limit.
    +					timeDiff = (System.currentTimeMillis() - this.currentJobCreatedAt);
    +					if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){
    +						failWithTimeout(timeDiff);
    +					} // otherwise just wait a bit longer.
    +					break;
    +				case RESTARTING:
    +					if(this.currentJobCreatedAt == 0){
    --- End diff --
    
    Will do.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r36169072
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -107,7 +177,10 @@ else if (message instanceof JobClientMessages.SubmitJobAndWait) {
     						decorateMessage(new Status.Failure(new Exception(msg))),
     						ActorRef.noSender());
     
    +				// cancel scheduler and inactivity triggered receive messages
     				getContext().unwatch(jobManager);
    +				scheduler.cancel();
    +				resetContextAndActor();
    --- End diff --
    
    I guess you can fuse the `scheduler.cancel` and `resetContextAndActor` method calls.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on the pull request:

    https://github.com/apache/flink/pull/979#issuecomment-127968729
  
    I've added a few more message handlers:
    1. We never miss a `RUNNING` state between restarts.
    2. There is a timeout for repeatedly getting `CANCELED/.ING` or `FAILED/ING` messages.
    
    Further, I worked around the `receiveTimeout` bug(?) that a timeout message might be enqueued even if we just received a message. This is done by putting a tolerance limit of 0.1 times the `JOB_MANAGER_TIMEOUT` and maintaining the last ping from the `JobManager`.
    
    @tillrohrmann , could you look this over again? Lemme know if there are still unhandled cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r36189042
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -59,11 +87,50 @@ public JobClientActor(
     		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
     
     		this.sysoutUpdates = sysoutUpdates;
    +		// set this to -1 to indicate the job hasn't been created yet.
    +		this.currentJobCreatedAt = -1;
     	}
     	
     	@Override
     	protected void handleMessage(Object message) {
     		
    +		// ======= Job status messages on regular intervals ==============
    +		if(message instanceof JobManagerMessages.CurrentJobStatus){
    +			JobStatus statusReport = ((JobManagerMessages.CurrentJobStatus) message).status();
    +			long timeDiff;
    +			switch(statusReport){
    +				case RUNNING:
    +					// Vincent, we happy?
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case FINISHED:
    +					// Yeah! We happy!
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case CREATED:
    --- End diff --
    
    The timeout is configurable, or well, it will be. Right now, 30 s is enough to keep the job scheduled and not running, IMO.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 closed the pull request at:

    https://github.com/apache/flink/pull/979


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r36169928
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -59,11 +87,50 @@ public JobClientActor(
     		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
     
     		this.sysoutUpdates = sysoutUpdates;
    +		// set this to -1 to indicate the job hasn't been created yet.
    +		this.currentJobCreatedAt = -1;
     	}
     	
     	@Override
     	protected void handleMessage(Object message) {
     		
    +		// ======= Job status messages on regular intervals ==============
    +		if(message instanceof JobManagerMessages.CurrentJobStatus){
    +			JobStatus statusReport = ((JobManagerMessages.CurrentJobStatus) message).status();
    +			long timeDiff;
    +			switch(statusReport){
    +				case RUNNING:
    +					// Vincent, we happy?
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case FINISHED:
    +					// Yeah! We happy!
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case CREATED:
    +					// we're still at Job CREATED. Let's see if we're over the limit.
    +					timeDiff = (System.currentTimeMillis() - this.currentJobCreatedAt);
    +					if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){
    +						failWithTimeout(timeDiff);
    +					} // otherwise just wait a bit longer.
    +					break;
    +				case RESTARTING:
    --- End diff --
    
    What if a jobs gets restarted multiple times and we always miss the `RUNNING` state. Then it would look like as if the jobs stayed all the time in the `RESTARTING` state.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/979#issuecomment-127537467
  
    I like the idea to use `ReceiveTimeout` messages to detect blocked `JobManager`. I think the timeout should be configurable via the `flink-conf.yaml` file. Thus, you either pass the `Configuration` to the `JobClientActor` or the individual timeouts. If written some remarks concerning the different state handlings.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r36169003
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -59,11 +87,50 @@ public JobClientActor(
     		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
     
     		this.sysoutUpdates = sysoutUpdates;
    +		// set this to -1 to indicate the job hasn't been created yet.
    +		this.currentJobCreatedAt = -1;
     	}
     	
     	@Override
     	protected void handleMessage(Object message) {
     		
    +		// ======= Job status messages on regular intervals ==============
    +		if(message instanceof JobManagerMessages.CurrentJobStatus){
    +			JobStatus statusReport = ((JobManagerMessages.CurrentJobStatus) message).status();
    +			long timeDiff;
    +			switch(statusReport){
    +				case RUNNING:
    +					// Vincent, we happy?
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case FINISHED:
    +					// Yeah! We happy!
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case CREATED:
    +					// we're still at Job CREATED. Let's see if we're over the limit.
    +					timeDiff = (System.currentTimeMillis() - this.currentJobCreatedAt);
    +					if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){
    +						failWithTimeout(timeDiff);
    +					} // otherwise just wait a bit longer.
    +					break;
    +				case RESTARTING:
    +					if(this.currentJobCreatedAt == -1){
    +						// we effectively have re-created the job
    +						this.currentJobCreatedAt = System.currentTimeMillis();
    +					} else{
    +						// it was already at either CREATED or RESTARTING. See if we're over the limit.
    +						timeDiff = (System.currentTimeMillis() - this.currentJobCreatedAt);
    +						if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){
    +							failWithTimeout(timeDiff);
    +						} // otherwise let's wait a bit more.
    +					}
    +					break;
    +				default:
    +					// well, canceled or failed. We'll find out with an exact message in a while.
    --- End diff --
    
    What if the FAILING state is blocking for example?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r36186237
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -59,11 +87,50 @@ public JobClientActor(
     		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
     
     		this.sysoutUpdates = sysoutUpdates;
    +		// set this to -1 to indicate the job hasn't been created yet.
    +		this.currentJobCreatedAt = -1;
     	}
     	
     	@Override
     	protected void handleMessage(Object message) {
     		
    +		// ======= Job status messages on regular intervals ==============
    +		if(message instanceof JobManagerMessages.CurrentJobStatus){
    +			JobStatus statusReport = ((JobManagerMessages.CurrentJobStatus) message).status();
    +			long timeDiff;
    +			switch(statusReport){
    +				case RUNNING:
    +					// Vincent, we happy?
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case FINISHED:
    +					// Yeah! We happy!
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case CREATED:
    +					// we're still at Job CREATED. Let's see if we're over the limit.
    +					timeDiff = (System.currentTimeMillis() - this.currentJobCreatedAt);
    +					if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){
    +						failWithTimeout(timeDiff);
    +					} // otherwise just wait a bit longer.
    +					break;
    +				case RESTARTING:
    --- End diff --
    
    No it might happen due to a failure in some external system, e.g. HDFS. The problem is that you keep increasing the `timeDiff` even though it should have been restarted in this case. I guess you would have to register for the job status update messages at the `JobManager` to solve the problem.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r36169844
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -59,11 +87,50 @@ public JobClientActor(
     		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
     
     		this.sysoutUpdates = sysoutUpdates;
    +		// set this to -1 to indicate the job hasn't been created yet.
    +		this.currentJobCreatedAt = -1;
     	}
     	
     	@Override
     	protected void handleMessage(Object message) {
     		
    +		// ======= Job status messages on regular intervals ==============
    +		if(message instanceof JobManagerMessages.CurrentJobStatus){
    +			JobStatus statusReport = ((JobManagerMessages.CurrentJobStatus) message).status();
    +			long timeDiff;
    +			switch(statusReport){
    +				case RUNNING:
    +					// Vincent, we happy?
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case FINISHED:
    +					// Yeah! We happy!
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case CREATED:
    --- End diff --
    
    For simple jobs, Flink supports queued scheduling. Thus, it might be the case that one job stays queued up for quite some time until its execution gets started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r36188838
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -59,11 +87,50 @@ public JobClientActor(
     		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
     
     		this.sysoutUpdates = sysoutUpdates;
    +		// set this to -1 to indicate the job hasn't been created yet.
    +		this.currentJobCreatedAt = -1;
     	}
     	
     	@Override
     	protected void handleMessage(Object message) {
     		
    +		// ======= Job status messages on regular intervals ==============
    +		if(message instanceof JobManagerMessages.CurrentJobStatus){
    +			JobStatus statusReport = ((JobManagerMessages.CurrentJobStatus) message).status();
    +			long timeDiff;
    +			switch(statusReport){
    +				case RUNNING:
    +					// Vincent, we happy?
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case FINISHED:
    +					// Yeah! We happy!
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case CREATED:
    +					// we're still at Job CREATED. Let's see if we're over the limit.
    +					timeDiff = (System.currentTimeMillis() - this.currentJobCreatedAt);
    +					if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){
    +						failWithTimeout(timeDiff);
    +					} // otherwise just wait a bit longer.
    +					break;
    +				case RESTARTING:
    +					if(this.currentJobCreatedAt == -1){
    +						// we effectively have re-created the job
    +						this.currentJobCreatedAt = System.currentTimeMillis();
    +					} else{
    +						// it was already at either CREATED or RESTARTING. See if we're over the limit.
    +						timeDiff = (System.currentTimeMillis() - this.currentJobCreatedAt);
    +						if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){
    +							failWithTimeout(timeDiff);
    +						} // otherwise let's wait a bit more.
    +					}
    +					break;
    +				default:
    +					// well, canceled or failed. We'll find out with an exact message in a while.
    --- End diff --
    
    Okay, that makes sense. Here's what I propose. If the job doesn't change status from the cancel(ed/ing) or fail(ed/ing) states for a given timeout period, we again send a failure message to the `JobClient`.
    The reason I kept this as is was that if the job has been canceled or failed, then the `JobClient` should know the reason for failure, which can only be sent by the `JobManager`. It still seems logical to me though.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r36181722
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -59,11 +87,50 @@ public JobClientActor(
     		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
     
     		this.sysoutUpdates = sysoutUpdates;
    +		// set this to -1 to indicate the job hasn't been created yet.
    +		this.currentJobCreatedAt = -1;
     	}
     	
     	@Override
     	protected void handleMessage(Object message) {
     		
    +		// ======= Job status messages on regular intervals ==============
    +		if(message instanceof JobManagerMessages.CurrentJobStatus){
    +			JobStatus statusReport = ((JobManagerMessages.CurrentJobStatus) message).status();
    +			long timeDiff;
    +			switch(statusReport){
    +				case RUNNING:
    +					// Vincent, we happy?
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case FINISHED:
    +					// Yeah! We happy!
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case CREATED:
    +					// we're still at Job CREATED. Let's see if we're over the limit.
    +					timeDiff = (System.currentTimeMillis() - this.currentJobCreatedAt);
    +					if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){
    +						failWithTimeout(timeDiff);
    +					} // otherwise just wait a bit longer.
    +					break;
    +				case RESTARTING:
    --- End diff --
    
    Isn't getting `RESTARTING` multiple times a bad thing? I'm not sure though. In such a case the only solution would be to lower the heartbeat period I think.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r36181623
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -107,7 +177,10 @@ else if (message instanceof JobClientMessages.SubmitJobAndWait) {
     						decorateMessage(new Status.Failure(new Exception(msg))),
     						ActorRef.noSender());
     
    +				// cancel scheduler and inactivity triggered receive messages
     				getContext().unwatch(jobManager);
    +				scheduler.cancel();
    +				resetContextAndActor();
    --- End diff --
    
    Yes. Thanks for pointing that out. :)
    I was earlier doing things a little differently. Forgot to fuse them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r36168637
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -167,4 +275,10 @@ private void logAndPrintMessage(Object message) {
     			System.out.println(message.toString());
     		}
     	}
    +
    +	private void resetContextAndActor(){
    +		this.currentJobCreatedAt = -1;
    +		this.currentJobId = null;
    +		getContext().setReceiveTimeout(FiniteDuration.Inf());
    --- End diff --
    
    According to the docs, you should pass `Duration.Undefined` to turn off the receive timeout message.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r37763050
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -49,27 +57,127 @@
     	// Actor which submits a job to the JobManager via this actor
     	private ActorRef submitter;
     
    +	// timeout for a message from the job manager
    +	private static FiniteDuration JOB_CLIENT_JOB_MANAGER_TIMEOUT;
    +
    +	// heartbeat interval for pinging the job manager for job status
    +	private static FiniteDuration JOB_CLIENT_HEARTBEAT_INTERVAL;
    +
    +	// initial time delay before starting pinging job manager over regular intervals
    +	private static FiniteDuration JOB_CLIENT_INITIAL_PING_DELAY;
    +
    +	// maximum waiting time for a job to go to running status (milliseconds)
    +	private static long JOB_CLIENT_JOB_STATUS_TIMEOUT;
    +
    +	// time at which the current job was created
    +	private long currentJobCreatedAt;
    +
    +	// current job id
    +	private JobID currentJobId;
    +
    +	// scheduler to ping JobManager after a time interval
    +	private Cancellable scheduler;
    +
    +	// maintain when we got our last ping from the Job Manager.
    +	private long jobManagerPinged = 0;
    +
    +	// maintain the last time we got a terminal state message
    +	private long terminalStateAt = 0;
    +
     	public JobClientActor(
     			ActorRef jobManager,
     			Logger logger,
     			boolean sysoutUpdates,
    -			Option<UUID> leaderSessionID) {
    +			Option<UUID> leaderSessionID,
    +			Configuration config) {
     		this.jobManager = Preconditions.checkNotNull(jobManager, "The JobManager ActorRef must not be null.");
     		this.logger = Preconditions.checkNotNull(logger, "The logger must not be null.");
     		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
     
     		this.sysoutUpdates = sysoutUpdates;
    +		// set this to 0 to indicate the job hasn't been created yet.
    +		this.currentJobCreatedAt = 0;
    +		this.terminalStateAt = 0;
    +		parseTimes(config);
     	}
     	
     	@Override
     	protected void handleMessage(Object message) {
    -		
    +
    +		// first see if the message was from the Job Manager
    +		if(getContext().sender() == jobManager){
    +			this.jobManagerPinged = System.currentTimeMillis();
    +		}
    +
    +		// ======= Job status messages on regular intervals ==============
    +		if(message instanceof JobManagerMessages.CurrentJobStatus){
    +			JobStatus statusReport = ((JobManagerMessages.CurrentJobStatus) message).status();
    +			long timeDiff;
    +			switch(statusReport){
    +				case RUNNING:
    +					// Vincent, we happy?
    +					resetTimeouts();
    +					break;
    +				case FINISHED:
    +					// Yeah! We happy!
    +					resetTimeouts();
    +					break;
    +				case CREATED:
    +					// we're still at Job CREATED. Let's see if we're over the limit.
    +					timeDiff = (System.currentTimeMillis() - this.currentJobCreatedAt);
    +					if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){
    +						failWithTimeout(timeDiff);
    +					} // otherwise just wait a bit longer.
    +					break;
    +				case RESTARTING:
    +					if(this.currentJobCreatedAt == 0){
    --- End diff --
    
    Even though we have not defined it properly, but the rest of the code inserts white spaces after keywords such as `if` and `else`, for example.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r37764196
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -144,11 +268,25 @@ else if (message instanceof Terminated) {
     				String msg = "Lost connection to JobManager " + jobManager.path();
     				logger.info(msg);
     				submitter.tell(decorateMessage(new Status.Failure(new Exception(msg))), getSelf());
    +				resetContextAndActor();
     			} else {
     				logger.error("Received 'Terminated' for unknown actor " + target);
     			}
     		}
     
    +		// ============= No messgaes received in the job manager timeout duration ========
    +		else if (message instanceof ReceiveTimeout){
    +			double tolerance = 0.1 * JOB_CLIENT_JOB_MANAGER_TIMEOUT.toMillis();
    --- End diff --
    
    Why not setting the tolerance to the `JOB_CLIENT_JOB_MANAGER_TIMEOUT`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r37767947
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -144,11 +268,25 @@ else if (message instanceof Terminated) {
     				String msg = "Lost connection to JobManager " + jobManager.path();
     				logger.info(msg);
     				submitter.tell(decorateMessage(new Status.Failure(new Exception(msg))), getSelf());
    +				resetContextAndActor();
     			} else {
     				logger.error("Received 'Terminated' for unknown actor " + target);
     			}
     		}
     
    +		// ============= No messgaes received in the job manager timeout duration ========
    +		else if (message instanceof ReceiveTimeout){
    +			double tolerance = 0.1 * JOB_CLIENT_JOB_MANAGER_TIMEOUT.toMillis();
    --- End diff --
    
    This was to deal with the possibility that akka might enqueue a timeout message right after we get the desired message. So this checks that the timeout message is unintentional, and if we did get a message from `JobManager` just before this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r36168482
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -144,11 +230,22 @@ else if (message instanceof Terminated) {
     				String msg = "Lost connection to JobManager " + jobManager.path();
     				logger.info(msg);
     				submitter.tell(decorateMessage(new Status.Failure(new Exception(msg))), getSelf());
    +				scheduler.cancel();
    +				resetContextAndActor();
     			} else {
     				logger.error("Received 'Terminated' for unknown actor " + target);
     			}
     		}
     
    +		// ============= No messgaes received in the job manager timeout duration ========
    +		else if (message instanceof ReceiveTimeout){
    --- End diff --
    
    According to the JavaDocs of `setReceiveTimeout`, it might happen that you receive a `ReceiveTimeout` message even though you've just received another message. You should guard against this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r36167357
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -49,6 +55,28 @@
     	// Actor which submits a job to the JobManager via this actor
     	private ActorRef submitter;
     
    +	// timeout for a message from the job manager
    +	private static FiniteDuration JOB_CLIENT_JOB_MANAGER_TIMEOUT = new FiniteDuration(10000, TimeUnit.MILLISECONDS);
    +
    +	// heartbeat interval for pinging the job manager for job status
    +	private static FiniteDuration JOB_CLIENT_HEARTBEAT_INTERVAL = new FiniteDuration(5000, TimeUnit.MILLISECONDS);
    +
    +	// initial time delay before starting pinging job manager over regular intervals
    +	private static FiniteDuration JOB_CLIENT_INITIAL_PING_DELAY = new FiniteDuration(500, TimeUnit.MILLISECONDS);
    +
    +	// maximum waiting time for a job to go to running status (milliseconds)
    +	// setting highly conservative limit for now. This should ideally be in Configuration.
    +	private static long JOB_CLIENT_JOB_STATUS_TIMEOUT = 30000;
    --- End diff --
    
    These values should be configurable. Thus they should be definable in the `flink-conf.yaml` file.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on the pull request:

    https://github.com/apache/flink/pull/979#issuecomment-137010239
  
    This most likely requires a re-work because of the latest changes in Job Manager and Client. Closing for now while I re-examine all the logic. Will reopen it in a few days.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/979#discussion_r36186352
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java ---
    @@ -59,11 +87,50 @@ public JobClientActor(
     		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
     
     		this.sysoutUpdates = sysoutUpdates;
    +		// set this to -1 to indicate the job hasn't been created yet.
    +		this.currentJobCreatedAt = -1;
     	}
     	
     	@Override
     	protected void handleMessage(Object message) {
     		
    +		// ======= Job status messages on regular intervals ==============
    +		if(message instanceof JobManagerMessages.CurrentJobStatus){
    +			JobStatus statusReport = ((JobManagerMessages.CurrentJobStatus) message).status();
    +			long timeDiff;
    +			switch(statusReport){
    +				case RUNNING:
    +					// Vincent, we happy?
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case FINISHED:
    +					// Yeah! We happy!
    +					this.currentJobCreatedAt = -1;
    +					break;
    +				case CREATED:
    +					// we're still at Job CREATED. Let's see if we're over the limit.
    +					timeDiff = (System.currentTimeMillis() - this.currentJobCreatedAt);
    +					if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){
    +						failWithTimeout(timeDiff);
    +					} // otherwise just wait a bit longer.
    +					break;
    +				case RESTARTING:
    +					if(this.currentJobCreatedAt == -1){
    +						// we effectively have re-created the job
    +						this.currentJobCreatedAt = System.currentTimeMillis();
    +					} else{
    +						// it was already at either CREATED or RESTARTING. See if we're over the limit.
    +						timeDiff = (System.currentTimeMillis() - this.currentJobCreatedAt);
    +						if(timeDiff > JOB_CLIENT_JOB_STATUS_TIMEOUT){
    +							failWithTimeout(timeDiff);
    +						} // otherwise let's wait a bit more.
    +					}
    +					break;
    +				default:
    +					// well, canceled or failed. We'll find out with an exact message in a while.
    --- End diff --
    
    The job might get stuck for some reason in this state but the `JobManager` is still alive. Then we also want the `JobClient` to stop waiting eventually.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---