Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2018/05/17 14:43:39 UTC

[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/6035

    [FLINK-6160] Add reconnection attempts in case of heartbeat timeouts to JobMaster and TaskExecutor

    ## What is the purpose of the change
    
    If a heartbeat timeout with the RM occurs, then both the JobMaster and the TaskExecutor will try to reconnect
    to the last known RM address.

    Additionally, we now respect TaskManagerOptions#REGISTRATION_TIMEOUT on the TaskExecutor. This means that
    if the TaskExecutor cannot register with an RM within the given registration timeout, it fails with a
    fatal exception. This makes it possible to fail the TaskExecutor process when it cannot establish a connection,
    which ultimately frees the occupied resources.

    The commit also changes the default value of TaskManagerOptions#REGISTRATION_TIMEOUT from "Inf" to "5 min".
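
    A minimal, self-contained sketch of the reconnect behaviour described above (placeholder names and types only, not the actual `JobMaster`/`TaskExecutor` code):

    ```java
    import java.util.UUID;
    import java.util.concurrent.TimeoutException;

    // Sketch only: on a heartbeat timeout we react only if the timed-out RM is still the one
    // we are connected to, and we then try to reconnect to its last known address.
    class ReconnectOnHeartbeatTimeoutSketch {

        private UUID connectedResourceManagerId;        // null while disconnected
        private String lastKnownResourceManagerAddress; // remembered from the last registration

        void notifyHeartbeatTimeout(UUID resourceManagerId) {
            if (resourceManagerId.equals(connectedResourceManagerId)) {
                // read the address before closing, because closing forgets the connection
                final String reconnectAddress = lastKnownResourceManagerAddress;

                closeResourceManagerConnection(
                    new TimeoutException("Heartbeat of ResourceManager " + resourceManagerId + " timed out."));

                tryReconnectTo(reconnectAddress);
            }
        }

        private void closeResourceManagerConnection(Exception cause) {
            connectedResourceManagerId = null; // forget the old connection
        }

        private void tryReconnectTo(String resourceManagerAddress) {
            // placeholder: the real code starts a new registration attempt against this address
        }
    }
    ```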
    
    cc @GJL.
    
    ## Brief change log
    
    - Retry connection to RM in case of heartbeat timeout on `JobMaster` and `TaskExecutor`
    - Fail `TaskExecutor` if we could not connect to `RM` within `TaskManagerOptions#REGISTRATION_TIMEOUT`
    
    ## Verifying this change
    
    - Adapted `JobMasterTest#testHeartbeatTimeoutWithResourceManager`
    - Adapted `TaskExecutorTest#testHeartbeatTimeoutWithResourceManager`
    - Added `TaskExecutorTest#testMaximumRegistrationDuration` and `TaskExecutorTest#testMaximumRegistrationDurationAfterConnectionLoss`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixReconnection

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6035.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6035
    
----
commit 6b45c84cf06688099e71c9e1809917653af43d31
Author: Till Rohrmann <tr...@...>
Date:   2018-05-17T12:44:14Z

    [FLINK-6160] Add reconnection attempts in case of heartbeat timeouts to JobMaster and TaskExecutor
    
    If a heartbeat timeout with the RM occurs, then both the JobMaster and the TaskExecutor will try to reconnect
    to the last known RM address.

    Additionally, we now respect TaskManagerOptions#REGISTRATION_TIMEOUT on the TaskExecutor. This means that
    if the TaskExecutor cannot register with an RM within the given registration timeout, it fails with a
    fatal exception. This makes it possible to fail the TaskExecutor process when it cannot establish a connection,
    which ultimately frees the occupied resources.

    The commit also changes the default value of TaskManagerOptions#REGISTRATION_TIMEOUT from "Inf" to "5 min".

----


---

[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6035#discussion_r189004536
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -1605,9 +1609,16 @@ public void notifyHeartbeatTimeout(final ResourceID resourceId) {
     			runAsync(() -> {
     				log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
     
    -				closeResourceManagerConnection(
    -					new TimeoutException(
    -						"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
    +				if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) {
    +					final String resourceManagerAddress = establishedResourceManagerConnection.getResourceManagerGateway().getAddress();
    --- End diff --
    
    Declaration and assignment can be moved closer to `createResourceManagerConnection`.


---

[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6035#discussion_r189013436
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -947,6 +964,36 @@ private void closeResourceManagerConnection(Exception cause) {
     			resourceManagerConnection.close();
     			resourceManagerConnection = null;
     		}
    +
    +		startRegistrationTimeout();
    --- End diff --
    
    It looks weird that we call `startRegistrationTimeout()` in `closeResourceManagerConnection`. Can it be done in `createResourceManagerConnection` instead?


---

[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6035#discussion_r189012246
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -947,6 +964,36 @@ private void closeResourceManagerConnection(Exception cause) {
     			resourceManagerConnection.close();
     			resourceManagerConnection = null;
     		}
    +
    +		startRegistrationTimeout();
    +	}
    +
    +	private void startRegistrationTimeout() {
    +		final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
    +
    +		if (maxRegistrationDuration != null) {
    +			final UUID newRegistrationTimeoutId = UUID.randomUUID();
    +			currentRegistrationTimeoutId = newRegistrationTimeoutId;
    +			scheduleRunAsync(() -> registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);
    +		}
    +	}
    +
    +	private void stopRegistrationTimeout() {
    +		currentRegistrationTimeoutId = null;
    +	}
    +
    +	private void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
    +		if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) {
    +			final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
    +
    +			onFatalError(
    +				new RegistrationTimeoutException(
    +					String.format("Could not register at the ResourceManager within the specified maximum " +
    +						"registration duration %s. This indicates a problem with this instance. Terminating now.",
    +						maxRegistrationDuration)));
    +		} else {
    +			log.debug("Ignoring outdated registration timeout.");
    --- End diff --
    
    I think this will be logged even if the registration succeeded.
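
    To make this concrete, a stripped-down sketch of the id check (not the actual `TaskExecutor` code):

    ```java
    import java.util.UUID;

    // Sketch only: models how currentRegistrationTimeoutId is used in the diff above.
    class OutdatedTimeoutSketch {

        private UUID currentRegistrationTimeoutId;

        void stopRegistrationTimeout() {
            // called after a successful registration
            currentRegistrationTimeoutId = null;
        }

        void registrationTimeout(UUID registrationTimeoutId) {
            if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) {
                // fatal error path
            } else {
                // also reached when registration succeeded and stopRegistrationTimeout()
                // nulled the id, so the "Ignoring outdated registration timeout." message
                // would be logged on the success path as well
            }
        }
    }
    ```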


---

[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6035#discussion_r189019783
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -1605,9 +1609,16 @@ public void notifyHeartbeatTimeout(final ResourceID resourceId) {
     			runAsync(() -> {
     				log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
     
    -				closeResourceManagerConnection(
    -					new TimeoutException(
    -						"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
    +				if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) {
    +					final String resourceManagerAddress = establishedResourceManagerConnection.getResourceManagerGateway().getAddress();
    --- End diff --
    
    Actually not, because we set `establishedResourceManagerConnection` to `null` in the close method.


---

[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6035#discussion_r189017088
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -947,6 +964,36 @@ private void closeResourceManagerConnection(Exception cause) {
     			resourceManagerConnection.close();
     			resourceManagerConnection = null;
     		}
    +
    +		startRegistrationTimeout();
    --- End diff --
    
    The problem is that we want this timeout to start whenever the `TaskExecutor` loses its connection to the RM, and that's exactly when we close the RM connection. This also covers the case where we don't know the RM address (e.g. if the RM loses leadership).
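
    Roughly, the intended lifecycle looks like this (simplified, self-contained sketch; the real `TaskExecutor` schedules the timeout via its RPC main thread, so the volatile field and executor here are just stand-ins):

    ```java
    import java.util.UUID;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Sketch only: every connection-loss path funnels through closeResourceManagerConnection,
    // which is why the registration timeout is armed there and not in createResourceManagerConnection.
    class RegistrationLifecycleSketch {

        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        private final long maxRegistrationTimeoutMillis = 5 * 60 * 1000L; // the "5 min" default from this PR
        private volatile UUID currentRegistrationTimeoutId;

        // heartbeat timeout, RM leadership loss, explicit disconnect: all end up here
        void closeResourceManagerConnection(Exception cause) {
            // ... tear down the connection ...
            startRegistrationTimeout(); // from now on we must re-register within the deadline
        }

        // called once registration with a (possibly new) ResourceManager succeeded
        void establishResourceManagerConnection() {
            stopRegistrationTimeout();
        }

        private void startRegistrationTimeout() {
            final UUID newId = UUID.randomUUID();
            currentRegistrationTimeoutId = newId;
            scheduler.schedule(() -> registrationTimeout(newId), maxRegistrationTimeoutMillis, TimeUnit.MILLISECONDS);
        }

        private void stopRegistrationTimeout() {
            currentRegistrationTimeoutId = null;
        }

        private void registrationTimeout(UUID registrationTimeoutId) {
            if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) {
                // still not registered within the deadline: fail the process so its resources are freed
            }
        }
    }
    ```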


---

[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6035


---

[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6035#discussion_r189016603
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -232,8 +239,10 @@ public TaskExecutor(
     			rpcService.getScheduledExecutor(),
     			log);
     
    -		hardwareDescription = HardwareDescription.extractFromSystem(
    +		this.hardwareDescription = HardwareDescription.extractFromSystem(
     			taskExecutorServices.getMemoryManager().getMemorySize());
    +
    +		this.currentRegistrationTimeoutId = null;
    --- End diff --
    
    Will change it.


---

[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6035#discussion_r189004280
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java ---
    @@ -50,8 +50,11 @@
     	private final String[] tmpDirectories;
     
     	private final Time timeout;
    +
     	// null indicates an infinite duration
    +	@Nullable
    --- End diff --
    
    Should also be `@Nullable` on the constructor.


---

[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6035#discussion_r189016636
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java ---
    @@ -50,8 +50,11 @@
     	private final String[] tmpDirectories;
     
     	private final Time timeout;
    +
     	// null indicates an infinite duration
    +	@Nullable
    --- End diff --
    
    Will change it.


---

[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6035#discussion_r189016685
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -1605,9 +1609,16 @@ public void notifyHeartbeatTimeout(final ResourceID resourceId) {
     			runAsync(() -> {
     				log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
     
    -				closeResourceManagerConnection(
    -					new TimeoutException(
    -						"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
    +				if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) {
    +					final String resourceManagerAddress = establishedResourceManagerConnection.getResourceManagerGateway().getAddress();
    --- End diff --
    
    True, will change it.


---

[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6035#discussion_r189019837
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -947,6 +964,36 @@ private void closeResourceManagerConnection(Exception cause) {
     			resourceManagerConnection.close();
     			resourceManagerConnection = null;
     		}
    +
    +		startRegistrationTimeout();
    +	}
    +
    +	private void startRegistrationTimeout() {
    +		final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
    +
    +		if (maxRegistrationDuration != null) {
    +			final UUID newRegistrationTimeoutId = UUID.randomUUID();
    +			currentRegistrationTimeoutId = newRegistrationTimeoutId;
    +			scheduleRunAsync(() -> registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);
    +		}
    +	}
    +
    +	private void stopRegistrationTimeout() {
    +		currentRegistrationTimeoutId = null;
    +	}
    +
    +	private void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
    +		if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) {
    +			final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
    +
    +			onFatalError(
    +				new RegistrationTimeoutException(
    +					String.format("Could not register at the ResourceManager within the specified maximum " +
    +						"registration duration %s. This indicates a problem with this instance. Terminating now.",
    +						maxRegistrationDuration)));
    +		} else {
    +			log.debug("Ignoring outdated registration timeout.");
    --- End diff --
    
    True, will remove it.


---

[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6035#discussion_r188992099
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -232,8 +239,10 @@ public TaskExecutor(
     			rpcService.getScheduledExecutor(),
     			log);
     
    -		hardwareDescription = HardwareDescription.extractFromSystem(
    +		this.hardwareDescription = HardwareDescription.extractFromSystem(
     			taskExecutorServices.getMemoryManager().getMemorySize());
    +
    +		this.currentRegistrationTimeoutId = null;
    --- End diff --
    
    It's already `null` by default.


---