You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by beyond1920 <gi...@git.apache.org> on 2016/08/19 09:52:30 UTC

[GitHub] flink pull request #2389: Jira FLINK-4348

GitHub user beyond1920 opened a pull request:

    https://github.com/apache/flink/pull/2389

    Jira FLINK-4348

    this pull request aims to implement communication from ResourceManager to TaskManager which is jira 4348.
    There are mainly 3 logics initiated from RM to TM:
    1. Heartbeat, RM use heartbeat to sync with TM's slot status
    2. request slot, when RM decides to assign slot to JM, should first try to send request to TM for slot. TM can either accept or reject this request.
    3. FailureNotify, if RM cannot keep contact with heartbeat for several times, it will mark TM failed. Besides in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize. RM should send failure notify to TM and TM can terminate itself

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/alibaba/flink jira-4345

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2389.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2389
    
----
commit 937c57c93894d271a614131cc77f0a8a7e33ab37
Author: beyond1920 <be...@126.com>
Date:   2016-08-15T08:05:45Z

    from stephon's uncommitted pull request
    	modified:   flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
    	new file:   flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/TaskExecutorRegistrationResponse.java
    
    	modified:   flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
    	new file:   flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/TaskExecutorRegistrationResponse.java
    	new file:   flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java

commit 073b65c35fed4e64faedaa653b7b01b532531990
Author: beyond1920 <be...@126.com>
Date:   2016-08-15T08:34:34Z

    request slot from slotManager and offer slot to slotManager
    	new file:   flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationJobID.java
    	new file:   flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
    	modified:   flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
    	new file:   flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotManager.java
    	modified:   flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
    	new file:   flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/StandaloneSlotManager.java
    	new file:   flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/YarnSlotManager.java
    	new file:   flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/RequestSlotResponse.java
    	modified:   flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java

commit 5e50d0844f9b04f2bf89a5fc47cd0248d624ff6f
Author: beyond1920 <be...@126.com>
Date:   2016-08-15T10:32:03Z

    hearbeat response from tm to rm

commit ecf14e42a4626725f6a7aba28af4b2d0d83c2f18
Author: beyond1920 <be...@126.com>
Date:   2016-08-16T08:22:20Z

    update the heartbeat api and response message

commit b815134933978620420874642df0e629ce35f8e6
Author: beyond1920 <be...@126.com>
Date:   2016-08-17T02:13:40Z

    Merge branch 'flip-6' of https://github.com/apache/flink into jira-4345

commit b55f7843af50e2a4ad85dbfdbb1fedc5985f1ff3
Author: beyond1920 <be...@126.com>
Date:   2016-08-17T02:29:39Z

    add heartbeat manager

commit c838912c36381ee13c4c6871e6ea76092ee6e4fa
Author: beyond1920 <be...@126.com>
Date:   2016-08-17T08:45:43Z

    update resourceManager and add LeaderContender subclass

commit 196ac2bb5943b58bb12aceb214fa4168aafed29f
Author: beyond1920 <be...@126.com>
Date:   2016-08-19T09:14:45Z

    add test to ResourceManagerToTaskExecutorHeartbeatScheduler

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75670775
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatScheduler.java ---
    @@ -0,0 +1,242 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.dispatch.OnFailure;
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.slf4j.Logger;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * heartbeat between ResourceManager and TaskManager, it is responsible for schedule heartbeat and handle
    + * heartbeat lost cases
    + */
    +public class ResourceManagerToTaskExecutorHeartbeatScheduler {
    +
    +	/** default heartbeat interval time in millisecond */
    +	private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
    +
    +	/** default heartbeat timeout in millisecond */
    +	private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
    +
    +	/** max heartbeat interval time in millisecond (which is used in retry heartbeat case) */
    +	private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 30000;
    +
    +	/** if a failure except for timeout exception happened when trigger heartbeat from resourceManager to taskManager , next attemp will start after  ERROR_HEARTBEAT_DELAY_MILLIS millisecond */
    +	private static final long ERROR_HEARTBEAT_DELAY_MILLIS = 2000;
    +
    +	/** max heartbeat retry times when lost heartbeat */
    +	private static final int MAX_ATTEMPT_TIMES = 8;
    +
    +	private final long heartbeatInterval;
    +
    +	private final long heartbeatTimeout;
    +
    +	private final long maxHeartbeatTimeout;
    +
    +	private final long delayOnError;
    +
    +	private final int maxAttempt;
    +
    +
    +	/** taskManagerGateway to receive the heartbeat and report slot allocation */
    +	private final TaskExecutorGateway taskExecutorGateway;
    +
    +	/** the taskManager address */
    +	private final String taskExecutorAddress;
    +
    +	/** identify the taskManager resourceID */
    +	private final ResourceID resourceID;
    +
    +	/** identify the resourceManager rpc endpoint */
    +	private final ResourceManager resourceManager;
    +
    +	private final UUID resourceManagerLeaderSessionID;
    +
    +	private final Logger log;
    +
    +	private volatile boolean closed;
    +
    +	/**
    +	 * ResourceManagerToTaskExecutorHeartbeatScheduler constructor
    +	 *
    +	 * @param resourceManager         resourceManager which handles heartbeat communication with taskManager
    +	 * @param taskExecutorGateway     taskManager which receives heartbeat from resourceManager and report its slot
    +	 *                                allocation to resourceManager
    +	 * @param taskExecutorAddress     taskManager's address
    +	 * @param taskExecutorResourceID  taskManager's resourceID
    +	 * @param log
    +	 */
    +	public ResourceManagerToTaskExecutorHeartbeatScheduler(
    +		ResourceManager resourceManager, UUID resourceManagerLeaderSessionID, TaskExecutorGateway taskExecutorGateway,
    +		String taskExecutorAddress, ResourceID taskExecutorResourceID, Logger log) {
    +		this(resourceManager, resourceManagerLeaderSessionID, taskExecutorGateway, taskExecutorAddress, taskExecutorResourceID,
    +			log, INITIAL_HEARTBEAT_INTERVAL_MILLIS, INITIAL_HEARTBEAT_TIMEOUT_MILLIS, MAX_HEARTBEAT_TIMEOUT_MILLIS,
    +			ERROR_HEARTBEAT_DELAY_MILLIS, MAX_ATTEMPT_TIMES);
    +	}
    +
    +	/**
    +	 * ResourceManagerToTaskExecutorHeartbeatScheduler constructor
    +	 *
    +	 * @param resourceManager
    +	 * @param taskExecutorGateway
    +	 * @param taskExecutorAddress
    +	 * @param taskExecutorResourceID
    +	 * @param log
    +	 * @param heartbeatInterval
    +	 * @param heartbeatTimeout
    +	 * @param maxHeartbeatTimeout
    +	 * @param delayOnError
    +	 * @param maxAttempt
    +	 */
    +	public ResourceManagerToTaskExecutorHeartbeatScheduler(
    +		ResourceManager resourceManager, UUID resourceManagerLeaderSessionID, TaskExecutorGateway taskExecutorGateway,
    +		String taskExecutorAddress, ResourceID taskExecutorResourceID, Logger log, long heartbeatInterval,
    +		long heartbeatTimeout, long maxHeartbeatTimeout, long delayOnError, int maxAttempt) {
    +		this.resourceManager = resourceManager;
    +		this.resourceManagerLeaderSessionID = resourceManagerLeaderSessionID;
    +		this.taskExecutorGateway = taskExecutorGateway;
    +		this.taskExecutorAddress = taskExecutorAddress;
    +		this.resourceID = taskExecutorResourceID;
    +		this.log = log;
    +		this.heartbeatInterval = heartbeatInterval;
    +		this.heartbeatTimeout = heartbeatTimeout;
    +		this.maxHeartbeatTimeout = maxHeartbeatTimeout;
    +		this.delayOnError = delayOnError;
    +		this.maxAttempt = maxAttempt;
    --- End diff --
    
    Null checks and validation of arguments would be good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75789293
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -52,18 +61,45 @@
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
     	private final ExecutionContext executionContext;
     	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +	private final Map<ResourceID, TaskExecutorGateway> taskExecutorGateways;
    +	private final Map<ResourceID, ResourceManagerToTaskExecutorHeartbeatScheduler> heartbeatSchedulers;
    +	private final LeaderElectionService leaderElectionService;
    +	private UUID leaderSessionID;
    +	// TODO private final SlotManager slotManager;
     
    -	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
    +	public ResourceManager(RpcService rpcService, ExecutorService executorService, LeaderElectionService leaderElectionService) {
     		super(rpcService);
     		this.executionContext = ExecutionContext$.MODULE$.fromExecutor(
     			Preconditions.checkNotNull(executorService));
     		this.jobMasterGateways = new HashMap<>();
    +		this.taskExecutorGateways = new HashMap<>();
    +		this.heartbeatSchedulers = new HashMap<>();
    +		this.leaderElectionService = leaderElectionService;
    +		leaderSessionID = null;
    +		// TODO this.slotManager = null;
    +	}
    +
    +	@Override
    +	public void start() {
    +		// start a leader
    +		try {
    +			leaderElectionService.start(new ResourceManagerLeaderContender());
    +			super.start();
    --- End diff --
    
    ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75792397
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatScheduler.java ---
    @@ -0,0 +1,242 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.dispatch.OnFailure;
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.slf4j.Logger;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * heartbeat between ResourceManager and TaskManager, it is responsible for schedule heartbeat and handle
    + * heartbeat lost cases
    + */
    +public class ResourceManagerToTaskExecutorHeartbeatScheduler {
    +
    +	/** default heartbeat interval time in millisecond */
    +	private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
    +
    +	/** default heartbeat timeout in millisecond */
    +	private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
    +
    +	/** max heartbeat interval time in millisecond (which is used in retry heartbeat case) */
    +	private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 30000;
    +
    +	/** if a failure except for timeout exception happened when trigger heartbeat from resourceManager to taskManager , next attemp will start after  ERROR_HEARTBEAT_DELAY_MILLIS millisecond */
    +	private static final long ERROR_HEARTBEAT_DELAY_MILLIS = 2000;
    +
    +	/** max heartbeat retry times when lost heartbeat */
    +	private static final int MAX_ATTEMPT_TIMES = 8;
    +
    +	private final long heartbeatInterval;
    +
    +	private final long heartbeatTimeout;
    +
    +	private final long maxHeartbeatTimeout;
    +
    +	private final long delayOnError;
    +
    +	private final int maxAttempt;
    +
    +
    +	/** taskManagerGateway to receive the heartbeat and report slot allocation */
    +	private final TaskExecutorGateway taskExecutorGateway;
    +
    +	/** the taskManager address */
    +	private final String taskExecutorAddress;
    +
    +	/** identify the taskManager resourceID */
    +	private final ResourceID resourceID;
    +
    +	/** identify the resourceManager rpc endpoint */
    +	private final ResourceManager resourceManager;
    +
    +	private final UUID resourceManagerLeaderSessionID;
    +
    +	private final Logger log;
    +
    +	private volatile boolean closed;
    +
    +	/**
    +	 * ResourceManagerToTaskExecutorHeartbeatScheduler constructor
    +	 *
    +	 * @param resourceManager         resourceManager which handles heartbeat communication with taskManager
    +	 * @param taskExecutorGateway     taskManager which receives heartbeat from resourceManager and report its slot
    +	 *                                allocation to resourceManager
    +	 * @param taskExecutorAddress     taskManager's address
    +	 * @param taskExecutorResourceID  taskManager's resourceID
    +	 * @param log
    +	 */
    +	public ResourceManagerToTaskExecutorHeartbeatScheduler(
    +		ResourceManager resourceManager, UUID resourceManagerLeaderSessionID, TaskExecutorGateway taskExecutorGateway,
    +		String taskExecutorAddress, ResourceID taskExecutorResourceID, Logger log) {
    +		this(resourceManager, resourceManagerLeaderSessionID, taskExecutorGateway, taskExecutorAddress, taskExecutorResourceID,
    +			log, INITIAL_HEARTBEAT_INTERVAL_MILLIS, INITIAL_HEARTBEAT_TIMEOUT_MILLIS, MAX_HEARTBEAT_TIMEOUT_MILLIS,
    +			ERROR_HEARTBEAT_DELAY_MILLIS, MAX_ATTEMPT_TIMES);
    +	}
    +
    +	/**
    +	 * ResourceManagerToTaskExecutorHeartbeatScheduler constructor
    +	 *
    +	 * @param resourceManager
    +	 * @param taskExecutorGateway
    +	 * @param taskExecutorAddress
    +	 * @param taskExecutorResourceID
    +	 * @param log
    +	 * @param heartbeatInterval
    +	 * @param heartbeatTimeout
    +	 * @param maxHeartbeatTimeout
    +	 * @param delayOnError
    +	 * @param maxAttempt
    +	 */
    +	public ResourceManagerToTaskExecutorHeartbeatScheduler(
    +		ResourceManager resourceManager, UUID resourceManagerLeaderSessionID, TaskExecutorGateway taskExecutorGateway,
    +		String taskExecutorAddress, ResourceID taskExecutorResourceID, Logger log, long heartbeatInterval,
    +		long heartbeatTimeout, long maxHeartbeatTimeout, long delayOnError, int maxAttempt) {
    +		this.resourceManager = resourceManager;
    +		this.resourceManagerLeaderSessionID = resourceManagerLeaderSessionID;
    +		this.taskExecutorGateway = taskExecutorGateway;
    +		this.taskExecutorAddress = taskExecutorAddress;
    +		this.resourceID = taskExecutorResourceID;
    +		this.log = log;
    +		this.heartbeatInterval = heartbeatInterval;
    +		this.heartbeatTimeout = heartbeatTimeout;
    +		this.maxHeartbeatTimeout = maxHeartbeatTimeout;
    +		this.delayOnError = delayOnError;
    +		this.maxAttempt = maxAttempt;
    +	}
    +
    +	/**
    +	 * start to schedule next heartbeat
    +	 */
    +	public void start() {
    +		checkState(!closed, "The heartbeat connection is already closed");
    +		sendHeartbeatToTaskManagerLater(1, heartbeatTimeout, heartbeatInterval);
    +	}
    +
    +	/**
    +	 * Checks if the heartbeat schedule was closed.
    +	 *
    +	 * @return True if the heartbeat schedule was closed, false otherwise.
    +	 */
    +	public boolean isClosed() {
    +		return closed;
    +	}
    +
    +	/**
    +	 * stop to schedule heartbeat
    +	 */
    +	public void close() {
    +		closed = true;
    +	}
    +
    +	/**
    +	 * get the heartbeat interval
    +	 *
    +	 * @return heartbeat interval
    +	 */
    +	public long getHeartbeatInterval() {
    +		return heartbeatInterval;
    +	}
    +
    +	/**
    +	 * send a heartbeat attempt to taskManager, receive the slotReport from TaskManager or failed depends on the future result.
    +	 *
    +	 * @param attempt
    +	 * @param timeoutMillis
    +	 */
    +	private void sendHeartbeatToTaskManager(final int attempt, final long timeoutMillis) {
    +		// eager check for closed to avoid some unnecessary work
    +		if (closed) {
    +			return;
    +		}
    +		FiniteDuration timeout = new FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS);
    +		Future<SlotReport> heartbeatResponse = taskExecutorGateway.triggerHeartbeatToResourceManager(resourceManagerLeaderSessionID, timeout);
    +
    +		heartbeatResponse.onSuccess(new OnSuccess<SlotReport>() {
    +
    +			@Override
    +			public void onSuccess(SlotReport result) throws Throwable {
    +				if (!isClosed()) {
    +					// notify the slotManager and trigger next time heartbeat
    +					resourceManager.handleSlotReportFromTaskManager(result);
    +					sendHeartbeatToTaskManagerLater(1, heartbeatTimeout, heartbeatInterval);
    +				}
    +			}
    +		}, resourceManager.getRpcService().getExecutionContext());
    +
    +		// upon failure, retry
    +		heartbeatResponse.onFailure(new OnFailure() {
    +			@Override
    +			public void onFailure(Throwable failure) {
    +				if (!isClosed()) {
    +					if (attempt == maxAttempt) {
    +						log.error("ResourceManager {} fail to keep heartbeat with taskManager {} after {} attempts",
    +							resourceManager.getAddress(), taskExecutorAddress, attempt);
    +						closed = true;
    +						// mark TaskManager as failed after heartbeat interaction attempts failed for many times
    +						resourceManager.getSelf().notifyResourceFailure(resourceID);
    +					} else {
    +						// upon timeout exception, exponential increase timeout
    +						if (failure instanceof TimeoutException) {
    +							if (log.isDebugEnabled()) {
    +								log.debug("ResourceManager {} lost heartbeat in {} ms with taskManager {} at {} attempts ",
    +									resourceManager.getAddress(), timeoutMillis, attempt);
    +							}
    +
    +							long retryTimeoutMillis = Math.min(2 * timeoutMillis, maxHeartbeatTimeout);
    +							sendHeartbeatToTaskManager(attempt + 1, retryTimeoutMillis);
    +						} else {
    +							log.error(
    +								"ResourceManager {} fail to keep heartbeat with taskManager due to an error",
    +								failure);
    +						}
    +						sendHeartbeatToTaskManagerLater(attempt + 1, timeoutMillis, delayOnError);
    --- End diff --
    
    modified


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75670550
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatScheduler.java ---
    @@ -0,0 +1,242 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.dispatch.OnFailure;
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.slf4j.Logger;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * heartbeat between ResourceManager and TaskManager, it is responsible for schedule heartbeat and handle
    + * heartbeat lost cases
    + */
    +public class ResourceManagerToTaskExecutorHeartbeatScheduler {
    +
    +	/** default heartbeat interval time in millisecond */
    +	private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
    +
    +	/** default heartbeat timeout in millisecond */
    +	private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
    +
    +	/** max heartbeat interval time in millisecond (which is used in retry heartbeat case) */
    +	private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 30000;
    +
    +	/** if a failure except for timeout exception happened when trigger heartbeat from resourceManager to taskManager , next attemp will start after  ERROR_HEARTBEAT_DELAY_MILLIS millisecond */
    +	private static final long ERROR_HEARTBEAT_DELAY_MILLIS = 2000;
    +
    +	/** max heartbeat retry times when lost heartbeat */
    +	private static final int MAX_ATTEMPT_TIMES = 8;
    +
    +	private final long heartbeatInterval;
    +
    +	private final long heartbeatTimeout;
    +
    +	private final long maxHeartbeatTimeout;
    +
    +	private final long delayOnError;
    +
    +	private final int maxAttempt;
    +
    +
    +	/** taskManagerGateway to receive the heartbeat and report slot allocation */
    +	private final TaskExecutorGateway taskExecutorGateway;
    +
    +	/** the taskManager address */
    +	private final String taskExecutorAddress;
    +
    +	/** identify the taskManager resourceID */
    +	private final ResourceID resourceID;
    +
    +	/** identify the resourceManager rpc endpoint */
    +	private final ResourceManager resourceManager;
    +
    +	private final UUID resourceManagerLeaderSessionID;
    +
    +	private final Logger log;
    +
    +	private volatile boolean closed;
    +
    +	/**
    +	 * ResourceManagerToTaskExecutorHeartbeatScheduler constructor
    --- End diff --
    
    Java docs usually state what the method/constructor does and not what it is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75868574
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    --- End diff --
    
    Hmm you're right that this can currently happen. We should fix that in the underlying RPC service implementation. In case that the actor ref cannot be resolve an exception should be thrown. I'll open an PR for it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75672712
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
    @@ -75,6 +80,7 @@ public void start() {
     		// start by connecting to the ResourceManager
     		try {
     			haServices.getResourceManagerLeaderRetriever().start(new ResourceManagerLeaderListener());
    +			super.start();
    --- End diff --
    
    Let's first call the super method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75677577
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatSchedulerTest.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.actor.ActorSystem;
    +import akka.dispatch.OnSuccess;
    +import akka.testkit.JavaTestKit;
    +import akka.util.Timeout;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.test.TestingServer;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.NonHaServices;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
    +import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
    +import org.apache.flink.runtime.util.ZooKeeperUtils;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.After;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.IOException;
    +import java.util.UUID;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ForkJoinPool;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +public class ResourceManagerToTaskExecutorHeartbeatSchedulerTest extends TestLogger {
    +
    +	@Rule
    +	public TemporaryFolder tempFolder = new TemporaryFolder();
    --- End diff --
    
    Should not be needed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75665272
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -52,18 +61,45 @@
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
     	private final ExecutionContext executionContext;
     	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +	private final Map<ResourceID, TaskExecutorGateway> taskExecutorGateways;
    +	private final Map<ResourceID, ResourceManagerToTaskExecutorHeartbeatScheduler> heartbeatSchedulers;
    +	private final LeaderElectionService leaderElectionService;
    +	private UUID leaderSessionID;
    +	// TODO private final SlotManager slotManager;
     
    -	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
    +	public ResourceManager(RpcService rpcService, ExecutorService executorService, LeaderElectionService leaderElectionService) {
     		super(rpcService);
     		this.executionContext = ExecutionContext$.MODULE$.fromExecutor(
     			Preconditions.checkNotNull(executorService));
     		this.jobMasterGateways = new HashMap<>();
    +		this.taskExecutorGateways = new HashMap<>();
    +		this.heartbeatSchedulers = new HashMap<>();
    +		this.leaderElectionService = leaderElectionService;
    +		leaderSessionID = null;
    +		// TODO this.slotManager = null;
    +	}
    +
    +	@Override
    +	public void start() {
    +		// start a leader
    +		try {
    +			leaderElectionService.start(new ResourceManagerLeaderContender());
    +			super.start();
    --- End diff --
    
    `super.start` should be called before the leader election service is started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75851298
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -52,18 +61,45 @@
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
     	private final ExecutionContext executionContext;
     	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +	private final Map<ResourceID, TaskExecutorGateway> taskExecutorGateways;
    +	private final Map<ResourceID, ResourceManagerToTaskExecutorHeartbeatScheduler> heartbeatSchedulers;
    +	private final LeaderElectionService leaderElectionService;
    +	private UUID leaderSessionID;
    +	// TODO private final SlotManager slotManager;
    --- End diff --
    
    Alright, that works :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75790203
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatScheduler.java ---
    @@ -0,0 +1,242 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.dispatch.OnFailure;
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.slf4j.Logger;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * heartbeat between ResourceManager and TaskManager, it is responsible for schedule heartbeat and handle
    + * heartbeat lost cases
    + */
    +public class ResourceManagerToTaskExecutorHeartbeatScheduler {
    +
    +	/** default heartbeat interval time in millisecond */
    +	private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
    +
    +	/** default heartbeat timeout in millisecond */
    +	private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
    +
    +	/** max heartbeat interval time in millisecond (which is used in retry heartbeat case) */
    +	private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 30000;
    +
    +	/** if a failure except for timeout exception happened when trigger heartbeat from resourceManager to taskManager , next attemp will start after  ERROR_HEARTBEAT_DELAY_MILLIS millisecond */
    --- End diff --
    
    good idea


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75683203
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    +	public void notifyResourceFailure(ResourceID resourceID) {
    +		log.warn("receive failure notification of resource {}", resourceID);
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if(taskManager == null) {
    +			// ignore command to stop an unregister taskManager
    +			log.warn("ignore stop taskManager command because {} is unregistered", resourceID);
    +		} else {
    +			taskExecutorGateways.remove(resourceID);
    +			closeHeartbeatToResourceIfExist(resourceID);
    +			// TODO notify slotManager and notify jobMaster,
    +			// TODO slotManager.notifyTaskManagerFailure(resourceID);
    +			taskManager.shutDown(leaderSessionID);
    +		}
    +	}
    +
    +	/**
    +	 * close heartbeat triggers to resource if exist
    +	 * @param resourceID which resource need to stop keep heartbeat with
    +	 */
    +	private void closeHeartbeatToResourceIfExist(ResourceID resourceID) {
    +		if(heartbeatSchedulers.containsKey(resourceID)) {
    +			ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatManager = heartbeatSchedulers.get(resourceID);
    +			heartbeatManager.close();
    +			heartbeatSchedulers.remove(resourceID);
    +		}
    +	}
    +
    +	/**
    +	 * send slotRequest to the taskManager which the given slot is on
    +	 *
    +	 * @param slotRequest slot request information
    +	 * @param slotID      which slot is choosen
    +	 */
    +	void requestSlotToTaskManager(final SlotRequest slotRequest, final SlotID slotID) {
    +		ResourceID resourceID = slotID.getResourceID();
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if (taskManager == null) {
    +			// the given slot is on an unregister taskManager
    +			log.warn("ignore slot {} because it is on an unregister taskManager", slotID);
    +			// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +		} else {
    +			Future<SlotAllocationResponse> response = taskManager.requestSlotForJob(
    +				slotRequest.getAllocationID(), slotRequest.getJobID(), slotID, leaderSessionID);
    +			response.onSuccess(new OnSuccess<SlotAllocationResponse>() {
    +				@Override
    +				public void onSuccess(SlotAllocationResponse result) throws Throwable {
    +					if (result instanceof SlotAllocationResponse.Decline) {
    +						// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +					}
    +				}
    +			}, getMainThreadExecutionContext());
    --- End diff --
    
    Line break missing after this line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75683936
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    +	public void notifyResourceFailure(ResourceID resourceID) {
    +		log.warn("receive failure notification of resource {}", resourceID);
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if(taskManager == null) {
    +			// ignore command to stop an unregister taskManager
    +			log.warn("ignore stop taskManager command because {} is unregistered", resourceID);
    +		} else {
    +			taskExecutorGateways.remove(resourceID);
    +			closeHeartbeatToResourceIfExist(resourceID);
    +			// TODO notify slotManager and notify jobMaster,
    +			// TODO slotManager.notifyTaskManagerFailure(resourceID);
    +			taskManager.shutDown(leaderSessionID);
    --- End diff --
    
    I'm not sure whether other components should be allowed to terminate other components. We could notify the component that it has been marked failed and then let it decide what to do.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75671552
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java ---
    @@ -18,8 +18,78 @@
     
     package org.apache.flink.runtime.rpc.resourcemanager;
     
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +
     import java.io.Serializable;
     
    -public class SlotRequest implements Serializable{
    +/**
    + * Slot allocation request from jobManager to resourceManager
    + */
    +public class SlotRequest implements Serializable {
     	private static final long serialVersionUID = -6586877187990445986L;
    +
    +	/** jobId to identify which job send the request */
    +	private final JobID jobID;
    +
    +	/** allocationId to identify slot allocation, created by JobManager when requesting a sot */
    +	private final AllocationID allocationID;
    +
    +	/** the resource profile of the desired slot */
    +	private final ResourceProfile profile;
    +
    +	public SlotRequest(JobID jobID, AllocationID allocationID) {
    +		this(jobID, allocationID, null);
    +	}
    +
    +	public SlotRequest(JobID jobID, AllocationID allocationID, ResourceProfile profile) {
    +		this.jobID = checkNotNull(jobID, "jobID cannot be null");
    +		this.allocationID = checkNotNull(allocationID, "allocationID cannot be null");
    +		this.profile = checkNotNull(profile, "profile cannot be null");
    +	}
    +
    +	public ResourceProfile getProfile() {
    +		return profile;
    +	}
    +
    +	public AllocationID getAllocationID() {
    +		return allocationID;
    +	}
    +
    +	public JobID getJobID() {
    +		return jobID;
    +	}
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (o == null || getClass() != o.getClass()) {
    +			return false;
    +		}
    +
    +		SlotRequest that = (SlotRequest) o;
    +
    +		if (!jobID.equals(that.jobID)) {
    +			return false;
    +		}
    +		if (!allocationID.equals(that.allocationID)) {
    +			return false;
    +		}
    +		return profile.equals(that.profile);
    +
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		int result = jobID.hashCode();
    +		result = 31 * result + allocationID.hashCode();
    +		result = 31 * result + profile.hashCode();
    +		return result;
    --- End diff --
    
    Can be simplified by `return Objects.hash(jobID, allocationID, profile);`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75671747
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotAllocationResponse.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    --- End diff --
    
    Two leading spaces instead of one


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75675631
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatSchedulerTest.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.actor.ActorSystem;
    +import akka.dispatch.OnSuccess;
    +import akka.testkit.JavaTestKit;
    +import akka.util.Timeout;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.test.TestingServer;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.NonHaServices;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
    +import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
    +import org.apache.flink.runtime.util.ZooKeeperUtils;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.After;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.IOException;
    +import java.util.UUID;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ForkJoinPool;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +public class ResourceManagerToTaskExecutorHeartbeatSchedulerTest extends TestLogger {
    +
    +	@Rule
    +	public TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	private static TestingServer testingServer;
    +
    +	private static ActorSystem actorSystem;
    +	private static ActorSystem actorSystem2;
    +	private static AkkaRpcService akkaRpcService;
    +	private static AkkaRpcService akkaRpcService2;
    +
    +	private static final Timeout timeout = new Timeout(10000, TimeUnit.MILLISECONDS);
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		testingServer = new TestingServer();
    +
    +		actorSystem = AkkaUtils.createDefaultActorSystem();
    +		actorSystem2 = AkkaUtils.createDefaultActorSystem();
    +
    +		akkaRpcService = new AkkaRpcService(actorSystem, timeout);
    +		akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout);
    +	}
    +
    +	@AfterClass
    +	public static void teardown() throws Exception {
    +		akkaRpcService.stopService();
    +		akkaRpcService2.stopService();
    +
    +		actorSystem.shutdown();
    +		actorSystem2.shutdown();
    +
    +		actorSystem.awaitTermination();
    +		actorSystem2.awaitTermination();
    +
    +		testingServer.stop();
    +	}
    +
    +	@Test
    +	public void testStart() throws Exception {
    +		ExecutorService executorService = new ForkJoinPool();
    +
    +		Configuration configuration = ZooKeeperTestUtils
    +			.createZooKeeperRecoveryModeConfig(
    +				testingServer.getConnectString(),
    +				tempFolder.getRoot().getPath());
    +
    +		CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
    +		LeaderElectionService leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client,
    +			configuration);
    +		ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService, leaderElectionService);
    +
    +		HighAvailabilityServices highAvailabilityServices = new NonHaServices(resourceManager.getAddress());
    +		ResourceID resourceID = ResourceID.generate();
    +		TaskExecutor taskExecutor = new TaskExecutor(akkaRpcService2, highAvailabilityServices, resourceID);
    +
    +		resourceManager.start();
    +		taskExecutor.start();
    --- End diff --
    
    I think that we don't have to start a full-blown cluster with ZooKeeper to test the heartbeat system. It should be enough to mock the receiving ends.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75677462
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatSchedulerTest.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.actor.ActorSystem;
    +import akka.dispatch.OnSuccess;
    +import akka.testkit.JavaTestKit;
    +import akka.util.Timeout;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.test.TestingServer;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.NonHaServices;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
    +import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
    +import org.apache.flink.runtime.util.ZooKeeperUtils;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.After;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.IOException;
    +import java.util.UUID;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ForkJoinPool;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +public class ResourceManagerToTaskExecutorHeartbeatSchedulerTest extends TestLogger {
    +
    +	@Rule
    +	public TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	private static TestingServer testingServer;
    +
    +	private static ActorSystem actorSystem;
    +	private static ActorSystem actorSystem2;
    +	private static AkkaRpcService akkaRpcService;
    +	private static AkkaRpcService akkaRpcService2;
    +
    +	private static final Timeout timeout = new Timeout(10000, TimeUnit.MILLISECONDS);
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		testingServer = new TestingServer();
    +
    +		actorSystem = AkkaUtils.createDefaultActorSystem();
    +		actorSystem2 = AkkaUtils.createDefaultActorSystem();
    +
    +		akkaRpcService = new AkkaRpcService(actorSystem, timeout);
    +		akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout);
    +	}
    +
    +	@AfterClass
    +	public static void teardown() throws Exception {
    +		akkaRpcService.stopService();
    +		akkaRpcService2.stopService();
    +
    +		actorSystem.shutdown();
    +		actorSystem2.shutdown();
    +
    +		actorSystem.awaitTermination();
    +		actorSystem2.awaitTermination();
    +
    +		testingServer.stop();
    +	}
    +
    +	@Test
    +	public void testStart() throws Exception {
    +		ExecutorService executorService = new ForkJoinPool();
    +
    +		Configuration configuration = ZooKeeperTestUtils
    +			.createZooKeeperRecoveryModeConfig(
    +				testingServer.getConnectString(),
    +				tempFolder.getRoot().getPath());
    +
    +		CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
    +		LeaderElectionService leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client,
    +			configuration);
    +		ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService, leaderElectionService);
    +
    +		HighAvailabilityServices highAvailabilityServices = new NonHaServices(resourceManager.getAddress());
    +		ResourceID resourceID = ResourceID.generate();
    +		TaskExecutor taskExecutor = new TaskExecutor(akkaRpcService2, highAvailabilityServices, resourceID);
    +
    +		resourceManager.start();
    +		taskExecutor.start();
    +
    +		ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(resourceManager, resourceManager.getLeaderSessionID(), taskExecutor.getSelf(), taskExecutor.getAddress(), resourceID, log);
    +		heartbeatScheduler.start();
    +		Assert.assertFalse(heartbeatScheduler.isClosed());
    +		// if heartbeat scheduler cannot receive heartbeat response from taskExecutor for maxAttempt times, it will close itself.
    +		// else it will continue to trigger next heartbeat in the given interval milliseconds
    +		FiniteDuration timeout = new FiniteDuration(40, TimeUnit.SECONDS);
    +		Deadline deadline = timeout.fromNow();
    +
    +		while (deadline.hasTimeLeft() && !heartbeatScheduler.isClosed()) {
    +			Thread.sleep(100);
    +		}
    +
    +		Assert.assertFalse(heartbeatScheduler.isClosed());
    +
    +		heartbeatScheduler.close();
    +		Assert.assertTrue(heartbeatScheduler.isClosed());
    +	}
    --- End diff --
    
    I think we should have more tests, testing the different heartbeat cases. 1. receiving a regular heartbeat in time  and checking that the slot report is properly delivered. 2. Irregular heartbeat 3. Loss of heartbeat and check that the resource is properly failed. Etc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75870924
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    --- End diff --
    
    True that could be the case but I thought that the RM will directly be notified by YARN or Mesos. Thus, it will not need a rpc. Maybe we can remove it for now and if we see that other components have to notify the RM about failed resources, then we can introduce it. But for now I'd like to keep the interface as lean as possible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75793072
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    +	public void notifyResourceFailure(ResourceID resourceID) {
    +		log.warn("receive failure notification of resource {}", resourceID);
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if(taskManager == null) {
    +			// ignore command to stop an unregister taskManager
    +			log.warn("ignore stop taskManager command because {} is unregistered", resourceID);
    +		} else {
    +			taskExecutorGateways.remove(resourceID);
    +			closeHeartbeatToResourceIfExist(resourceID);
    +			// TODO notify slotManager and notify jobMaster,
    +			// TODO slotManager.notifyTaskManagerFailure(resourceID);
    +			taskManager.shutDown(leaderSessionID);
    +		}
    +	}
    +
    +	/**
    +	 * close heartbeat triggers to resource if exist
    +	 * @param resourceID which resource need to stop keep heartbeat with
    +	 */
    +	private void closeHeartbeatToResourceIfExist(ResourceID resourceID) {
    +		if(heartbeatSchedulers.containsKey(resourceID)) {
    +			ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatManager = heartbeatSchedulers.get(resourceID);
    +			heartbeatManager.close();
    +			heartbeatSchedulers.remove(resourceID);
    +		}
    +	}
    +
    +	/**
    +	 * send slotRequest to the taskManager which the given slot is on
    +	 *
    +	 * @param slotRequest slot request information
    +	 * @param slotID      which slot is choosen
    +	 */
    +	void requestSlotToTaskManager(final SlotRequest slotRequest, final SlotID slotID) {
    +		ResourceID resourceID = slotID.getResourceID();
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if (taskManager == null) {
    +			// the given slot is on an unregister taskManager
    +			log.warn("ignore slot {} because it is on an unregister taskManager", slotID);
    +			// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +		} else {
    +			Future<SlotAllocationResponse> response = taskManager.requestSlotForJob(
    +				slotRequest.getAllocationID(), slotRequest.getJobID(), slotID, leaderSessionID);
    +			response.onSuccess(new OnSuccess<SlotAllocationResponse>() {
    +				@Override
    +				public void onSuccess(SlotAllocationResponse result) throws Throwable {
    +					if (result instanceof SlotAllocationResponse.Decline) {
    +						// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +					}
    +				}
    +			}, getMainThreadExecutionContext());
    --- End diff --
    
    because slotManager is designed as not thread safe


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75672201
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java ---
    @@ -18,21 +18,38 @@
     
     package org.apache.flink.runtime.rpc.taskexecutor;
     
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +
     import java.io.Serializable;
    +import java.util.List;
     
     /**
      * A report about the current status of all slots of the TaskExecutor, describing
      * which slots are available and allocated, and what jobs (JobManagers) the allocated slots
      * have been allocated to.
      */
    -public class SlotReport implements Serializable{
    +public class SlotReport implements Serializable {
    +
    +	private static final long serialVersionUID = -3150175198722481689L;
    +
    +	/** the status of all slots of the TaskManager */
    +	private final List<SlotStatus> slotsStatus;
     
    -	private static final long serialVersionUID = 1L;
    +	// resourceID identify the taskExecutor
    --- End diff --
    
    I know we're not overly consistent with that but field descriptions use normally the multi-line comment style `/** .... */`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75664899
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -52,18 +61,45 @@
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
     	private final ExecutionContext executionContext;
     	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +	private final Map<ResourceID, TaskExecutorGateway> taskExecutorGateways;
    +	private final Map<ResourceID, ResourceManagerToTaskExecutorHeartbeatScheduler> heartbeatSchedulers;
    +	private final LeaderElectionService leaderElectionService;
    +	private UUID leaderSessionID;
    +	// TODO private final SlotManager slotManager;
     
    -	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
    +	public ResourceManager(RpcService rpcService, ExecutorService executorService, LeaderElectionService leaderElectionService) {
     		super(rpcService);
     		this.executionContext = ExecutionContext$.MODULE$.fromExecutor(
     			Preconditions.checkNotNull(executorService));
     		this.jobMasterGateways = new HashMap<>();
    +		this.taskExecutorGateways = new HashMap<>();
    +		this.heartbeatSchedulers = new HashMap<>();
    +		this.leaderElectionService = leaderElectionService;
    +		leaderSessionID = null;
    +		// TODO this.slotManager = null;
    --- End diff --
    
    Same here with the TODO.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75672245
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java ---
    @@ -18,21 +18,38 @@
     
     package org.apache.flink.runtime.rpc.taskexecutor;
     
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +
     import java.io.Serializable;
    +import java.util.List;
     
     /**
      * A report about the current status of all slots of the TaskExecutor, describing
      * which slots are available and allocated, and what jobs (JobManagers) the allocated slots
      * have been allocated to.
      */
    -public class SlotReport implements Serializable{
    +public class SlotReport implements Serializable {
    +
    +	private static final long serialVersionUID = -3150175198722481689L;
    +
    +	/** the status of all slots of the TaskManager */
    +	private final List<SlotStatus> slotsStatus;
     
    -	private static final long serialVersionUID = 1L;
    +	// resourceID identify the taskExecutor
    +	private final ResourceID resourceID;
     
    -	// ------------------------------------------------------------------------
    -	
    -	@Override
    -	public String toString() {
    -		return "SlotReport";
    +	public SlotReport(List<SlotStatus> slotsStatus, ResourceID resourceID) {
    +		this.slotsStatus = slotsStatus;
    +		this.resourceID = resourceID;
     	}
    +
    +
    --- End diff --
    
    two line breaks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2389: [FLINK-4348] [cluster management] Implement communication...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2389
  
    Thanks for your contribution @beyond1920 :-) I've reviewed the PR and I think it would be good if we split it up into several parts. The first part could be the heartbeat logic a.k.a. `HeartbeatManager`.   Here we could try to implement a generic sending and receiving end. I think the implementation can almost be independent of the JM, RM and TE implementation (similar to `RetryingRegistration`). This will allow us to easily test this component.
    
    The next step would be the integration of this component into the RM, JM and TE.
    
    Concerning the slot request logic I think we should wait a little bit for the `SlotManager` implementation. It could be the case that the `SlotManager` will make the rpcs to the `TaskExecutor` and not the RM. But for the moment the interface is, afaik, not well enough specified to program against it.
    
    The failure notification should also be treated in a separate PR imo. The notification can have multiple origins (e.g. `HeartbeatManager` or the resource management framework) and should be designed in such a way.
    
    In general, I think the components should be more thoroughly tested with more fine-grained unit tests. Furthermore, I think it would be good if we could revise the code documentation a little bit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75669540
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    +	public void notifyResourceFailure(ResourceID resourceID) {
    +		log.warn("receive failure notification of resource {}", resourceID);
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if(taskManager == null) {
    +			// ignore command to stop an unregister taskManager
    +			log.warn("ignore stop taskManager command because {} is unregistered", resourceID);
    +		} else {
    +			taskExecutorGateways.remove(resourceID);
    +			closeHeartbeatToResourceIfExist(resourceID);
    +			// TODO notify slotManager and notify jobMaster,
    +			// TODO slotManager.notifyTaskManagerFailure(resourceID);
    +			taskManager.shutDown(leaderSessionID);
    +		}
    +	}
    +
    +	/**
    +	 * close heartbeat triggers to resource if exist
    +	 * @param resourceID which resource need to stop keep heartbeat with
    +	 */
    +	private void closeHeartbeatToResourceIfExist(ResourceID resourceID) {
    +		if(heartbeatSchedulers.containsKey(resourceID)) {
    +			ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatManager = heartbeatSchedulers.get(resourceID);
    +			heartbeatManager.close();
    +			heartbeatSchedulers.remove(resourceID);
    +		}
    +	}
    +
    +	/**
    +	 * send slotRequest to the taskManager which the given slot is on
    +	 *
    +	 * @param slotRequest slot request information
    +	 * @param slotID      which slot is choosen
    +	 */
    +	void requestSlotToTaskManager(final SlotRequest slotRequest, final SlotID slotID) {
    +		ResourceID resourceID = slotID.getResourceID();
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if (taskManager == null) {
    +			// the given slot is on an unregister taskManager
    +			log.warn("ignore slot {} because it is on an unregister taskManager", slotID);
    +			// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +		} else {
    +			Future<SlotAllocationResponse> response = taskManager.requestSlotForJob(
    +				slotRequest.getAllocationID(), slotRequest.getJobID(), slotID, leaderSessionID);
    +			response.onSuccess(new OnSuccess<SlotAllocationResponse>() {
    +				@Override
    +				public void onSuccess(SlotAllocationResponse result) throws Throwable {
    +					if (result instanceof SlotAllocationResponse.Decline) {
    +						// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +					}
    +				}
    +			}, getMainThreadExecutionContext());
    +			response.onFailure(new OnFailure() {
    +				@Override
    +				public void onFailure(Throwable failure) {
    +					log.error("fail to request slot on taskManager because of error", failure);
    +					// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +				}
    +			}, getMainThreadExecutionContext());
    +		}
    +	}
    +
    +	/**
    +	 * notify slotReport which is sent by taskManager to resourceManager
    +	 *
    +	 * @param slotReport the slot allocation report from taskManager
    +	 */
    +	void handleSlotReportFromTaskManager(final SlotReport slotReport) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				// TODO slotManager.updateSlot(slotReport);
    +			}
    +		});
    +
    +	}
    +
    +
    +	/**
    +	 * callback method when current resourceManager is granted leadership
    +	 * @param newLeaderSessionID unique leadershipID
    +	 */
    +	void handleGrantLeadership(final UUID newLeaderSessionID) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
    +				leaderSessionID = newLeaderSessionID;
    +				// confirming the leader session ID might be blocking, thus do it concurrently
    +				getRpcService().scheduleRunnable(
    +					new Runnable() {
    +						@Override
    +						public void run() {
    +							leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
    +						}
    +					}, 0, TimeUnit.MILLISECONDS
    +				);
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Callback method when current resourceManager lose leadership.
    --- End diff --
    
    typo: lost


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75676516
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatSchedulerTest.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.actor.ActorSystem;
    +import akka.dispatch.OnSuccess;
    +import akka.testkit.JavaTestKit;
    +import akka.util.Timeout;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.test.TestingServer;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.NonHaServices;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
    +import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
    +import org.apache.flink.runtime.util.ZooKeeperUtils;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.After;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.IOException;
    +import java.util.UUID;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ForkJoinPool;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +public class ResourceManagerToTaskExecutorHeartbeatSchedulerTest extends TestLogger {
    +
    +	@Rule
    +	public TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	private static TestingServer testingServer;
    +
    +	private static ActorSystem actorSystem;
    +	private static ActorSystem actorSystem2;
    +	private static AkkaRpcService akkaRpcService;
    +	private static AkkaRpcService akkaRpcService2;
    +
    +	private static final Timeout timeout = new Timeout(10000, TimeUnit.MILLISECONDS);
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		testingServer = new TestingServer();
    +
    +		actorSystem = AkkaUtils.createDefaultActorSystem();
    +		actorSystem2 = AkkaUtils.createDefaultActorSystem();
    +
    +		akkaRpcService = new AkkaRpcService(actorSystem, timeout);
    +		akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout);
    +	}
    +
    +	@AfterClass
    +	public static void teardown() throws Exception {
    +		akkaRpcService.stopService();
    +		akkaRpcService2.stopService();
    +
    +		actorSystem.shutdown();
    +		actorSystem2.shutdown();
    +
    +		actorSystem.awaitTermination();
    +		actorSystem2.awaitTermination();
    +
    +		testingServer.stop();
    +	}
    +
    +	@Test
    +	public void testStart() throws Exception {
    +		ExecutorService executorService = new ForkJoinPool();
    +
    +		Configuration configuration = ZooKeeperTestUtils
    +			.createZooKeeperRecoveryModeConfig(
    +				testingServer.getConnectString(),
    +				tempFolder.getRoot().getPath());
    +
    +		CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
    +		LeaderElectionService leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client,
    +			configuration);
    +		ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService, leaderElectionService);
    +
    +		HighAvailabilityServices highAvailabilityServices = new NonHaServices(resourceManager.getAddress());
    +		ResourceID resourceID = ResourceID.generate();
    +		TaskExecutor taskExecutor = new TaskExecutor(akkaRpcService2, highAvailabilityServices, resourceID);
    +
    +		resourceManager.start();
    +		taskExecutor.start();
    --- End diff --
    
    You can take a look at https://github.com/apache/flink/pull/2395/commits/c72b2cb34521e35a4ad949c4384eb94afd0ff99c which gives a really nice example for testing an individual component.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75789477
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -52,18 +61,45 @@
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
     	private final ExecutionContext executionContext;
     	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +	private final Map<ResourceID, TaskExecutorGateway> taskExecutorGateways;
    +	private final Map<ResourceID, ResourceManagerToTaskExecutorHeartbeatScheduler> heartbeatSchedulers;
    +	private final LeaderElectionService leaderElectionService;
    +	private UUID leaderSessionID;
    +	// TODO private final SlotManager slotManager;
    --- End diff --
    
    SlotManager is doing by kurt. So I marked some TODO here waiting for his PR. It would be removed after SlotManager code committed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75664880
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -52,18 +61,45 @@
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
     	private final ExecutionContext executionContext;
     	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +	private final Map<ResourceID, TaskExecutorGateway> taskExecutorGateways;
    +	private final Map<ResourceID, ResourceManagerToTaskExecutorHeartbeatScheduler> heartbeatSchedulers;
    +	private final LeaderElectionService leaderElectionService;
    +	private UUID leaderSessionID;
    +	// TODO private final SlotManager slotManager;
    --- End diff --
    
    Let's remove the TODO here. This will be added with the `SlotManager` PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2389: [FLINK-4348] [cluster management] Implement communication...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on the issue:

    https://github.com/apache/flink/pull/2389
  
    Hi, till. I submit a new PR #2410  which is focus on heartbeat. Please review that PR, thanks very much. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75671927
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotAllocationResponse.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.taskexecutor;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +
    +import java.io.Serializable;
    +import java.util.UUID;
    +
    +/**
    + * base class  for response from TaskManager to a requestSlot from resourceManager
    --- End diff --
    
    Usually, Java docs start with a capital letter.
    
    Two spaces between "class" and "for"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75682769
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    +	public void notifyResourceFailure(ResourceID resourceID) {
    +		log.warn("receive failure notification of resource {}", resourceID);
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if(taskManager == null) {
    +			// ignore command to stop an unregister taskManager
    +			log.warn("ignore stop taskManager command because {} is unregistered", resourceID);
    +		} else {
    +			taskExecutorGateways.remove(resourceID);
    +			closeHeartbeatToResourceIfExist(resourceID);
    +			// TODO notify slotManager and notify jobMaster,
    +			// TODO slotManager.notifyTaskManagerFailure(resourceID);
    +			taskManager.shutDown(leaderSessionID);
    +		}
    +	}
    +
    +	/**
    +	 * close heartbeat triggers to resource if exist
    +	 * @param resourceID which resource need to stop keep heartbeat with
    +	 */
    +	private void closeHeartbeatToResourceIfExist(ResourceID resourceID) {
    +		if(heartbeatSchedulers.containsKey(resourceID)) {
    +			ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatManager = heartbeatSchedulers.get(resourceID);
    +			heartbeatManager.close();
    +			heartbeatSchedulers.remove(resourceID);
    +		}
    +	}
    +
    +	/**
    +	 * send slotRequest to the taskManager which the given slot is on
    +	 *
    +	 * @param slotRequest slot request information
    +	 * @param slotID      which slot is choosen
    +	 */
    +	void requestSlotToTaskManager(final SlotRequest slotRequest, final SlotID slotID) {
    --- End diff --
    
    maybe the name "sendSlotRequestToTaskManager` is better


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75669856
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java ---
    @@ -53,25 +53,34 @@
     	Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration);
     
     	/**
    -	 * Requests a slot from the resource manager.
    +	 * JobMaster Requests a slot from the resource manager.
     	 *
     	 * @param slotRequest Slot request
     	 * @return Future slot assignment
     	 */
    -	Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
    +	Future<AcknowledgeSlotRequest> requestSlot(SlotRequest slotRequest);
     
     	/**
    -	 * 
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    -	 * @param timeout                  The timeout for the response.
    -	 * 
    +	 *
    --- End diff --
    
    Java docs missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75665082
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -52,18 +61,45 @@
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
     	private final ExecutionContext executionContext;
     	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +	private final Map<ResourceID, TaskExecutorGateway> taskExecutorGateways;
    +	private final Map<ResourceID, ResourceManagerToTaskExecutorHeartbeatScheduler> heartbeatSchedulers;
    +	private final LeaderElectionService leaderElectionService;
    +	private UUID leaderSessionID;
    +	// TODO private final SlotManager slotManager;
     
    -	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
    +	public ResourceManager(RpcService rpcService, ExecutorService executorService, LeaderElectionService leaderElectionService) {
    --- End diff --
    
    Stephan recently introduced the `HighAvailabilityServices` which is basically a factory for HA related functionality. We should pass this argument to the constructor and generate the leader election service from there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75681620
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatScheduler.java ---
    @@ -0,0 +1,242 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.dispatch.OnFailure;
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.slf4j.Logger;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * heartbeat between ResourceManager and TaskManager, it is responsible for schedule heartbeat and handle
    + * heartbeat lost cases
    + */
    +public class ResourceManagerToTaskExecutorHeartbeatScheduler {
    +
    +	/** default heartbeat interval time in millisecond */
    +	private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
    +
    +	/** default heartbeat timeout in millisecond */
    +	private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
    +
    +	/** max heartbeat interval time in millisecond (which is used in retry heartbeat case) */
    +	private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 30000;
    +
    +	/** if a failure except for timeout exception happened when trigger heartbeat from resourceManager to taskManager , next attemp will start after  ERROR_HEARTBEAT_DELAY_MILLIS millisecond */
    +	private static final long ERROR_HEARTBEAT_DELAY_MILLIS = 2000;
    +
    +	/** max heartbeat retry times when lost heartbeat */
    +	private static final int MAX_ATTEMPT_TIMES = 8;
    +
    +	private final long heartbeatInterval;
    +
    +	private final long heartbeatTimeout;
    +
    +	private final long maxHeartbeatTimeout;
    +
    +	private final long delayOnError;
    +
    +	private final int maxAttempt;
    +
    +
    +	/** taskManagerGateway to receive the heartbeat and report slot allocation */
    +	private final TaskExecutorGateway taskExecutorGateway;
    +
    +	/** the taskManager address */
    +	private final String taskExecutorAddress;
    +
    +	/** identify the taskManager resourceID */
    +	private final ResourceID resourceID;
    +
    +	/** identify the resourceManager rpc endpoint */
    +	private final ResourceManager resourceManager;
    +
    +	private final UUID resourceManagerLeaderSessionID;
    +
    +	private final Logger log;
    +
    +	private volatile boolean closed;
    +
    +	/**
    +	 * ResourceManagerToTaskExecutorHeartbeatScheduler constructor
    +	 *
    +	 * @param resourceManager         resourceManager which handles heartbeat communication with taskManager
    +	 * @param taskExecutorGateway     taskManager which receives heartbeat from resourceManager and report its slot
    +	 *                                allocation to resourceManager
    +	 * @param taskExecutorAddress     taskManager's address
    +	 * @param taskExecutorResourceID  taskManager's resourceID
    +	 * @param log
    +	 */
    +	public ResourceManagerToTaskExecutorHeartbeatScheduler(
    --- End diff --
    
    I think we also need a receiving end of the heartbeat because the `TaskExecutor` wants to monitor the `ResourceManager` as well. If the heartbeat request is not delivered, then the RM should be marked as dead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75789222
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    --- End diff --
    
    getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class); now if the address is invalid, then taskExecutorGateway is null.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 closed the pull request at:

    https://github.com/apache/flink/pull/2389


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75869419
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    +	public void notifyResourceFailure(ResourceID resourceID) {
    +		log.warn("receive failure notification of resource {}", resourceID);
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if(taskManager == null) {
    +			// ignore command to stop an unregister taskManager
    +			log.warn("ignore stop taskManager command because {} is unregistered", resourceID);
    +		} else {
    +			taskExecutorGateways.remove(resourceID);
    +			closeHeartbeatToResourceIfExist(resourceID);
    +			// TODO notify slotManager and notify jobMaster,
    +			// TODO slotManager.notifyTaskManagerFailure(resourceID);
    +			taskManager.shutDown(leaderSessionID);
    +		}
    +	}
    +
    +	/**
    +	 * close heartbeat triggers to resource if exist
    +	 * @param resourceID which resource need to stop keep heartbeat with
    +	 */
    +	private void closeHeartbeatToResourceIfExist(ResourceID resourceID) {
    +		if(heartbeatSchedulers.containsKey(resourceID)) {
    +			ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatManager = heartbeatSchedulers.get(resourceID);
    +			heartbeatManager.close();
    +			heartbeatSchedulers.remove(resourceID);
    +		}
    +	}
    +
    +	/**
    +	 * send slotRequest to the taskManager which the given slot is on
    +	 *
    +	 * @param slotRequest slot request information
    +	 * @param slotID      which slot is choosen
    +	 */
    +	void requestSlotToTaskManager(final SlotRequest slotRequest, final SlotID slotID) {
    +		ResourceID resourceID = slotID.getResourceID();
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if (taskManager == null) {
    +			// the given slot is on an unregister taskManager
    +			log.warn("ignore slot {} because it is on an unregister taskManager", slotID);
    +			// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +		} else {
    +			Future<SlotAllocationResponse> response = taskManager.requestSlotForJob(
    +				slotRequest.getAllocationID(), slotRequest.getJobID(), slotID, leaderSessionID);
    +			response.onSuccess(new OnSuccess<SlotAllocationResponse>() {
    +				@Override
    +				public void onSuccess(SlotAllocationResponse result) throws Throwable {
    +					if (result instanceof SlotAllocationResponse.Decline) {
    +						// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +					}
    +				}
    +			}, getMainThreadExecutionContext());
    --- End diff --
    
    Ah ok, you've implemented it with the slot manager in mind.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75677643
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatSchedulerTest.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.actor.ActorSystem;
    +import akka.dispatch.OnSuccess;
    +import akka.testkit.JavaTestKit;
    +import akka.util.Timeout;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.test.TestingServer;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.NonHaServices;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
    +import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
    +import org.apache.flink.runtime.util.ZooKeeperUtils;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.After;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.IOException;
    +import java.util.UUID;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ForkJoinPool;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +public class ResourceManagerToTaskExecutorHeartbeatSchedulerTest extends TestLogger {
    --- End diff --
    
    The heartbeat scheduler test case should be really light-weight. No Zk and fully-blown setup needed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75681271
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatScheduler.java ---
    @@ -0,0 +1,242 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.dispatch.OnFailure;
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.slf4j.Logger;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * heartbeat between ResourceManager and TaskManager, it is responsible for schedule heartbeat and handle
    + * heartbeat lost cases
    + */
    +public class ResourceManagerToTaskExecutorHeartbeatScheduler {
    +
    +	/** default heartbeat interval time in millisecond */
    +	private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
    +
    +	/** default heartbeat timeout in millisecond */
    +	private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
    +
    +	/** max heartbeat interval time in millisecond (which is used in retry heartbeat case) */
    +	private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 30000;
    +
    +	/** if a failure except for timeout exception happened when trigger heartbeat from resourceManager to taskManager , next attemp will start after  ERROR_HEARTBEAT_DELAY_MILLIS millisecond */
    +	private static final long ERROR_HEARTBEAT_DELAY_MILLIS = 2000;
    +
    +	/** max heartbeat retry times when lost heartbeat */
    +	private static final int MAX_ATTEMPT_TIMES = 8;
    +
    +	private final long heartbeatInterval;
    +
    +	private final long heartbeatTimeout;
    +
    +	private final long maxHeartbeatTimeout;
    +
    +	private final long delayOnError;
    +
    +	private final int maxAttempt;
    +
    +
    +	/** taskManagerGateway to receive the heartbeat and report slot allocation */
    +	private final TaskExecutorGateway taskExecutorGateway;
    +
    +	/** the taskManager address */
    +	private final String taskExecutorAddress;
    +
    +	/** identify the taskManager resourceID */
    +	private final ResourceID resourceID;
    +
    +	/** identify the resourceManager rpc endpoint */
    +	private final ResourceManager resourceManager;
    +
    +	private final UUID resourceManagerLeaderSessionID;
    +
    +	private final Logger log;
    +
    +	private volatile boolean closed;
    +
    +	/**
    +	 * ResourceManagerToTaskExecutorHeartbeatScheduler constructor
    +	 *
    +	 * @param resourceManager         resourceManager which handles heartbeat communication with taskManager
    +	 * @param taskExecutorGateway     taskManager which receives heartbeat from resourceManager and report its slot
    +	 *                                allocation to resourceManager
    +	 * @param taskExecutorAddress     taskManager's address
    +	 * @param taskExecutorResourceID  taskManager's resourceID
    +	 * @param log
    +	 */
    +	public ResourceManagerToTaskExecutorHeartbeatScheduler(
    +		ResourceManager resourceManager, UUID resourceManagerLeaderSessionID, TaskExecutorGateway taskExecutorGateway,
    +		String taskExecutorAddress, ResourceID taskExecutorResourceID, Logger log) {
    +		this(resourceManager, resourceManagerLeaderSessionID, taskExecutorGateway, taskExecutorAddress, taskExecutorResourceID,
    +			log, INITIAL_HEARTBEAT_INTERVAL_MILLIS, INITIAL_HEARTBEAT_TIMEOUT_MILLIS, MAX_HEARTBEAT_TIMEOUT_MILLIS,
    +			ERROR_HEARTBEAT_DELAY_MILLIS, MAX_ATTEMPT_TIMES);
    +	}
    +
    +	/**
    +	 * ResourceManagerToTaskExecutorHeartbeatScheduler constructor
    +	 *
    +	 * @param resourceManager
    +	 * @param taskExecutorGateway
    +	 * @param taskExecutorAddress
    +	 * @param taskExecutorResourceID
    +	 * @param log
    +	 * @param heartbeatInterval
    +	 * @param heartbeatTimeout
    +	 * @param maxHeartbeatTimeout
    +	 * @param delayOnError
    +	 * @param maxAttempt
    +	 */
    +	public ResourceManagerToTaskExecutorHeartbeatScheduler(
    +		ResourceManager resourceManager, UUID resourceManagerLeaderSessionID, TaskExecutorGateway taskExecutorGateway,
    +		String taskExecutorAddress, ResourceID taskExecutorResourceID, Logger log, long heartbeatInterval,
    +		long heartbeatTimeout, long maxHeartbeatTimeout, long delayOnError, int maxAttempt) {
    +		this.resourceManager = resourceManager;
    +		this.resourceManagerLeaderSessionID = resourceManagerLeaderSessionID;
    +		this.taskExecutorGateway = taskExecutorGateway;
    +		this.taskExecutorAddress = taskExecutorAddress;
    +		this.resourceID = taskExecutorResourceID;
    +		this.log = log;
    +		this.heartbeatInterval = heartbeatInterval;
    +		this.heartbeatTimeout = heartbeatTimeout;
    +		this.maxHeartbeatTimeout = maxHeartbeatTimeout;
    +		this.delayOnError = delayOnError;
    +		this.maxAttempt = maxAttempt;
    +	}
    +
    +	/**
    +	 * start to schedule next heartbeat
    +	 */
    +	public void start() {
    +		checkState(!closed, "The heartbeat connection is already closed");
    +		sendHeartbeatToTaskManagerLater(1, heartbeatTimeout, heartbeatInterval);
    +	}
    +
    +	/**
    +	 * Checks if the heartbeat schedule was closed.
    +	 *
    +	 * @return True if the heartbeat schedule was closed, false otherwise.
    +	 */
    +	public boolean isClosed() {
    +		return closed;
    +	}
    +
    +	/**
    +	 * stop to schedule heartbeat
    +	 */
    +	public void close() {
    +		closed = true;
    +	}
    +
    +	/**
    +	 * get the heartbeat interval
    +	 *
    +	 * @return heartbeat interval
    +	 */
    +	public long getHeartbeatInterval() {
    +		return heartbeatInterval;
    +	}
    +
    +	/**
    +	 * send a heartbeat attempt to taskManager, receive the slotReport from TaskManager or failed depends on the future result.
    +	 *
    +	 * @param attempt
    +	 * @param timeoutMillis
    +	 */
    +	private void sendHeartbeatToTaskManager(final int attempt, final long timeoutMillis) {
    +		// eager check for closed to avoid some unnecessary work
    +		if (closed) {
    +			return;
    +		}
    +		FiniteDuration timeout = new FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS);
    +		Future<SlotReport> heartbeatResponse = taskExecutorGateway.triggerHeartbeatToResourceManager(resourceManagerLeaderSessionID, timeout);
    +
    +		heartbeatResponse.onSuccess(new OnSuccess<SlotReport>() {
    +
    +			@Override
    +			public void onSuccess(SlotReport result) throws Throwable {
    +				if (!isClosed()) {
    +					// notify the slotManager and trigger next time heartbeat
    +					resourceManager.handleSlotReportFromTaskManager(result);
    +					sendHeartbeatToTaskManagerLater(1, heartbeatTimeout, heartbeatInterval);
    +				}
    +			}
    +		}, resourceManager.getRpcService().getExecutionContext());
    +
    +		// upon failure, retry
    +		heartbeatResponse.onFailure(new OnFailure() {
    +			@Override
    +			public void onFailure(Throwable failure) {
    +				if (!isClosed()) {
    +					if (attempt == maxAttempt) {
    +						log.error("ResourceManager {} fail to keep heartbeat with taskManager {} after {} attempts",
    +							resourceManager.getAddress(), taskExecutorAddress, attempt);
    +						closed = true;
    +						// mark TaskManager as failed after heartbeat interaction attempts failed for many times
    +						resourceManager.getSelf().notifyResourceFailure(resourceID);
    +					} else {
    +						// upon timeout exception, exponential increase timeout
    +						if (failure instanceof TimeoutException) {
    +							if (log.isDebugEnabled()) {
    +								log.debug("ResourceManager {} lost heartbeat in {} ms with taskManager {} at {} attempts ",
    +									resourceManager.getAddress(), timeoutMillis, attempt);
    +							}
    +
    +							long retryTimeoutMillis = Math.min(2 * timeoutMillis, maxHeartbeatTimeout);
    +							sendHeartbeatToTaskManager(attempt + 1, retryTimeoutMillis);
    +						} else {
    +							log.error(
    +								"ResourceManager {} fail to keep heartbeat with taskManager due to an error",
    +								failure);
    +						}
    +						sendHeartbeatToTaskManagerLater(attempt + 1, timeoutMillis, delayOnError);
    --- End diff --
    
    I think the logic here is not correct, since you schedule two heartbeats in case of a `TimeoutException`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75683303
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    +	public void notifyResourceFailure(ResourceID resourceID) {
    +		log.warn("receive failure notification of resource {}", resourceID);
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if(taskManager == null) {
    +			// ignore command to stop an unregister taskManager
    +			log.warn("ignore stop taskManager command because {} is unregistered", resourceID);
    +		} else {
    +			taskExecutorGateways.remove(resourceID);
    +			closeHeartbeatToResourceIfExist(resourceID);
    +			// TODO notify slotManager and notify jobMaster,
    +			// TODO slotManager.notifyTaskManagerFailure(resourceID);
    +			taskManager.shutDown(leaderSessionID);
    +		}
    +	}
    +
    +	/**
    +	 * close heartbeat triggers to resource if exist
    +	 * @param resourceID which resource need to stop keep heartbeat with
    +	 */
    +	private void closeHeartbeatToResourceIfExist(ResourceID resourceID) {
    +		if(heartbeatSchedulers.containsKey(resourceID)) {
    +			ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatManager = heartbeatSchedulers.get(resourceID);
    +			heartbeatManager.close();
    +			heartbeatSchedulers.remove(resourceID);
    +		}
    +	}
    +
    +	/**
    +	 * send slotRequest to the taskManager which the given slot is on
    +	 *
    +	 * @param slotRequest slot request information
    +	 * @param slotID      which slot is choosen
    +	 */
    +	void requestSlotToTaskManager(final SlotRequest slotRequest, final SlotID slotID) {
    +		ResourceID resourceID = slotID.getResourceID();
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if (taskManager == null) {
    +			// the given slot is on an unregister taskManager
    +			log.warn("ignore slot {} because it is on an unregister taskManager", slotID);
    +			// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +		} else {
    +			Future<SlotAllocationResponse> response = taskManager.requestSlotForJob(
    +				slotRequest.getAllocationID(), slotRequest.getJobID(), slotID, leaderSessionID);
    +			response.onSuccess(new OnSuccess<SlotAllocationResponse>() {
    +				@Override
    +				public void onSuccess(SlotAllocationResponse result) throws Throwable {
    +					if (result instanceof SlotAllocationResponse.Decline) {
    +						// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +					}
    +				}
    +			}, getMainThreadExecutionContext());
    --- End diff --
    
    Why do we execute this method in the main thread execution context?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75675454
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatSchedulerTest.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.actor.ActorSystem;
    +import akka.dispatch.OnSuccess;
    +import akka.testkit.JavaTestKit;
    +import akka.util.Timeout;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.test.TestingServer;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.NonHaServices;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
    +import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
    +import org.apache.flink.runtime.util.ZooKeeperUtils;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.After;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.IOException;
    +import java.util.UUID;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.ForkJoinPool;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +public class ResourceManagerToTaskExecutorHeartbeatSchedulerTest extends TestLogger {
    +
    +	@Rule
    +	public TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	private static TestingServer testingServer;
    +
    +	private static ActorSystem actorSystem;
    +	private static ActorSystem actorSystem2;
    +	private static AkkaRpcService akkaRpcService;
    +	private static AkkaRpcService akkaRpcService2;
    +
    +	private static final Timeout timeout = new Timeout(10000, TimeUnit.MILLISECONDS);
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		testingServer = new TestingServer();
    +
    +		actorSystem = AkkaUtils.createDefaultActorSystem();
    +		actorSystem2 = AkkaUtils.createDefaultActorSystem();
    +
    +		akkaRpcService = new AkkaRpcService(actorSystem, timeout);
    +		akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout);
    +	}
    +
    +	@AfterClass
    +	public static void teardown() throws Exception {
    +		akkaRpcService.stopService();
    +		akkaRpcService2.stopService();
    +
    +		actorSystem.shutdown();
    +		actorSystem2.shutdown();
    +
    +		actorSystem.awaitTermination();
    +		actorSystem2.awaitTermination();
    +
    +		testingServer.stop();
    +	}
    +
    +	@Test
    +	public void testStart() throws Exception {
    --- End diff --
    
    Tests should always have a small comment describing what they test.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75789279
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -52,18 +61,45 @@
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
     	private final ExecutionContext executionContext;
     	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +	private final Map<ResourceID, TaskExecutorGateway> taskExecutorGateways;
    +	private final Map<ResourceID, ResourceManagerToTaskExecutorHeartbeatScheduler> heartbeatSchedulers;
    +	private final LeaderElectionService leaderElectionService;
    +	private UUID leaderSessionID;
    +	// TODO private final SlotManager slotManager;
     
    -	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
    +	public ResourceManager(RpcService rpcService, ExecutorService executorService, LeaderElectionService leaderElectionService) {
     		super(rpcService);
     		this.executionContext = ExecutionContext$.MODULE$.fromExecutor(
     			Preconditions.checkNotNull(executorService));
     		this.jobMasterGateways = new HashMap<>();
    +		this.taskExecutorGateways = new HashMap<>();
    +		this.heartbeatSchedulers = new HashMap<>();
    +		this.leaderElectionService = leaderElectionService;
    +		leaderSessionID = null;
    +		// TODO this.slotManager = null;
    +	}
    +
    +	@Override
    +	public void start() {
    +		// start a leader
    +		try {
    +			leaderElectionService.start(new ResourceManagerLeaderContender());
    +			super.start();
    +		} catch (Exception e) {
    +			log.error("a fatal error happened when start resourceManager", e);
    --- End diff --
    
    ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75666060
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    --- End diff --
    
    I think it's better to start logging messages with a capital letter. Maybe we could output "Received TaskExecutor registration request with resource id {} from {}." or something like that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75789610
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -52,18 +61,45 @@
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
     	private final ExecutionContext executionContext;
     	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +	private final Map<ResourceID, TaskExecutorGateway> taskExecutorGateways;
    +	private final Map<ResourceID, ResourceManagerToTaskExecutorHeartbeatScheduler> heartbeatSchedulers;
    +	private final LeaderElectionService leaderElectionService;
    +	private UUID leaderSessionID;
    +	// TODO private final SlotManager slotManager;
     
    -	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
    +	public ResourceManager(RpcService rpcService, ExecutorService executorService, LeaderElectionService leaderElectionService) {
    --- End diff --
    
    Cool. maybe i could add a method like "LeaderElectionService getResourceManagerLeaderElectionService() throws Exception; " in HighAvailabilityServices class?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75665400
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -52,18 +61,45 @@
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
     	private final ExecutionContext executionContext;
     	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +	private final Map<ResourceID, TaskExecutorGateway> taskExecutorGateways;
    +	private final Map<ResourceID, ResourceManagerToTaskExecutorHeartbeatScheduler> heartbeatSchedulers;
    +	private final LeaderElectionService leaderElectionService;
    +	private UUID leaderSessionID;
    +	// TODO private final SlotManager slotManager;
     
    -	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
    +	public ResourceManager(RpcService rpcService, ExecutorService executorService, LeaderElectionService leaderElectionService) {
     		super(rpcService);
     		this.executionContext = ExecutionContext$.MODULE$.fromExecutor(
     			Preconditions.checkNotNull(executorService));
     		this.jobMasterGateways = new HashMap<>();
    +		this.taskExecutorGateways = new HashMap<>();
    +		this.heartbeatSchedulers = new HashMap<>();
    +		this.leaderElectionService = leaderElectionService;
    +		leaderSessionID = null;
    +		// TODO this.slotManager = null;
    +	}
    +
    +	@Override
    +	public void start() {
    +		// start a leader
    +		try {
    +			leaderElectionService.start(new ResourceManagerLeaderContender());
    +			super.start();
    +		} catch (Exception e) {
    +			log.error("a fatal error happened when start resourceManager", e);
    --- End diff --
    
    Let's start with a capital letter "A" and end with "when starting the ResourceManager.".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75670298
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatScheduler.java ---
    @@ -0,0 +1,242 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.dispatch.OnFailure;
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.slf4j.Logger;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * heartbeat between ResourceManager and TaskManager, it is responsible for schedule heartbeat and handle
    + * heartbeat lost cases
    + */
    +public class ResourceManagerToTaskExecutorHeartbeatScheduler {
    +
    +	/** default heartbeat interval time in millisecond */
    +	private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
    +
    +	/** default heartbeat timeout in millisecond */
    +	private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
    +
    +	/** max heartbeat interval time in millisecond (which is used in retry heartbeat case) */
    +	private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 30000;
    +
    +	/** if a failure except for timeout exception happened when trigger heartbeat from resourceManager to taskManager , next attemp will start after  ERROR_HEARTBEAT_DELAY_MILLIS millisecond */
    --- End diff --
    
    Line too long. Maybe we could rephrase it to "Heartbeat attempt delay after an exception has occurred"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75683343
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    +	public void notifyResourceFailure(ResourceID resourceID) {
    +		log.warn("receive failure notification of resource {}", resourceID);
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if(taskManager == null) {
    +			// ignore command to stop an unregister taskManager
    +			log.warn("ignore stop taskManager command because {} is unregistered", resourceID);
    +		} else {
    +			taskExecutorGateways.remove(resourceID);
    +			closeHeartbeatToResourceIfExist(resourceID);
    +			// TODO notify slotManager and notify jobMaster,
    +			// TODO slotManager.notifyTaskManagerFailure(resourceID);
    +			taskManager.shutDown(leaderSessionID);
    +		}
    +	}
    +
    +	/**
    +	 * close heartbeat triggers to resource if exist
    +	 * @param resourceID which resource need to stop keep heartbeat with
    +	 */
    +	private void closeHeartbeatToResourceIfExist(ResourceID resourceID) {
    +		if(heartbeatSchedulers.containsKey(resourceID)) {
    +			ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatManager = heartbeatSchedulers.get(resourceID);
    +			heartbeatManager.close();
    +			heartbeatSchedulers.remove(resourceID);
    +		}
    +	}
    +
    +	/**
    +	 * send slotRequest to the taskManager which the given slot is on
    +	 *
    +	 * @param slotRequest slot request information
    +	 * @param slotID      which slot is choosen
    +	 */
    +	void requestSlotToTaskManager(final SlotRequest slotRequest, final SlotID slotID) {
    +		ResourceID resourceID = slotID.getResourceID();
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if (taskManager == null) {
    +			// the given slot is on an unregister taskManager
    +			log.warn("ignore slot {} because it is on an unregister taskManager", slotID);
    +			// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +		} else {
    +			Future<SlotAllocationResponse> response = taskManager.requestSlotForJob(
    +				slotRequest.getAllocationID(), slotRequest.getJobID(), slotID, leaderSessionID);
    +			response.onSuccess(new OnSuccess<SlotAllocationResponse>() {
    +				@Override
    +				public void onSuccess(SlotAllocationResponse result) throws Throwable {
    +					if (result instanceof SlotAllocationResponse.Decline) {
    +						// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +					}
    +				}
    +			}, getMainThreadExecutionContext());
    +			response.onFailure(new OnFailure() {
    +				@Override
    +				public void onFailure(Throwable failure) {
    +					log.error("fail to request slot on taskManager because of error", failure);
    +					// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +				}
    +			}, getMainThreadExecutionContext());
    --- End diff --
    
    Why main thread execution context?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75682445
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    +	public void notifyResourceFailure(ResourceID resourceID) {
    +		log.warn("receive failure notification of resource {}", resourceID);
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    --- End diff --
    
    Why not calling `taskExecutorGateways.remove(resourceID)` directly here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75683123
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    +	public void notifyResourceFailure(ResourceID resourceID) {
    +		log.warn("receive failure notification of resource {}", resourceID);
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if(taskManager == null) {
    +			// ignore command to stop an unregister taskManager
    +			log.warn("ignore stop taskManager command because {} is unregistered", resourceID);
    +		} else {
    +			taskExecutorGateways.remove(resourceID);
    +			closeHeartbeatToResourceIfExist(resourceID);
    +			// TODO notify slotManager and notify jobMaster,
    +			// TODO slotManager.notifyTaskManagerFailure(resourceID);
    +			taskManager.shutDown(leaderSessionID);
    +		}
    +	}
    +
    +	/**
    +	 * close heartbeat triggers to resource if exist
    +	 * @param resourceID which resource need to stop keep heartbeat with
    +	 */
    +	private void closeHeartbeatToResourceIfExist(ResourceID resourceID) {
    +		if(heartbeatSchedulers.containsKey(resourceID)) {
    +			ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatManager = heartbeatSchedulers.get(resourceID);
    +			heartbeatManager.close();
    +			heartbeatSchedulers.remove(resourceID);
    +		}
    +	}
    +
    +	/**
    +	 * send slotRequest to the taskManager which the given slot is on
    +	 *
    +	 * @param slotRequest slot request information
    +	 * @param slotID      which slot is choosen
    +	 */
    +	void requestSlotToTaskManager(final SlotRequest slotRequest, final SlotID slotID) {
    +		ResourceID resourceID = slotID.getResourceID();
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    --- End diff --
    
    `taskManager`? Should this be renamed to `taskExecutor`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75682145
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -52,18 +61,45 @@
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
     	private final ExecutionContext executionContext;
     	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +	private final Map<ResourceID, TaskExecutorGateway> taskExecutorGateways;
    +	private final Map<ResourceID, ResourceManagerToTaskExecutorHeartbeatScheduler> heartbeatSchedulers;
    --- End diff --
    
    Maybe it makes sense to create a `HeartbeatManager` which manages the different resources to heartbeat.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75665618
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -52,18 +61,45 @@
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
     	private final ExecutionContext executionContext;
     	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +	private final Map<ResourceID, TaskExecutorGateway> taskExecutorGateways;
    +	private final Map<ResourceID, ResourceManagerToTaskExecutorHeartbeatScheduler> heartbeatSchedulers;
    +	private final LeaderElectionService leaderElectionService;
    +	private UUID leaderSessionID;
    +	// TODO private final SlotManager slotManager;
     
    -	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
    +	public ResourceManager(RpcService rpcService, ExecutorService executorService, LeaderElectionService leaderElectionService) {
     		super(rpcService);
     		this.executionContext = ExecutionContext$.MODULE$.fromExecutor(
     			Preconditions.checkNotNull(executorService));
     		this.jobMasterGateways = new HashMap<>();
    +		this.taskExecutorGateways = new HashMap<>();
    +		this.heartbeatSchedulers = new HashMap<>();
    +		this.leaderElectionService = leaderElectionService;
    +		leaderSessionID = null;
    +		// TODO this.slotManager = null;
    +	}
    +
    +	@Override
    +	public void start() {
    --- End diff --
    
    It might actually makes sense to let the `start` method throw an exception so that a failed start call will result in a thrown exception.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75671701
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotAllocationResponse.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.taskexecutor;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +
    +import java.io.Serializable;
    +import java.util.UUID;
    +
    +/**
    + * base class  for response from TaskManager to a requestSlot from resourceManager
    + */
    +public abstract class SlotAllocationResponse implements Serializable {
    --- End diff --
    
    No serialVersionUID field defined.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75668138
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    --- End diff --
    
    I think that the `taskExecutorGateway` should never be `null` if I'm not mistaken.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75686098
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    --- End diff --
    
    I'm not sure whether this has to be a `RpcMethod`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75681182
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatScheduler.java ---
    @@ -0,0 +1,242 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.dispatch.OnFailure;
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.slf4j.Logger;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * heartbeat between ResourceManager and TaskManager, it is responsible for schedule heartbeat and handle
    + * heartbeat lost cases
    + */
    +public class ResourceManagerToTaskExecutorHeartbeatScheduler {
    +
    +	/** default heartbeat interval time in millisecond */
    +	private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
    +
    +	/** default heartbeat timeout in millisecond */
    +	private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
    +
    +	/** max heartbeat interval time in millisecond (which is used in retry heartbeat case) */
    +	private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 30000;
    +
    +	/** if a failure except for timeout exception happened when trigger heartbeat from resourceManager to taskManager , next attemp will start after  ERROR_HEARTBEAT_DELAY_MILLIS millisecond */
    +	private static final long ERROR_HEARTBEAT_DELAY_MILLIS = 2000;
    +
    +	/** max heartbeat retry times when lost heartbeat */
    +	private static final int MAX_ATTEMPT_TIMES = 8;
    +
    +	private final long heartbeatInterval;
    +
    +	private final long heartbeatTimeout;
    +
    +	private final long maxHeartbeatTimeout;
    +
    +	private final long delayOnError;
    +
    +	private final int maxAttempt;
    +
    +
    +	/** taskManagerGateway to receive the heartbeat and report slot allocation */
    +	private final TaskExecutorGateway taskExecutorGateway;
    +
    +	/** the taskManager address */
    +	private final String taskExecutorAddress;
    +
    +	/** identify the taskManager resourceID */
    +	private final ResourceID resourceID;
    +
    +	/** identify the resourceManager rpc endpoint */
    +	private final ResourceManager resourceManager;
    +
    +	private final UUID resourceManagerLeaderSessionID;
    +
    +	private final Logger log;
    +
    +	private volatile boolean closed;
    +
    +	/**
    +	 * ResourceManagerToTaskExecutorHeartbeatScheduler constructor
    +	 *
    +	 * @param resourceManager         resourceManager which handles heartbeat communication with taskManager
    +	 * @param taskExecutorGateway     taskManager which receives heartbeat from resourceManager and report its slot
    +	 *                                allocation to resourceManager
    +	 * @param taskExecutorAddress     taskManager's address
    +	 * @param taskExecutorResourceID  taskManager's resourceID
    +	 * @param log
    +	 */
    +	public ResourceManagerToTaskExecutorHeartbeatScheduler(
    --- End diff --
    
    Can we make the `ResourceManagerToTaskExecutorHeartbeatScheduler` a little bit more general? I think it is not necessary to bake in the dependency on the `ResourceManager` and the `TaskExecutorGateway`. You basically need two abstract methods `reportHeartbeat` and `triggerHeartbeat` which can be overwritten by sub classes. Alternatively you could also specify two interfaces `HeartbeatListener` and `HeartbeatTarget`, for example.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75681431
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatScheduler.java ---
    @@ -0,0 +1,242 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.dispatch.OnFailure;
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.slf4j.Logger;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * heartbeat between ResourceManager and TaskManager, it is responsible for schedule heartbeat and handle
    + * heartbeat lost cases
    + */
    +public class ResourceManagerToTaskExecutorHeartbeatScheduler {
    +
    +	/** default heartbeat interval time in millisecond */
    +	private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
    +
    +	/** default heartbeat timeout in millisecond */
    +	private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
    +
    +	/** max heartbeat interval time in millisecond (which is used in retry heartbeat case) */
    +	private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 30000;
    +
    +	/** if a failure except for timeout exception happened when trigger heartbeat from resourceManager to taskManager , next attemp will start after  ERROR_HEARTBEAT_DELAY_MILLIS millisecond */
    +	private static final long ERROR_HEARTBEAT_DELAY_MILLIS = 2000;
    +
    +	/** max heartbeat retry times when lost heartbeat */
    +	private static final int MAX_ATTEMPT_TIMES = 8;
    +
    +	private final long heartbeatInterval;
    +
    +	private final long heartbeatTimeout;
    +
    +	private final long maxHeartbeatTimeout;
    +
    +	private final long delayOnError;
    +
    +	private final int maxAttempt;
    +
    +
    +	/** taskManagerGateway to receive the heartbeat and report slot allocation */
    +	private final TaskExecutorGateway taskExecutorGateway;
    +
    +	/** the taskManager address */
    +	private final String taskExecutorAddress;
    +
    +	/** identify the taskManager resourceID */
    +	private final ResourceID resourceID;
    +
    +	/** identify the resourceManager rpc endpoint */
    +	private final ResourceManager resourceManager;
    +
    +	private final UUID resourceManagerLeaderSessionID;
    +
    +	private final Logger log;
    +
    +	private volatile boolean closed;
    +
    +	/**
    +	 * ResourceManagerToTaskExecutorHeartbeatScheduler constructor
    +	 *
    +	 * @param resourceManager         resourceManager which handles heartbeat communication with taskManager
    +	 * @param taskExecutorGateway     taskManager which receives heartbeat from resourceManager and report its slot
    +	 *                                allocation to resourceManager
    +	 * @param taskExecutorAddress     taskManager's address
    +	 * @param taskExecutorResourceID  taskManager's resourceID
    +	 * @param log
    +	 */
    +	public ResourceManagerToTaskExecutorHeartbeatScheduler(
    --- End diff --
    
    This would also allow to test this component more easily.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75869162
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    +	public void notifyResourceFailure(ResourceID resourceID) {
    +		log.warn("receive failure notification of resource {}", resourceID);
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if(taskManager == null) {
    +			// ignore command to stop an unregister taskManager
    +			log.warn("ignore stop taskManager command because {} is unregistered", resourceID);
    +		} else {
    +			taskExecutorGateways.remove(resourceID);
    +			closeHeartbeatToResourceIfExist(resourceID);
    +			// TODO notify slotManager and notify jobMaster,
    +			// TODO slotManager.notifyTaskManagerFailure(resourceID);
    +			taskManager.shutDown(leaderSessionID);
    +		}
    +	}
    +
    +	/**
    +	 * close heartbeat triggers to resource if exist
    +	 * @param resourceID which resource need to stop keep heartbeat with
    +	 */
    +	private void closeHeartbeatToResourceIfExist(ResourceID resourceID) {
    +		if(heartbeatSchedulers.containsKey(resourceID)) {
    +			ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatManager = heartbeatSchedulers.get(resourceID);
    +			heartbeatManager.close();
    +			heartbeatSchedulers.remove(resourceID);
    +		}
    +	}
    +
    +	/**
    +	 * send slotRequest to the taskManager which the given slot is on
    +	 *
    +	 * @param slotRequest slot request information
    +	 * @param slotID      which slot is choosen
    +	 */
    +	void requestSlotToTaskManager(final SlotRequest slotRequest, final SlotID slotID) {
    +		ResourceID resourceID = slotID.getResourceID();
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if (taskManager == null) {
    +			// the given slot is on an unregister taskManager
    +			log.warn("ignore slot {} because it is on an unregister taskManager", slotID);
    +			// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +		} else {
    +			Future<SlotAllocationResponse> response = taskManager.requestSlotForJob(
    +				slotRequest.getAllocationID(), slotRequest.getJobID(), slotID, leaderSessionID);
    +			response.onSuccess(new OnSuccess<SlotAllocationResponse>() {
    +				@Override
    +				public void onSuccess(SlotAllocationResponse result) throws Throwable {
    +					if (result instanceof SlotAllocationResponse.Decline) {
    +						// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +					}
    +				}
    +			}, getMainThreadExecutionContext());
    +			response.onFailure(new OnFailure() {
    +				@Override
    +				public void onFailure(Throwable failure) {
    +					log.error("fail to request slot on taskManager because of error", failure);
    +					// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +				}
    +			}, getMainThreadExecutionContext());
    +		}
    +	}
    +
    +	/**
    +	 * notify slotReport which is sent by taskManager to resourceManager
    +	 *
    +	 * @param slotReport the slot allocation report from taskManager
    +	 */
    +	void handleSlotReportFromTaskManager(final SlotReport slotReport) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				// TODO slotManager.updateSlot(slotReport);
    +			}
    +		});
    +
    +	}
    +
    +
    +	/**
    +	 * callback method when current resourceManager is granted leadership
    +	 * @param newLeaderSessionID unique leadershipID
    +	 */
    +	void handleGrantLeadership(final UUID newLeaderSessionID) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
    +				leaderSessionID = newLeaderSessionID;
    +				// confirming the leader session ID might be blocking, thus do it concurrently
    +				getRpcService().scheduleRunnable(
    --- End diff --
    
    Yes that makes sense to do it asynchronously. Ah I see now. There is no `RPCService.runAsync()` ... Maybe we should add it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75669361
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    +	public void notifyResourceFailure(ResourceID resourceID) {
    +		log.warn("receive failure notification of resource {}", resourceID);
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if(taskManager == null) {
    +			// ignore command to stop an unregister taskManager
    +			log.warn("ignore stop taskManager command because {} is unregistered", resourceID);
    +		} else {
    +			taskExecutorGateways.remove(resourceID);
    +			closeHeartbeatToResourceIfExist(resourceID);
    +			// TODO notify slotManager and notify jobMaster,
    +			// TODO slotManager.notifyTaskManagerFailure(resourceID);
    +			taskManager.shutDown(leaderSessionID);
    +		}
    +	}
    +
    +	/**
    +	 * close heartbeat triggers to resource if exist
    +	 * @param resourceID which resource need to stop keep heartbeat with
    +	 */
    +	private void closeHeartbeatToResourceIfExist(ResourceID resourceID) {
    +		if(heartbeatSchedulers.containsKey(resourceID)) {
    +			ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatManager = heartbeatSchedulers.get(resourceID);
    +			heartbeatManager.close();
    +			heartbeatSchedulers.remove(resourceID);
    +		}
    +	}
    +
    +	/**
    +	 * send slotRequest to the taskManager which the given slot is on
    +	 *
    +	 * @param slotRequest slot request information
    +	 * @param slotID      which slot is choosen
    +	 */
    +	void requestSlotToTaskManager(final SlotRequest slotRequest, final SlotID slotID) {
    +		ResourceID resourceID = slotID.getResourceID();
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if (taskManager == null) {
    +			// the given slot is on an unregister taskManager
    +			log.warn("ignore slot {} because it is on an unregister taskManager", slotID);
    +			// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +		} else {
    +			Future<SlotAllocationResponse> response = taskManager.requestSlotForJob(
    +				slotRequest.getAllocationID(), slotRequest.getJobID(), slotID, leaderSessionID);
    +			response.onSuccess(new OnSuccess<SlotAllocationResponse>() {
    +				@Override
    +				public void onSuccess(SlotAllocationResponse result) throws Throwable {
    +					if (result instanceof SlotAllocationResponse.Decline) {
    +						// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +					}
    +				}
    +			}, getMainThreadExecutionContext());
    +			response.onFailure(new OnFailure() {
    +				@Override
    +				public void onFailure(Throwable failure) {
    +					log.error("fail to request slot on taskManager because of error", failure);
    +					// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +				}
    +			}, getMainThreadExecutionContext());
    +		}
    +	}
    +
    +	/**
    +	 * notify slotReport which is sent by taskManager to resourceManager
    +	 *
    +	 * @param slotReport the slot allocation report from taskManager
    +	 */
    +	void handleSlotReportFromTaskManager(final SlotReport slotReport) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				// TODO slotManager.updateSlot(slotReport);
    +			}
    +		});
    +
    +	}
    +
    +
    +	/**
    +	 * callback method when current resourceManager is granted leadership
    +	 * @param newLeaderSessionID unique leadershipID
    +	 */
    +	void handleGrantLeadership(final UUID newLeaderSessionID) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
    +				leaderSessionID = newLeaderSessionID;
    +				// confirming the leader session ID might be blocking, thus do it concurrently
    +				getRpcService().scheduleRunnable(
    --- End diff --
    
    `getRpcService.runAsync(...)` is equivalent to the `scheduleRunnable` call with `0` delay.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75669804
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java ---
    @@ -53,25 +53,34 @@
     	Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration);
     
     	/**
    -	 * Requests a slot from the resource manager.
    +	 * JobMaster Requests a slot from the resource manager.
    --- End diff --
    
    lower case r in requests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75670660
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatScheduler.java ---
    @@ -0,0 +1,242 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.dispatch.OnFailure;
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.slf4j.Logger;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * heartbeat between ResourceManager and TaskManager, it is responsible for schedule heartbeat and handle
    + * heartbeat lost cases
    + */
    +public class ResourceManagerToTaskExecutorHeartbeatScheduler {
    +
    +	/** default heartbeat interval time in millisecond */
    +	private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
    +
    +	/** default heartbeat timeout in millisecond */
    +	private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
    +
    +	/** max heartbeat interval time in millisecond (which is used in retry heartbeat case) */
    +	private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 30000;
    +
    +	/** if a failure except for timeout exception happened when trigger heartbeat from resourceManager to taskManager , next attemp will start after  ERROR_HEARTBEAT_DELAY_MILLIS millisecond */
    +	private static final long ERROR_HEARTBEAT_DELAY_MILLIS = 2000;
    +
    +	/** max heartbeat retry times when lost heartbeat */
    +	private static final int MAX_ATTEMPT_TIMES = 8;
    +
    +	private final long heartbeatInterval;
    +
    +	private final long heartbeatTimeout;
    +
    +	private final long maxHeartbeatTimeout;
    +
    +	private final long delayOnError;
    +
    +	private final int maxAttempt;
    +
    +
    +	/** taskManagerGateway to receive the heartbeat and report slot allocation */
    +	private final TaskExecutorGateway taskExecutorGateway;
    +
    +	/** the taskManager address */
    +	private final String taskExecutorAddress;
    +
    +	/** identify the taskManager resourceID */
    +	private final ResourceID resourceID;
    +
    +	/** identify the resourceManager rpc endpoint */
    +	private final ResourceManager resourceManager;
    +
    +	private final UUID resourceManagerLeaderSessionID;
    +
    +	private final Logger log;
    +
    +	private volatile boolean closed;
    +
    +	/**
    +	 * ResourceManagerToTaskExecutorHeartbeatScheduler constructor
    +	 *
    +	 * @param resourceManager         resourceManager which handles heartbeat communication with taskManager
    +	 * @param taskExecutorGateway     taskManager which receives heartbeat from resourceManager and report its slot
    +	 *                                allocation to resourceManager
    +	 * @param taskExecutorAddress     taskManager's address
    +	 * @param taskExecutorResourceID  taskManager's resourceID
    +	 * @param log
    +	 */
    +	public ResourceManagerToTaskExecutorHeartbeatScheduler(
    +		ResourceManager resourceManager, UUID resourceManagerLeaderSessionID, TaskExecutorGateway taskExecutorGateway,
    +		String taskExecutorAddress, ResourceID taskExecutorResourceID, Logger log) {
    +		this(resourceManager, resourceManagerLeaderSessionID, taskExecutorGateway, taskExecutorAddress, taskExecutorResourceID,
    +			log, INITIAL_HEARTBEAT_INTERVAL_MILLIS, INITIAL_HEARTBEAT_TIMEOUT_MILLIS, MAX_HEARTBEAT_TIMEOUT_MILLIS,
    +			ERROR_HEARTBEAT_DELAY_MILLIS, MAX_ATTEMPT_TIMES);
    +	}
    +
    +	/**
    +	 * ResourceManagerToTaskExecutorHeartbeatScheduler constructor
    +	 *
    +	 * @param resourceManager
    +	 * @param taskExecutorGateway
    +	 * @param taskExecutorAddress
    +	 * @param taskExecutorResourceID
    +	 * @param log
    +	 * @param heartbeatInterval
    +	 * @param heartbeatTimeout
    +	 * @param maxHeartbeatTimeout
    +	 * @param delayOnError
    +	 * @param maxAttempt
    --- End diff --
    
    Param descriptions are misisng


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75665705
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    --- End diff --
    
    Let's remove the TODOs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75672650
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.taskexecutor;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.SlotID;
    +
    +import java.io.Serializable;
    +
    +/**
    + * SlotStatus implementation. It is responsible for describe slot allocation.
    + */
    +public class SlotStatus implements Serializable {
    +
    +	private static final long serialVersionUID = 5099191707339664493L;
    +
    +	/** slotID to identify a slot */
    +	private final SlotID slotID;
    +
    +	/** the resource profile of the slot */
    +	private final ResourceProfile profile;
    +
    +	/** if the slot is allocated, allocationId identify its allocation; else, allocationId is null */
    +	private final AllocationID allocationID;
    +
    +	/** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */
    +	private final JobID jobID;
    +
    +	public SlotStatus(SlotID slotID, ResourceProfile profile) {
    +		this(slotID, profile, null, null);
    +	}
    +
    +	public SlotStatus(SlotID slotID, ResourceProfile profile, AllocationID allocationID, JobID jobID) {
    +		this.slotID = checkNotNull(slotID, "slotID cannot be null");
    +		this.profile = checkNotNull(profile, "profile cannot be null");
    +		this.allocationID = allocationID;
    +		this.jobID = jobID;
    +	}
    +
    +	public SlotID getSlotID() {
    +		return slotID;
    +	}
    +
    +	public ResourceProfile getProfile() {
    +		return profile;
    +	}
    +
    +	public AllocationID getAllocationID() {
    +		return allocationID;
    +	}
    +
    +	public JobID getJobID() {
    +		return jobID;
    +	}
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (o == null || getClass() != o.getClass()) {
    +			return false;
    +		}
    +
    +		SlotStatus that = (SlotStatus) o;
    +
    +		if (!slotID.equals(that.slotID)) {
    +			return false;
    +		}
    +		if (!profile.equals(that.profile)) {
    +			return false;
    +		}
    +		if (allocationID != null ? !allocationID.equals(that.allocationID) : that.allocationID != null) {
    +			return false;
    +		}
    +		return jobID != null ? jobID.equals(that.jobID) : that.jobID == null;
    +
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		int result = slotID.hashCode();
    +		result = 31 * result + profile.hashCode();
    +		result = 31 * result + (allocationID != null ? allocationID.hashCode() : 0);
    +		result = 31 * result + (jobID != null ? jobID.hashCode() : 0);
    +		return result;
    --- End diff --
    
    `Objects.hash(...)` does the same.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75790043
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    +	public void notifyResourceFailure(ResourceID resourceID) {
    +		log.warn("receive failure notification of resource {}", resourceID);
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if(taskManager == null) {
    +			// ignore command to stop an unregister taskManager
    +			log.warn("ignore stop taskManager command because {} is unregistered", resourceID);
    +		} else {
    +			taskExecutorGateways.remove(resourceID);
    +			closeHeartbeatToResourceIfExist(resourceID);
    +			// TODO notify slotManager and notify jobMaster,
    +			// TODO slotManager.notifyTaskManagerFailure(resourceID);
    +			taskManager.shutDown(leaderSessionID);
    +		}
    +	}
    +
    +	/**
    +	 * close heartbeat triggers to resource if exist
    +	 * @param resourceID which resource need to stop keep heartbeat with
    +	 */
    +	private void closeHeartbeatToResourceIfExist(ResourceID resourceID) {
    +		if(heartbeatSchedulers.containsKey(resourceID)) {
    +			ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatManager = heartbeatSchedulers.get(resourceID);
    +			heartbeatManager.close();
    +			heartbeatSchedulers.remove(resourceID);
    +		}
    +	}
    +
    +	/**
    +	 * send slotRequest to the taskManager which the given slot is on
    +	 *
    +	 * @param slotRequest slot request information
    +	 * @param slotID      which slot is choosen
    +	 */
    +	void requestSlotToTaskManager(final SlotRequest slotRequest, final SlotID slotID) {
    +		ResourceID resourceID = slotID.getResourceID();
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if (taskManager == null) {
    +			// the given slot is on an unregister taskManager
    +			log.warn("ignore slot {} because it is on an unregister taskManager", slotID);
    +			// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +		} else {
    +			Future<SlotAllocationResponse> response = taskManager.requestSlotForJob(
    +				slotRequest.getAllocationID(), slotRequest.getJobID(), slotID, leaderSessionID);
    +			response.onSuccess(new OnSuccess<SlotAllocationResponse>() {
    +				@Override
    +				public void onSuccess(SlotAllocationResponse result) throws Throwable {
    +					if (result instanceof SlotAllocationResponse.Decline) {
    +						// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +					}
    +				}
    +			}, getMainThreadExecutionContext());
    +			response.onFailure(new OnFailure() {
    +				@Override
    +				public void onFailure(Throwable failure) {
    +					log.error("fail to request slot on taskManager because of error", failure);
    +					// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +				}
    +			}, getMainThreadExecutionContext());
    +		}
    +	}
    +
    +	/**
    +	 * notify slotReport which is sent by taskManager to resourceManager
    +	 *
    +	 * @param slotReport the slot allocation report from taskManager
    +	 */
    +	void handleSlotReportFromTaskManager(final SlotReport slotReport) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				// TODO slotManager.updateSlot(slotReport);
    +			}
    +		});
    +
    +	}
    +
    +
    +	/**
    +	 * callback method when current resourceManager is granted leadership
    +	 * @param newLeaderSessionID unique leadershipID
    +	 */
    +	void handleGrantLeadership(final UUID newLeaderSessionID) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
    +				leaderSessionID = newLeaderSessionID;
    +				// confirming the leader session ID might be blocking, thus do it concurrently
    +				getRpcService().scheduleRunnable(
    --- End diff --
    
    yes.Because leaderElectionService.confirmLeaderSessionID might be blocking, So i do it in another async call.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75793520
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    --- End diff --
    
    in some corner cases, Cluster manager framework(e.g. yarn master) will notify resourceManager that a TaskExecutor is marked as invalid, so it could be a RpcMethod


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75672457
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.taskexecutor;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.SlotID;
    +
    +import java.io.Serializable;
    +
    +/**
    + * SlotStatus implementation. It is responsible for describe slot allocation.
    --- End diff --
    
    Revise java docs. It is always good to describe what's the purpose of this class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75683067
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    +	public void notifyResourceFailure(ResourceID resourceID) {
    +		log.warn("receive failure notification of resource {}", resourceID);
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if(taskManager == null) {
    +			// ignore command to stop an unregister taskManager
    +			log.warn("ignore stop taskManager command because {} is unregistered", resourceID);
    +		} else {
    +			taskExecutorGateways.remove(resourceID);
    +			closeHeartbeatToResourceIfExist(resourceID);
    +			// TODO notify slotManager and notify jobMaster,
    +			// TODO slotManager.notifyTaskManagerFailure(resourceID);
    +			taskManager.shutDown(leaderSessionID);
    +		}
    +	}
    +
    +	/**
    +	 * close heartbeat triggers to resource if exist
    +	 * @param resourceID which resource need to stop keep heartbeat with
    +	 */
    +	private void closeHeartbeatToResourceIfExist(ResourceID resourceID) {
    +		if(heartbeatSchedulers.containsKey(resourceID)) {
    +			ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatManager = heartbeatSchedulers.get(resourceID);
    +			heartbeatManager.close();
    +			heartbeatSchedulers.remove(resourceID);
    +		}
    +	}
    +
    +	/**
    +	 * send slotRequest to the taskManager which the given slot is on
    +	 *
    +	 * @param slotRequest slot request information
    +	 * @param slotID      which slot is choosen
    +	 */
    +	void requestSlotToTaskManager(final SlotRequest slotRequest, final SlotID slotID) {
    +		ResourceID resourceID = slotID.getResourceID();
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if (taskManager == null) {
    +			// the given slot is on an unregister taskManager
    +			log.warn("ignore slot {} because it is on an unregister taskManager", slotID);
    +			// TODO slotManager.handleSlotRequestFailedAtTaskManager(slotRequest, slotID);
    +		} else {
    +			Future<SlotAllocationResponse> response = taskManager.requestSlotForJob(
    +				slotRequest.getAllocationID(), slotRequest.getJobID(), slotID, leaderSessionID);
    --- End diff --
    
    Why do you unwrap the `SlotRequest` here? We could also send the `SlotRequest` to the `TaskExecutor`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75665777
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    --- End diff --
    
    Java doc is missing here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2389: Jira FLINK-4348

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2389
  
    Hi @beyond1920. Thanks for opening the PR. Could you update the PR title to "[FLINK-4348] [cluster management] Implement communication from ResourceManager to TaskManager".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75792341
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerToTaskExecutorHeartbeatScheduler.java ---
    @@ -0,0 +1,242 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.dispatch.OnFailure;
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
    +import org.slf4j.Logger;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * heartbeat between ResourceManager and TaskManager, it is responsible for schedule heartbeat and handle
    + * heartbeat lost cases
    + */
    +public class ResourceManagerToTaskExecutorHeartbeatScheduler {
    +
    +	/** default heartbeat interval time in millisecond */
    +	private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
    +
    +	/** default heartbeat timeout in millisecond */
    +	private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
    +
    +	/** max heartbeat interval time in millisecond (which is used in retry heartbeat case) */
    +	private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 30000;
    +
    +	/** if a failure except for timeout exception happened when trigger heartbeat from resourceManager to taskManager , next attemp will start after  ERROR_HEARTBEAT_DELAY_MILLIS millisecond */
    +	private static final long ERROR_HEARTBEAT_DELAY_MILLIS = 2000;
    +
    +	/** max heartbeat retry times when lost heartbeat */
    +	private static final int MAX_ATTEMPT_TIMES = 8;
    +
    +	private final long heartbeatInterval;
    +
    +	private final long heartbeatTimeout;
    +
    +	private final long maxHeartbeatTimeout;
    +
    +	private final long delayOnError;
    +
    +	private final int maxAttempt;
    +
    +
    +	/** taskManagerGateway to receive the heartbeat and report slot allocation */
    +	private final TaskExecutorGateway taskExecutorGateway;
    +
    +	/** the taskManager address */
    +	private final String taskExecutorAddress;
    +
    +	/** identify the taskManager resourceID */
    +	private final ResourceID resourceID;
    +
    +	/** identify the resourceManager rpc endpoint */
    +	private final ResourceManager resourceManager;
    +
    +	private final UUID resourceManagerLeaderSessionID;
    +
    +	private final Logger log;
    +
    +	private volatile boolean closed;
    +
    +	/**
    +	 * ResourceManagerToTaskExecutorHeartbeatScheduler constructor
    +	 *
    +	 * @param resourceManager         resourceManager which handles heartbeat communication with taskManager
    +	 * @param taskExecutorGateway     taskManager which receives heartbeat from resourceManager and report its slot
    +	 *                                allocation to resourceManager
    +	 * @param taskExecutorAddress     taskManager's address
    +	 * @param taskExecutorResourceID  taskManager's resourceID
    +	 * @param log
    +	 */
    +	public ResourceManagerToTaskExecutorHeartbeatScheduler(
    --- End diff --
    
    good idea


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75793093
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +		Future<TaskExecutorGateway> taskExecutorFuture =
    +			getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +		return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +			@Override
    +			public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +				// decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +				if(taskExecutorGateway == null) {
    +					log.warn("resourceManager {} decline registration from the taskExecutor {}, cannot connect to it using given address {} ",
    +						getAddress(), resourceID, taskExecutorAddress);
    +					return new org.apache.flink.runtime.rpc.registration.RegistrationResponse.Decline("cannot connect to taskExecutor using given address");
    +				} else {
    +					// save the register taskExecutor gateway
    +					taskExecutorGateways.put(resourceID, taskExecutorGateway);
    +					// schedule the heartbeat with the registered taskExecutor
    +					ResourceManagerToTaskExecutorHeartbeatScheduler heartbeatScheduler = new ResourceManagerToTaskExecutorHeartbeatScheduler(
    +						ResourceManager.this, leaderSessionID, taskExecutorGateway, taskExecutorAddress, resourceID, log);
    +					heartbeatScheduler.start();
    +					heartbeatSchedulers.put(resourceID, heartbeatScheduler);
    +					return new TaskExecutorRegistrationSuccess(new InstanceID(), heartbeatScheduler.getHeartbeatInterval());
    +				}
    +			}
    +		}, getMainThreadExecutionContext());
    +
    +	}
    +
    +	/**
    +	 * notify resource failure to resourceManager, because of two reasons:
    +	 * 1. cannot keep heartbeat with taskManager for several times, mark the resource as failed
    +	 * 2. in some corner cases, TM will be marked as invalid by cluster manager master(e.g. yarn master), but TM itself does not realize.
    +	 *
    +	 * @param resourceID identify the taskManager which to stop
    +	 */
    +	@RpcMethod
    +	public void notifyResourceFailure(ResourceID resourceID) {
    +		log.warn("receive failure notification of resource {}", resourceID);
    +		TaskExecutorGateway taskManager = taskExecutorGateways.get(resourceID);
    +		if(taskManager == null) {
    +			// ignore command to stop an unregister taskManager
    +			log.warn("ignore stop taskManager command because {} is unregistered", resourceID);
    +		} else {
    +			taskExecutorGateways.remove(resourceID);
    +			closeHeartbeatToResourceIfExist(resourceID);
    +			// TODO notify slotManager and notify jobMaster,
    +			// TODO slotManager.notifyTaskManagerFailure(resourceID);
    +			taskManager.shutDown(leaderSessionID);
    --- End diff --
    
    good


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75789238
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +		// TODO slotManager.requestSlot(slotRequest);
    +		return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
     	}
     
     
     	/**
    -	 *
    -	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
    -	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
    -	 * @param resourceID               The resource ID of the TaskExecutor that registers
    +	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
    +	 * @param resourceID              The resource ID of the TaskExecutor that registers
     	 *
     	 * @return The response by the ResourceManager.
     	 */
     	@RpcMethod
    -	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -			UUID resourceManagerLeaderId,
    -			String taskExecutorAddress,
    -			ResourceID resourceID) {
    +	public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +		final UUID resourceManagerLeaderId,
    +		final String taskExecutorAddress,
    +		final ResourceID resourceID
    +	) {
    +		log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    --- End diff --
    
    ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2389: [FLINK-4348] [cluster management] Implement commun...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75851341
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -52,18 +61,45 @@
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
     	private final ExecutionContext executionContext;
     	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +	private final Map<ResourceID, TaskExecutorGateway> taskExecutorGateways;
    +	private final Map<ResourceID, ResourceManagerToTaskExecutorHeartbeatScheduler> heartbeatSchedulers;
    +	private final LeaderElectionService leaderElectionService;
    +	private UUID leaderSessionID;
    +	// TODO private final SlotManager slotManager;
     
    -	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
    +	public ResourceManager(RpcService rpcService, ExecutorService executorService, LeaderElectionService leaderElectionService) {
    --- End diff --
    
    Yes exactly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---