Posted to issues@flink.apache.org by KurtYoung <gi...@git.apache.org> on 2016/09/26 15:41:19 UTC

[GitHub] flink pull request #2550: [Flink-4657] Implement HighAvailabilityServices ba...

GitHub user KurtYoung opened a pull request:

    https://github.com/apache/flink/pull/2550

    [Flink-4657] Implement HighAvailabilityServices based on zookeeper

    This actually contains 3 commits. More details can be found here: https://github.com/StephanEwen/incubator-flink/pull/15

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/KurtYoung/flink flink-4657

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2550.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2550
    
----
commit b1e9db28d55ef53c58d17bd0fa73da05a1b6570a
Author: Kurt Young <yk...@gmail.com>
Date:   2016-09-26T02:59:16Z

    [FLINK-4657] Add contains() to submitted job graph store, to indicate whether a job needs to be run.

commit b0c88a04b0461a84a05aaba1d649b59a7e13fe7b
Author: Kurt Young <yk...@gmail.com>
Date:   2016-09-22T01:07:13Z

    [FLINK-4657] Implement HighAvailabilityServices based on zookeeper

commit b8801720c044cba10562ae8cc5b2770cb21cf07a
Author: Kurt Young <yk...@gmail.com>
Date:   2016-09-26T15:36:04Z

    [FLINK-4657] Implement a few rpc calls for JobMaster

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80834516
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
     		//TODO:: register at the RM
     	}
     
    +	@RpcMethod
    +	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
    +		final byte[] serializedInputSplit;
    +
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
    +		if (execution == null) {
    +			log.error("Can not find Execution for attempt {}.", executionAttempt);
    +			return null;
    +		} else {
    +			final Slot slot = execution.getAssignedResource();
    +			final int taskId = execution.getVertex().getParallelSubtaskIndex();
    +			final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
    +
    +			final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
    +			if (vertex != null) {
    +				final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
    +				if (splitAssigner != null) {
    +					final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
    +
    +					log.debug("Send next input split {}.", nextInputSplit);
    +					try {
    +						serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
    +					} catch (Exception ex) {
    +						log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
    +						vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
    +							nextInputSplit.getClass() + ".", ex));
    +						return null;
    +					}
    +				} else {
    +					log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
    +					return null;
    +				}
    +			} else {
    +				log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
    +				return null;
    +			}
    +		}
    +		return new NextInputSplit(serializedInputSplit);
    +	}
    +
    +	@RpcMethod
    +	public PartitionState requestPartitionState(
    +		final ResultPartitionID partitionId,
    +		final ExecutionAttemptID taskExecutionId,
    +		final IntermediateDataSetID taskResultId)
    +	{
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
    +		final ExecutionState state = execution != null ? execution.getState() : null;
    +		return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
    +	}
    +
    +	@RpcMethod
    +	public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
    +		final JobID jobID = executionGraph.getJobID();
    +		final String jobName = executionGraph.getJobName();
    +		log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
    +
    +		if (newJobStatus.isGloballyTerminalState()) {
    +			// TODO set job end time in JobInfo
    +
    +			/*
    +			  TODO
    +			  if (jobInfo.sessionAlive) {
    +                jobInfo.setLastActive()
    +                val lastActivity = jobInfo.lastActive
    +                context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
    +                  // remove only if no activity occurred in the meantime
    +                  if (lastActivity == jobInfo.lastActive) {
    +                    self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
    +                  }
    +                }(context.dispatcher)
    +              } else {
    +                self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
    +              }
    +			 */
    +
    +			if (newJobStatus == JobStatus.FINISHED) {
    +				try {
    +					final Map<String, SerializedValue<Object>> accumulatorResults =
    +						executionGraph.getAccumulatorsSerialized();
    +					final SerializedJobExecutionResult result = new SerializedJobExecutionResult(
    +						jobID, 0, accumulatorResults // TODO get correct job duration
    +					);
    +					jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
    +				} catch (Exception e) {
    +					log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
    +					final JobExecutionException exception = new JobExecutionException(
    +						jobID, "Failed to retrieve accumulator results.", e);
    +					// TODO should we also notify client?
    +					jobCompletionActions.jobFailed(exception);
    +				}
    +			}
    +			else if (newJobStatus == JobStatus.CANCELED) {
    +				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
    +				final JobExecutionException exception = new JobExecutionException(
    +					jobID, "Job was cancelled.", unpackedError);
    +				// TODO should we also notify client?
    +				jobCompletionActions.jobFailed(exception);
    +			}
    +			else if (newJobStatus == JobStatus.FAILED) {
    +				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
    +				final JobExecutionException exception = new JobExecutionException(
    +					jobID, "Job execution failed.", unpackedError);
    +				// TODO should we also notify client?
    +				jobCompletionActions.jobFailed(exception);
    +			}
    +			else {
    +				final JobExecutionException exception = new JobExecutionException(
    +					jobID, newJobStatus + " is not a terminal state.");
    +				// TODO should we also notify client?
    +				jobCompletionActions.jobFailed(exception);
    +				throw new RuntimeException(exception);
    +			}
    +		}
    +	}
    +
    +	@RpcMethod
    +	public void scheduleOrUpdateConsumers(final ResultPartitionID partitionID) {
    --- End diff --
    
    Currently, the ResultPartition running in the TaskManager calls this through "ActorGatewayResultPartitionConsumableNotifier".



[GitHub] flink issue #2550: [FLINK-4657] Implement HighAvailabilityServices based on ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2550
  
    @KurtYoung Yes, the `cluster-id` should take the role of `availability.zookeeper.path.namespace` - in some sense it is a renaming.
    
    I would also suggest we use existing IDs where possible - for example the Yarn application ID. That will make it easier for users who debug to correlate the ZooKeeper entries with Flink jobs.



[GitHub] flink issue #2550: [FLINK-4657] Implement HighAvailabilityServices based on ...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on the issue:

    https://github.com/apache/flink/pull/2550
  
    Regarding the "cluster-id": current Flink already has similar functionality, configured via "high-availability.zookeeper.path.namespace". It works together with another setting, "high-availability.zookeeper.path.root", which is "/flink" by default.
    The effective root in ZooKeeper is then "/flink/some_namespace", and we can change the namespace whenever we start a new cluster. Maybe we should set it explicitly in the CLI or in a shell script. Do you think that will work?
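
A minimal sketch of how the two settings mentioned above could combine into the effective ZooKeeper root. The class and method names are hypothetical, and treating an unset namespace as empty is an assumption; only the two configuration keys and the "/flink" default come from the comment.

```java
import java.util.Properties;

final class ZkNamespaceSketch {

    // Removes leading and trailing slashes from one path segment.
    static String stripSlashes(String s) {
        int start = 0;
        int end = s.length();
        while (start < end && s.charAt(start) == '/') {
            start++;
        }
        while (end > start && s.charAt(end - 1) == '/') {
            end--;
        }
        return s.substring(start, end);
    }

    // Joins root and namespace into a single absolute ZooKeeper path.
    static String join(String root, String namespace) {
        String r = stripSlashes(root);
        String n = stripSlashes(namespace);
        StringBuilder path = new StringBuilder();
        if (!r.isEmpty()) {
            path.append('/').append(r);
        }
        if (!n.isEmpty()) {
            path.append('/').append(n);
        }
        return path.length() == 0 ? "/" : path.toString();
    }

    // Reads both settings; "/flink" is the default root named in the comment.
    static String effectiveRoot(Properties conf) {
        String root = conf.getProperty("high-availability.zookeeper.path.root", "/flink");
        // Assumption: an unset namespace is treated as empty.
        String namespace = conf.getProperty("high-availability.zookeeper.path.namespace", "");
        return join(root, namespace);
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("high-availability.zookeeper.path.namespace", "some_namespace");
        System.out.println(effectiveRoot(conf));
    }
}
```

Changing only the namespace value per cluster keeps every cluster under the same root while isolating their entries from each other.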



[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80835462
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
     		//TODO:: register at the RM
     	}
     
    +	@RpcMethod
    +	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
    +		final byte[] serializedInputSplit;
    +
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
    +		if (execution == null) {
    +			log.error("Can not find Execution for attempt {}.", executionAttempt);
    +			return null;
    +		} else {
    +			final Slot slot = execution.getAssignedResource();
    +			final int taskId = execution.getVertex().getParallelSubtaskIndex();
    +			final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
    +
    +			final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
    +			if (vertex != null) {
    +				final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
    +				if (splitAssigner != null) {
    +					final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
    +
    +					log.debug("Send next input split {}.", nextInputSplit);
    +					try {
    +						serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
    +					} catch (Exception ex) {
    +						log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
    +						vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
    +							nextInputSplit.getClass() + ".", ex));
    +						return null;
    +					}
    +				} else {
    +					log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
    +					return null;
    +				}
    +			} else {
    +				log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
    +				return null;
    +			}
    +		}
    +		return new NextInputSplit(serializedInputSplit);
    +	}
    +
    +	@RpcMethod
    +	public PartitionState requestPartitionState(
    +		final ResultPartitionID partitionId,
    +		final ExecutionAttemptID taskExecutionId,
    +		final IntermediateDataSetID taskResultId)
    +	{
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
    +		final ExecutionState state = execution != null ? execution.getState() : null;
    +		return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
    +	}
    +
    +	@RpcMethod
    +	public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
    +		final JobID jobID = executionGraph.getJobID();
    +		final String jobName = executionGraph.getJobName();
    +		log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
    +
    +		if (newJobStatus.isGloballyTerminalState()) {
    +			// TODO set job end time in JobInfo
    +
    +			/*
    +			  TODO
    +			  if (jobInfo.sessionAlive) {
    +                jobInfo.setLastActive()
    +                val lastActivity = jobInfo.lastActive
    +                context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
    +                  // remove only if no activity occurred in the meantime
    +                  if (lastActivity == jobInfo.lastActive) {
    +                    self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
    +                  }
    +                }(context.dispatcher)
    +              } else {
    +                self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
    +              }
    +			 */
    +
    +			if (newJobStatus == JobStatus.FINISHED) {
    +				try {
    +					final Map<String, SerializedValue<Object>> accumulatorResults =
    +						executionGraph.getAccumulatorsSerialized();
    +					final SerializedJobExecutionResult result = new SerializedJobExecutionResult(
    +						jobID, 0, accumulatorResults // TODO get correct job duration
    +					);
    +					jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
    +				} catch (Exception e) {
    +					log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
    +					final JobExecutionException exception = new JobExecutionException(
    +						jobID, "Failed to retrieve accumulator results.", e);
    +					// TODO should we also notify client?
    +					jobCompletionActions.jobFailed(exception);
    +				}
    +			}
    +			else if (newJobStatus == JobStatus.CANCELED) {
    +				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
    --- End diff --
    
    For now, I think we should let outside components know that this job has reached a terminal state, so that they can do cleanup work such as disposing of the JobManagerRunner.



[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80835559
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
     		//TODO:: register at the RM
     	}
     
    +	@RpcMethod
    +	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
    +		final byte[] serializedInputSplit;
    +
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
    +		if (execution == null) {
    +			log.error("Can not find Execution for attempt {}.", executionAttempt);
    +			return null;
    +		} else {
    +			final Slot slot = execution.getAssignedResource();
    +			final int taskId = execution.getVertex().getParallelSubtaskIndex();
    +			final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
    +
    +			final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
    +			if (vertex != null) {
    +				final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
    +				if (splitAssigner != null) {
    +					final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
    +
    +					log.debug("Send next input split {}.", nextInputSplit);
    +					try {
    +						serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
    +					} catch (Exception ex) {
    +						log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
    +						vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
    +							nextInputSplit.getClass() + ".", ex));
    +						return null;
    +					}
    +				} else {
    +					log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
    +					return null;
    +				}
    +			} else {
    +				log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
    +				return null;
    +			}
    +		}
    +		return new NextInputSplit(serializedInputSplit);
    +	}
    +
    +	@RpcMethod
    +	public PartitionState requestPartitionState(
    +		final ResultPartitionID partitionId,
    +		final ExecutionAttemptID taskExecutionId,
    +		final IntermediateDataSetID taskResultId)
    +	{
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
    +		final ExecutionState state = execution != null ? execution.getState() : null;
    +		return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
    +	}
    +
    +	@RpcMethod
    +	public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
    --- End diff --
    
    Yes, you're right, I will change this.



[GitHub] flink issue #2550: [FLINK-4657] Implement HighAvailabilityServices based on ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2550
  
    Thanks for your contribution @KurtYoung. I've merged the code. You can close the PR now.



[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80734121
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
     		//TODO:: register at the RM
     	}
     
    +	@RpcMethod
    +	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
    +		final byte[] serializedInputSplit;
    +
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
    +		if (execution == null) {
    +			log.error("Can not find Execution for attempt {}.", executionAttempt);
    +			return null;
    --- End diff --
    
    Let's let this throw an exception - after all, the caller should react to the fact that the JobManager is not aware of its execution any more.
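
A minimal sketch of the suggestion above: fail loudly on an unknown execution attempt instead of returning null. Everything here is hypothetical and stands in for the real JobMaster types: the registry is a plain map, attempt IDs are strings, and `UnknownExecutionException` is an invented unchecked exception.

```java
import java.util.HashMap;
import java.util.Map;

final class UnknownExecutionSketch {

    // Hypothetical exception type; not part of the pull request.
    static final class UnknownExecutionException extends RuntimeException {
        UnknownExecutionException(String message) {
            super(message);
        }
    }

    // Stand-in for the table of registered executions (attempt ID -> host).
    private final Map<String, String> registeredExecutions = new HashMap<>();

    void register(String attemptId, String host) {
        registeredExecutions.put(attemptId, host);
    }

    // Throws instead of returning null, so the caller cannot miss the failure.
    String hostForAttempt(String attemptId) {
        String host = registeredExecutions.get(attemptId);
        if (host == null) {
            throw new UnknownExecutionException(
                "Cannot find execution for attempt " + attemptId);
        }
        return host;
    }
}
```

With a null return the caller has to remember a null check; with the exception, a caller that ignores the case fails at the call site with a message naming the attempt.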



[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80740850
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
     		//TODO:: register at the RM
     	}
     
    +	@RpcMethod
    +	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
    +		final byte[] serializedInputSplit;
    +
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
    +		if (execution == null) {
    +			log.error("Can not find Execution for attempt {}.", executionAttempt);
    +			return null;
    +		} else {
    +			final Slot slot = execution.getAssignedResource();
    +			final int taskId = execution.getVertex().getParallelSubtaskIndex();
    +			final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
    +
    +			final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
    +			if (vertex != null) {
    +				final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
    +				if (splitAssigner != null) {
    +					final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
    +
    +					log.debug("Send next input split {}.", nextInputSplit);
    +					try {
    +						serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
    +					} catch (Exception ex) {
    +						log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
    +						vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
    +							nextInputSplit.getClass() + ".", ex));
    +						return null;
    +					}
    +				} else {
    +					log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
    +					return null;
    +				}
    +			} else {
    +				log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
    +				return null;
    +			}
    +		}
    +		return new NextInputSplit(serializedInputSplit);
    +	}
    +
    +	@RpcMethod
    +	public PartitionState requestPartitionState(
    +		final ResultPartitionID partitionId,
    +		final ExecutionAttemptID taskExecutionId,
    +		final IntermediateDataSetID taskResultId)
    +	{
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
    +		final ExecutionState state = execution != null ? execution.getState() : null;
    +		return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
    +	}
    +
    +	@RpcMethod
    +	public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
    +		final JobID jobID = executionGraph.getJobID();
    +		final String jobName = executionGraph.getJobName();
    +		log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
    +
    +		if (newJobStatus.isGloballyTerminalState()) {
    +			// TODO set job end time in JobInfo
    +
    +			/*
    +			  TODO
    +			  if (jobInfo.sessionAlive) {
    +                jobInfo.setLastActive()
    +                val lastActivity = jobInfo.lastActive
    +                context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
    +                  // remove only if no activity occurred in the meantime
    +                  if (lastActivity == jobInfo.lastActive) {
    +                    self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
    +                  }
    +                }(context.dispatcher)
    +              } else {
    +                self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
    +              }
    +			 */
    +
    +			if (newJobStatus == JobStatus.FINISHED) {
    +				try {
    +					final Map<String, SerializedValue<Object>> accumulatorResults =
    +						executionGraph.getAccumulatorsSerialized();
    +					final SerializedJobExecutionResult result = new SerializedJobExecutionResult(
    +						jobID, 0, accumulatorResults // TODO get correct job duration
    +					);
    +					jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
    +				} catch (Exception e) {
    +					log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
    +					final JobExecutionException exception = new JobExecutionException(
    +						jobID, "Failed to retrieve accumulator results.", e);
    +					// TODO should we also notify client?
    +					jobCompletionActions.jobFailed(exception);
    +				}
    +			}
    +			else if (newJobStatus == JobStatus.CANCELED) {
    +				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
    +				final JobExecutionException exception = new JobExecutionException(
    +					jobID, "Job was cancelled.", unpackedError);
    +				// TODO should we also notify client?
    +				jobCompletionActions.jobFailed(exception);
    +			}
    +			else if (newJobStatus == JobStatus.FAILED) {
    +				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
    +				final JobExecutionException exception = new JobExecutionException(
    +					jobID, "Job execution failed.", unpackedError);
    +				// TODO should we also notify client?
    +				jobCompletionActions.jobFailed(exception);
    +			}
    +			else {
    +				final JobExecutionException exception = new JobExecutionException(
    +					jobID, newJobStatus + " is not a terminal state.");
    +				// TODO should we also notify client?
    +				jobCompletionActions.jobFailed(exception);
    +				throw new RuntimeException(exception);
    +			}
    +		}
    +	}
    +
    +	@RpcMethod
    +	public void scheduleOrUpdateConsumers(final ResultPartitionID partitionID) {
    --- End diff --
    
    Is that method needed as an RPC method? Who would call it?



[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80874290
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
     		//TODO:: register at the RM
     	}
     
    +	@RpcMethod
    +	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
    --- End diff --
    
    Thanks for your comments @tillrohrmann. It all sounds reasonable to me. Confining RPC errors to one specific exception type is a good idea, so users can easily tell what went wrong.
    One minor suggestion: maybe we should create a base class `RpcException` and have classes such as `RpcConnectionException`, `RpcExecutionException` and `RpcTimeoutException` inherit from it, to make the distinction clearer. This can be done in another JIRA issue, though.
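
The proposed hierarchy could look roughly like the sketch below. The four exception names come from the comment; the wrapper class, the constructors, and the `describe` helper are assumptions added for illustration and do not reflect any existing Flink API.

```java
final class RpcExceptionSketch {

    // Base class for every failure raised by the RPC layer.
    static class RpcException extends Exception {
        RpcException(String message) {
            super(message);
        }
    }

    static class RpcConnectionException extends RpcException {
        RpcConnectionException(String message) {
            super(message);
        }
    }

    static class RpcExecutionException extends RpcException {
        RpcExecutionException(String message) {
            super(message);
        }
    }

    static class RpcTimeoutException extends RpcException {
        RpcTimeoutException(String message) {
            super(message);
        }
    }

    // Hypothetical helper: callers check the specific subtypes first and
    // fall back to the shared base class for anything else RPC-related.
    static String describe(Throwable failure) {
        if (failure instanceof RpcTimeoutException) {
            return "timeout";
        }
        if (failure instanceof RpcConnectionException) {
            return "connection";
        }
        if (failure instanceof RpcExecutionException) {
            return "execution";
        }
        if (failure instanceof RpcException) {
            return "rpc";
        }
        return "other";
    }
}
```

A shared base class lets a caller catch `RpcException` once when the specific cause does not matter, while still allowing targeted handling for timeouts or connection loss.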



[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80837572
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
     		//TODO:: register at the RM
     	}
     
    +	@RpcMethod
    +	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
    --- End diff --
    
    I'm not sure that throwing an exception from an RpcMethod is a good way to do error handling. On the caller's side, once the RPC method returns the Future from the gateway, the caller can handle errors with handleAsync or exceptionallyAsync. But exceptions from user logic will get mixed up with exceptions from the RPC framework, such as RpcTimeout or other exceptions indicating that the RPC system itself is not working well. So you typically have to figure out what went wrong by distinguishing the exception type, which I think is not very elegant.
    One alternative is to never throw exceptions in an RpcMethod and instead handle errors in the "ErrorCode" style, returning the error explicitly in the return value. Every exception thrown during an RPC call would then be due to the RPC framework.
    In this case, returning null indicates that something went wrong with the request. (If we need more detail about the error, we can enrich the return value with a message; for now null will do.)
    And in the normal case, such as no further splits, we still return a NextInputSplit with empty content.
    Let me know what you think about all this.
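
The "ErrorCode" style described above can be sketched as follows. This is an illustration under assumptions: it uses the JDK's `CompletableFuture` rather than Flink's own Future type, and `SplitResponse` is a hypothetical response class, not the `NextInputSplit` from the diff.

```java
import java.util.concurrent.CompletableFuture;

final class ErrorValueSketch {

    // Hypothetical response: carries either a payload or an application error.
    static final class SplitResponse {
        final byte[] split;
        final String error;

        private SplitResponse(byte[] split, String error) {
            this.split = split;
            this.error = error;
        }

        static SplitResponse ok(byte[] split) {
            return new SplitResponse(split, null);
        }

        static SplitResponse failed(String error) {
            return new SplitResponse(null, error);
        }

        boolean isError() {
            return error != null;
        }
    }

    // Caller side: a non-null failure can only come from the RPC layer,
    // because application errors travel inside the response value.
    static String handle(CompletableFuture<SplitResponse> future) {
        return future.handle((response, failure) -> {
            if (failure != null) {
                return "rpc failure: " + failure.getClass().getSimpleName();
            }
            if (response.isError()) {
                return "application error: " + response.error;
            }
            return "split of " + response.split.length + " bytes";
        }).join();
    }
}
```

The trade-off is that every caller must inspect the response for an error, whereas an exception-based design surfaces the failure through the future's exceptional path.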



[GitHub] flink issue #2550: [FLINK-4657] Implement HighAvailabilityServices based on ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2550
  
    Can we try and unify the structure / paths under which all this information is stored in ZooKeeper?
    
    ```
    /flink
         +/cluster_id_1/resource_manager_lock
         |            |
         |            +/job-id-1/job_manager_lock
         |            |         /checkpoints/latest
         |            |                     /latest-1
         |            |                     /latest-2
         |            |
         |            +/job-id-2/job_manager_lock
         |      
         +/cluster_id_2/resource_manager_lock
                      |
                      +/job-id-1/job_manager_lock
                               |/checkpoints/latest
                               |            /latest-1
                               |/persisted_job_graph
    ```
    The "cluster-id" should be a generated UUID in the case of YARN/Mesos, and should be a config value in the standalone case. In YARN / Mesos, the UUID should be passed via an environment variable to the Java processes with the entry points for TaskManager / JobManager / ResourceManager.
    
    The ZooKeeper HA services should receive the "cluster-id" in their constructor.
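
The layout above could be captured in a small path helper that receives the cluster id in its constructor. This is only a sketch: the class `ZkPathSketch`, its method names, and the environment variable name `FLINK_CLUSTER_ID` are assumptions made for illustration, not names from the pull request.

```java
import java.util.UUID;

public class ZkPathSketch {

    static final String ROOT = "/flink";

    /** Hypothetical environment variable name, used only in this sketch. */
    static final String CLUSTER_ID_ENV = "FLINK_CLUSTER_ID";

    private final String clusterId;

    ZkPathSketch(String clusterId) {
        if (clusterId == null || clusterId.isEmpty()) {
            throw new IllegalArgumentException("cluster-id must not be empty");
        }
        this.clusterId = clusterId;
    }

    String resourceManagerLock() {
        return ROOT + "/" + clusterId + "/resource_manager_lock";
    }

    String jobManagerLock(String jobId) {
        return jobRoot(jobId) + "/job_manager_lock";
    }

    String latestCheckpoint(String jobId) {
        return jobRoot(jobId) + "/checkpoints/latest";
    }

    String persistedJobGraph(String jobId) {
        return jobRoot(jobId) + "/persisted_job_graph";
    }

    private String jobRoot(String jobId) {
        return ROOT + "/" + clusterId + "/" + jobId;
    }

    public static void main(String[] args) {
        // YARN/Mesos case: the launcher would generate the UUID once and export it;
        // here we fall back to a fresh UUID if the variable is not set.
        String fromEnv = System.getenv(CLUSTER_ID_ENV);
        String clusterId = (fromEnv != null && !fromEnv.isEmpty())
            ? fromEnv
            : UUID.randomUUID().toString();

        ZkPathSketch paths = new ZkPathSketch(clusterId);
        System.out.println(paths.resourceManagerLock());
        System.out.println(paths.jobManagerLock("job-id-1"));
        System.out.println(paths.latestCheckpoint("job-id-1"));
    }
}
```

Keeping all path construction in one place means every service derives its znodes from the same cluster id, which is the point of unifying the structure.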



[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80740727
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
     		//TODO:: register at the RM
     	}
     
    +	@RpcMethod
    +	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
    +		final byte[] serializedInputSplit;
    +
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
    +		if (execution == null) {
    +			log.error("Can not find Execution for attempt {}.", executionAttempt);
    +			return null;
    +		} else {
    +			final Slot slot = execution.getAssignedResource();
    +			final int taskId = execution.getVertex().getParallelSubtaskIndex();
    +			final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
    +
    +			final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
    +			if (vertex != null) {
    +				final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
    +				if (splitAssigner != null) {
    +					final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
    +
    +					log.debug("Send next input split {}.", nextInputSplit);
    +					try {
    +						serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
    +					} catch (Exception ex) {
    +						log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
    +						vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
    +							nextInputSplit.getClass() + ".", ex));
    +						return null;
    +					}
    +				} else {
    +					log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
    +					return null;
    +				}
    +			} else {
    +				log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
    +				return null;
    +			}
    +		}
    +		return new NextInputSplit(serializedInputSplit);
    +	}
    +
    +	@RpcMethod
    +	public PartitionState requestPartitionState(
    +		final ResultPartitionID partitionId,
    +		final ExecutionAttemptID taskExecutionId,
    +		final IntermediateDataSetID taskResultId)
    +	{
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
    +		final ExecutionState state = execution != null ? execution.getState() : null;
    +		return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
    +	}
    +
    +	@RpcMethod
    +	public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
    +		final JobID jobID = executionGraph.getJobID();
    +		final String jobName = executionGraph.getJobName();
    +		log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
    +
    +		if (newJobStatus.isGloballyTerminalState()) {
    +			// TODO set job end time in JobInfo
    +
    +			/*
    +			  TODO
    +			  if (jobInfo.sessionAlive) {
    +                jobInfo.setLastActive()
    +                val lastActivity = jobInfo.lastActive
    +                context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
    +                  // remove only if no activity occurred in the meantime
    +                  if (lastActivity == jobInfo.lastActive) {
    +                    self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
    +                  }
    +                }(context.dispatcher)
    +              } else {
    +                self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
    +              }
    +			 */
    +
    +			if (newJobStatus == JobStatus.FINISHED) {
    +				try {
    +					final Map<String, SerializedValue<Object>> accumulatorResults =
    +						executionGraph.getAccumulatorsSerialized();
    +					final SerializedJobExecutionResult result = new SerializedJobExecutionResult(
    +						jobID, 0, accumulatorResults // TODO get correct job duration
    +					);
    +					jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
    +				} catch (Exception e) {
    +					log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
    +					final JobExecutionException exception = new JobExecutionException(
    +						jobID, "Failed to retrieve accumulator results.", e);
    +					// TODO should we also notify client?
    +					jobCompletionActions.jobFailed(exception);
    +				}
    +			}
    +			else if (newJobStatus == JobStatus.CANCELED) {
    +				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
    --- End diff --
    
    On "Canceled", we probably should not report any lingering exceptions.



[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80733924
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
     		//TODO:: register at the RM
     	}
     
    +	@RpcMethod
    +	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
    +		final byte[] serializedInputSplit;
    +
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
    +		if (execution == null) {
    +			log.error("Can not find Execution for attempt {}.", executionAttempt);
    --- End diff --
    
    The log could be on "debug" level, as this situation can occur when the JobManager failed tasks and unregistered them, but the cancel() messages have not reached the TaskManager.
    
    Since this is a valid situation, it should not cause a "WARN" message. In general, all situations that are valid race conditions and that need not indicate a corrupted state or lead to a failure / recovery should probably not have a "WARN" level.



[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80742192
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
     		//TODO:: register at the RM
     	}
     
    +	@RpcMethod
    +	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
    +		final byte[] serializedInputSplit;
    +
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
    +		if (execution == null) {
    +			log.error("Can not find Execution for attempt {}.", executionAttempt);
    +			return null;
    +		} else {
    +			final Slot slot = execution.getAssignedResource();
    +			final int taskId = execution.getVertex().getParallelSubtaskIndex();
    +			final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
    +
    +			final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
    +			if (vertex != null) {
    +				final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
    +				if (splitAssigner != null) {
    +					final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
    +
    +					log.debug("Send next input split {}.", nextInputSplit);
    +					try {
    +						serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
    +					} catch (Exception ex) {
    +						log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
    +						vertex.fail(new RuntimeException("Could not serialize the next input split of class " +
    +							nextInputSplit.getClass() + ".", ex));
    +						return null;
    +					}
    +				} else {
    +					log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
    +					return null;
    +				}
    +			} else {
    +				log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
    +				return null;
    +			}
    +		}
    +		return new NextInputSplit(serializedInputSplit);
    +	}
    +
    +	@RpcMethod
    +	public PartitionState requestPartitionState(
    +		final ResultPartitionID partitionId,
    +		final ExecutionAttemptID taskExecutionId,
    +		final IntermediateDataSetID taskResultId)
    +	{
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
    +		final ExecutionState state = execution != null ? execution.getState() : null;
    +		return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
    +	}
    +
    +	@RpcMethod
    +	public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
    --- End diff --
    
    Is there a way we can get this method out of the public interface? It could do state changes in `callAsync()`. That way, it would not be callable via RPC, since this does not seem like a method anyone should invoke remotely.
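
To illustrate the idea without depending on Flink's RPC classes, the sketch below stands in for the endpoint's main thread with a plain single-threaded `ExecutorService`. The `Gateway` interface, the `Endpoint` class, and `onJobStatusChanged` are hypothetical names; the real `callAsync()` may differ in signature and behavior.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MainThreadSketch {

    /** Hypothetical gateway: only methods listed here would be remotely callable. */
    interface Gateway {
        String requestJobStatus() throws Exception;
    }

    static final class Endpoint implements Gateway {
        // Stand-in for the endpoint's main thread.
        private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
        private String jobStatus = "RUNNING"; // read and written on mainThread only

        @Override
        public String requestJobStatus() throws Exception {
            Callable<String> read = () -> jobStatus;
            return mainThread.submit(read).get();
        }

        /** Internal callback, deliberately NOT part of Gateway. */
        Future<String> onJobStatusChanged(String newStatus) {
            Callable<String> change = () -> {
                String previous = jobStatus;
                jobStatus = newStatus;
                return previous;
            };
            // The state change hops onto the main thread instead of being an RPC method.
            return mainThread.submit(change);
        }

        void shutdown() {
            mainThread.shutdown();
        }
    }

    static String demo() {
        Endpoint endpoint = new Endpoint();
        try {
            String previous = endpoint.onJobStatusChanged("FINISHED").get();
            return previous + " -> " + endpoint.requestJobStatus();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            endpoint.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The state change still runs on the same thread as the RPC handlers, so no extra locking is needed, but remote callers cannot trigger it because it is absent from the gateway.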



[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80835773
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
     		//TODO:: register at the RM
     	}
     
    +	@RpcMethod
    +	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
    +		final byte[] serializedInputSplit;
    +
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
    +		if (execution == null) {
    +			log.error("Can not find Execution for attempt {}.", executionAttempt);
    --- End diff --
    
    OK, will change to "debug" and leave a comment about this.



[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80869838
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
     		//TODO:: register at the RM
     	}
     
    +	@RpcMethod
    +	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
    --- End diff --
    
    I think both approaches, sending an exception or sending a failure response message in case of an exceptional state on the receiving side, are fine.
    However, I don't agree that exceptions are reserved only for the rpc layer. "User" code should also be allowed to throw exceptions. I agree, though, that we have to pay special attention to these cases on the caller side, because there is no compile-time check that we actually handle all occurring exceptions.
    All rpc-related exceptions are wrapped in an `RpcConnectionException` and should, thus, be distinguishable from the "user" code exceptions.
    In this specific case, I would go with throwing exceptions, because they transmit more information about the failure reason compared to simply sending `null` to the caller.
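
A caller-side sketch of how the two kinds of failure could be told apart on a future. The `RpcConnectionException` class here is a local stand-in defined only for this example, not Flink's class, and `classify` and `failed` are hypothetical helpers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class RpcFailureSketch {

    /** Local stand-in for the framework-level exception named above; not Flink's class. */
    static class RpcConnectionException extends Exception {
        RpcConnectionException(String message) {
            super(message);
        }
    }

    /** Helper for the sketch: a future that has already failed with the given cause. */
    static CompletableFuture<byte[]> failed(Throwable cause) {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    /** Distinguishes transport failures from failures raised by the remote "user" code. */
    static String classify(CompletableFuture<byte[]> response) {
        return response
            .thenApply(split -> "split of " + split.length + " bytes")
            .exceptionally(failure -> {
                Throwable cause = unwrap(failure);
                if (cause instanceof RpcConnectionException) {
                    return "rpc failure: " + cause.getMessage();
                }
                return "remote failure: " + cause.getMessage();
            })
            .join();
    }

    /** Dependent stages wrap the original cause in a CompletionException. */
    private static Throwable unwrap(Throwable failure) {
        return (failure instanceof CompletionException && failure.getCause() != null)
            ? failure.getCause()
            : failure;
    }

    public static void main(String[] args) {
        System.out.println(classify(CompletableFuture.completedFuture(new byte[] {1, 2})));
        System.out.println(classify(failed(new RpcConnectionException("timeout"))));
        System.out.println(classify(failed(new IllegalStateException("no such execution"))));
    }
}
```

As noted above, nothing forces the caller to write the `exceptionally` branch, so this approach relies on review discipline rather than the compiler.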



[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung closed the pull request at:

    https://github.com/apache/flink/pull/2550



[GitHub] flink pull request #2550: [FLINK-4657] Implement HighAvailabilityServices ba...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80733402
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
     		//TODO:: register at the RM
     	}
     
    +	@RpcMethod
    +	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) {
    --- End diff --
    
    Let's let this method throw an exception. Then the calling TaskManager can see the difference between `null` (= no further split) and `Exception` (= something went wrong / is inconsistent).
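
A simplified sketch of that contract, with the execution graph replaced by a queue and a flag so the example stays self-contained. `NextSplitSketch` and `fetch` are hypothetical names, and the real method would work on `Execution` and `InputSplitAssigner` rather than raw byte arrays.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class NextSplitSketch {

    private final Queue<byte[]> remainingSplits = new ArrayDeque<>();
    private final boolean executionRegistered;

    NextSplitSketch(boolean executionRegistered, byte[]... splits) {
        this.executionRegistered = executionRegistered;
        for (byte[] split : splits) {
            remainingSplits.add(split);
        }
    }

    /**
     * Returns the next serialized split, or null when there is no further split.
     * Throws when the request itself is inconsistent.
     */
    byte[] requestNextInputSplit() throws Exception {
        if (!executionRegistered) {
            throw new Exception("Cannot find execution for the given attempt");
        }
        return remainingSplits.poll(); // null means: no further split
    }

    /** Caller side (the TaskManager in the discussion above). */
    static String fetch(NextSplitSketch jobMaster) {
        try {
            byte[] split = jobMaster.requestNextInputSplit();
            return split == null ? "no further split" : "split of " + split.length + " bytes";
        } catch (Exception e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetch(new NextSplitSketch(true, new byte[] {1, 2, 3})));
        System.out.println(fetch(new NextSplitSketch(true)));
        System.out.println(fetch(new NextSplitSketch(false)));
    }
}
```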

