Posted to issues@flink.apache.org by mxm <gi...@git.apache.org> on 2016/09/02 14:30:31 UTC

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/2463

    [FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol

    This implements and tests the ResourceManager part of the protocol for slot allocation.
    
    - associates JobMasters with JobID instead of InstanceID
    - adds TaskExecutorGateway to slot and notify from SlotManager
    - adds SlotManager as RM constructor parameter
    - adds LeaderIdRetriever to keep track of the leader id
    
    - tests the interaction JM->RM requestSlot
    - tests the interaction RM->TM requestSlot

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink flip-6

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2463.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2463
    
----
commit 213f4ee6a30bd87e9e04c5a4b22022e0636db9e9
Author: Maximilian Michels <mx...@apache.org>
Date:   2016-09-01T14:53:31Z

    [FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol
    
    - associates JobMasters with JobID instead of InstanceID
    - adds TaskExecutorGateway to slot
    - adds SlotManager as RM constructor parameter
    - adds LeaderIdRetriever to keep track of the leader id
    
    - tests the interaction JM->RM requestSlot
    - tests the interaction RM->TM requestSlot

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77768256
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) {
     	 * RPC's main thread to avoid race condition).
     	 *
     	 * @param request The detailed request of the slot
    +	 * @return SlotRequestRegistered The confirmation message to be send to the caller
     	 */
    -	public void requestSlot(final SlotRequest request) {
    +	public SlotRequestRegistered requestSlot(final SlotRequest request) {
    +		final AllocationID allocationId = request.getAllocationId();
     		if (isRequestDuplicated(request)) {
    -			LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
    -			return;
    +			LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
    +			return null;
     		}
     
     		// try to fulfil the request with current free slots
    -		ResourceSlot slot = chooseSlotToUse(request, freeSlots);
    +		final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
     		if (slot != null) {
     			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
    -				request.getAllocationId(), request.getJobId());
    +				allocationId, request.getJobId());
     
     			// record this allocation in bookkeeping
    -			allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
    +			allocationMap.addAllocation(slot.getSlotId(), allocationId);
     
     			// remove selected slot from free pool
     			freeSlots.remove(slot.getSlotId());
     
    -			// TODO: send slot request to TaskManager
    +			slot.getTaskExecutorGateway()
    +				.requestSlot(allocationId, leaderIdRegistry.getLeaderID());
    --- End diff --
    
    ResourceManager keeps a relationship between resourceID and TaskExecutorGateway. Maybe we could fetch TaskExecutorGateway by resourceID using ResourceManager here?
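
One way to read this suggestion, as a minimal sketch: the resource manager owns the map from resource ID to gateway, and the slot manager asks for the gateway when it needs it instead of storing one in every slot. All names below (`GatewayDirectorySketch`, `GatewaySketch`, plain `String` IDs) are hypothetical stand-ins for illustration, not Flink's classes.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a TaskExecutor gateway; not Flink's interface. */
interface GatewaySketch {
    String address();
}

/** Hypothetical directory kept by the resource manager: resource ID -> gateway. */
final class GatewayDirectorySketch {
    private final Map<String, GatewaySketch> gatewaysByResourceId = new ConcurrentHashMap<>();

    /** Records the gateway when a task executor registers. */
    void register(String resourceId, GatewaySketch gateway) {
        gatewaysByResourceId.put(resourceId, gateway);
    }

    /** Looks the gateway up on demand; empty if the executor is unknown. */
    Optional<GatewaySketch> lookup(String resourceId) {
        return Optional.ofNullable(gatewaysByResourceId.get(resourceId));
    }
}
```

With a single directory, a task executor that disappears only has to be removed in one place, and the slot objects stay plain data.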



[GitHub] flink issue #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allocation ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2463
  
    Thanks for your work @mxm. I had some comments, which you can find inline. I think the implementation of the slot request logic took another step in the right direction with this PR.



[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77522339
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -52,15 +58,28 @@
      * </ul>
      */
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
    -	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +
    +	private final Logger LOG = LoggerFactory.getLogger(getClass());
    --- End diff --
    
    Why not make it static?
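
For illustration, a minimal sketch of the static variant. To stay dependency-free it uses `java.util.logging` in place of SLF4J, and the class name `StaticLoggerSketch` is hypothetical.

```java
import java.util.logging.Logger;

/** Hypothetical class; java.util.logging stands in for SLF4J here. */
class StaticLoggerSketch {
    // One logger shared by all instances, created once when the class is initialized.
    private static final Logger LOG = Logger.getLogger(StaticLoggerSketch.class.getName());

    /** Exposes the shared logger so callers can see it is the same object. */
    static Logger logger() {
        return LOG;
    }
}
```

A static logger is bound to the declaring class, so subclasses log under the parent's name. The instance field in the diff, built with `getClass()`, logs under the runtime class instead.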



[GitHub] flink issue #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allocation ...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2463
  
    I've rebased the pull request and incorporated your suggestions.



[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77787126
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java ---
    @@ -15,11 +15,27 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.runtime.highavailability;
     
    -package org.apache.flink.runtime.resourcemanager;
    +import java.util.UUID;
     
    -import java.io.Serializable;
    +/**
    + * Registry class to keep track of the current leader ID.
    + */
    +public class LeaderIdRegistry {
    --- End diff --
    
    The class has some docs, but as you can see from my initial question, its purpose was not clear to me.
    
    Yes, I actually thought about marking `leaderSessionID` `volatile`. 
    
    Given the interface of this class every component which has a reference to this registry is allowed to change the leader session ID. This can be problematic because components other than the `ResourceManager` should only be allowed to retrieve the leader session ID.
    
    I'm actually wondering whether it is not necessary to notify the components about a new leader session ID. For example, the `SlotManager` should probably free its registered slots when it loses the leadership. Wouldn't these calls be suitable to transmit the current leader session ID? 
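
The two concerns above (visibility across threads, and who may write the ID) can be sketched as follows. This is an assumption-laden illustration, not the class from the PR: `LeaderIdViewSketch` and `LeaderIdHolderSketch` are hypothetical names.

```java
import java.util.UUID;

/** Read-only view handed to components that must not change the leader ID. */
interface LeaderIdViewSketch {
    UUID getLeaderId();
}

/** Hypothetical holder; only the owner keeps a reference to this writable type. */
final class LeaderIdHolderSketch implements LeaderIdViewSketch {
    // volatile: a write by the owner is visible to later reads on other threads.
    private volatile UUID leaderId;

    @Override
    public UUID getLeaderId() {
        return leaderId;
    }

    /** Intended to be called only by the owning component. */
    void setLeaderId(UUID newLeaderId) {
        this.leaderId = newLeaderId;
    }
}
```

The owner would pass the holder around typed as `LeaderIdViewSketch`, so other components can compile against the getter only.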



[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77630000
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---
    @@ -32,4 +33,11 @@
     	// ------------------------------------------------------------------------
     
     	void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
    +
    +	/**
    +	 * Send by the ResourceManager to the TaskExecutor
    +	 * @param allocationID id for the request
    +	 * @param resourceManagerLeaderID current leader id of the ResourceManager
    +	 */
    +	void requestSlot(AllocationID allocationID, UUID resourceManagerLeaderID);
    --- End diff --
    
    As of now, this is just a stub but we will have to acknowledge the message. Will change the signature to make that clear.



[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77650313
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java ---
    @@ -15,11 +15,27 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.runtime.highavailability;
     
    -package org.apache.flink.runtime.resourcemanager;
    +import java.util.UUID;
     
    -import java.io.Serializable;
    +/**
    + * Registry class to keep track of the current leader ID.
    + */
    +public class LeaderIdRegistry {
    --- End diff --
    
    Alright, but then this class should be made thread safe and the docs should state the purpose.



[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77523985
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---
    @@ -32,4 +33,11 @@
     	// ------------------------------------------------------------------------
     
     	void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
    +
    +	/**
    +	 * Send by the ResourceManager to the TaskExecutor
    +	 * @param allocationID id for the request
    +	 * @param resourceManagerLeaderID current leader id of the ResourceManager
    +	 */
    +	void requestSlot(AllocationID allocationID, UUID resourceManagerLeaderID);
    --- End diff --
    
    How is the confirmation of the `TaskExecutor` sent back to the `SlotManager`? Would it make sense to send it back via the return value of this method?
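
A sketch of what confirmation via the return value could look like. It uses `java.util.concurrent.CompletableFuture` rather than the Scala futures seen elsewhere in this PR, and every type here (`SlotReplySketch`, `TaskExecutorGatewaySketch`, `String` allocation IDs) is a hypothetical stand-in.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Hypothetical reply message carrying the outcome of a slot request. */
final class SlotReplySketch {
    final String allocationId;
    final boolean accepted;

    SlotReplySketch(String allocationId, boolean accepted) {
        this.allocationId = allocationId;
        this.accepted = accepted;
    }
}

/** Hypothetical gateway: the reply travels back as the future's value. */
interface TaskExecutorGatewaySketch {
    CompletableFuture<SlotReplySketch> requestSlot(String allocationId, UUID resourceManagerLeaderId);
}

/** Accepts a request only if it comes from the leader this executor knows. */
final class LeaderCheckingExecutorSketch implements TaskExecutorGatewaySketch {
    private final UUID knownLeaderId;

    LeaderCheckingExecutorSketch(UUID knownLeaderId) {
        this.knownLeaderId = knownLeaderId;
    }

    @Override
    public CompletableFuture<SlotReplySketch> requestSlot(String allocationId, UUID resourceManagerLeaderId) {
        boolean accepted = knownLeaderId.equals(resourceManagerLeaderId);
        return CompletableFuture.completedFuture(new SlotReplySketch(allocationId, accepted));
    }
}
```

The caller can then attach its bookkeeping to the returned future instead of waiting for a separate acknowledgement message.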



[GitHub] flink issue #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allocation ...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2463
  
    CC @tillrohrmann @StephanEwen 



[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r78212144
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) {
     	 * RPC's main thread to avoid race condition).
     	 *
     	 * @param request The detailed request of the slot
    +	 * @return SlotRequestRegistered The confirmation message to be send to the caller
     	 */
    -	public void requestSlot(final SlotRequest request) {
    +	public SlotRequestRegistered requestSlot(final SlotRequest request) {
    +		final AllocationID allocationId = request.getAllocationId();
     		if (isRequestDuplicated(request)) {
    -			LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
    -			return;
    +			LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
    +			return null;
     		}
     
     		// try to fulfil the request with current free slots
    -		ResourceSlot slot = chooseSlotToUse(request, freeSlots);
    +		final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
     		if (slot != null) {
     			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
    -				request.getAllocationId(), request.getJobId());
    +				allocationId, request.getJobId());
     
     			// record this allocation in bookkeeping
    -			allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
    +			allocationMap.addAllocation(slot.getSlotId(), allocationId);
     
     			// remove selected slot from free pool
     			freeSlots.remove(slot.getSlotId());
     
    -			// TODO: send slot request to TaskManager
    +			slot.getTaskExecutorGateway()
    +				.requestSlot(allocationId, leaderIdRegistry.getLeaderID());
    --- End diff --
    
    Thank you for your comments @beyond1920. Your observations are correct. I've skipped this part of the implementation and wanted to address it next.



[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77524626
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java ---
    @@ -15,11 +15,27 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.runtime.highavailability;
     
    -package org.apache.flink.runtime.resourcemanager;
    +import java.util.UUID;
     
    -import java.io.Serializable;
    +/**
    + * Registry class to keep track of the current leader ID.
    + */
    +public class LeaderIdRegistry {
    --- End diff --
    
    Why do you create a registry for a single field?



[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77523806
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.resourcemanager.slotmanager;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.SlotID;
    +import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
    +import org.apache.flink.runtime.highavailability.NonHaServices;
    +import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
    +import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
    +import org.apache.flink.runtime.resourcemanager.ResourceManager;
    +import org.apache.flink.runtime.resourcemanager.SlotRequest;
    +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
    +import org.apache.flink.runtime.rpc.TestingRpcService;
    +import org.apache.flink.runtime.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.taskexecutor.SlotStatus;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutor;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Duration;
    +
    +import java.util.Collections;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.timeout;
    +import static org.mockito.Mockito.verify;
    +
    +public class SlotProtocolTest {
    +
    +	private static TestingRpcService testRpcService;
    +
    +	@BeforeClass
    +	public static void beforeClass() {
    +		testRpcService = new TestingRpcService();
    +
    +	}
    +
    +	@AfterClass
    +	public static void afterClass() {
    +		testRpcService.stopService();
    +		testRpcService = null;
    +	}
    +
    +	@Before
    +	public void beforeTest(){
    +		testRpcService.clearGateways();
    +	}
    +
    +	/**
    +	 * Tests whether
    +	 * 1) SlotRequest is routed to the SlotManager
    +	 * 2) SlotRequest leads to a container allocation
    +	 * 3) SlotRequest is confirmed
    +	 * 4) Slot becomes available and TaskExecutor gets a SlotRequest
    +	 */
    +	@Test
    +	public void testSlotsUnavailableRequest() throws Exception {
    +		final String rmAddress = "/rm1";
    +		final String jmAddress = "/jm1";
    +		final JobID jobID = new JobID();
    +
    +		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
    +
    +
    +		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
    +		ResourceManager resourceManager =
    +			new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
    +		resourceManager.start();
    +
    +		Future<RegistrationResponse> registrationFuture =
    +			resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
    +		try {
    +			Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
    +		} catch (Exception e) {
    +			Assert.fail("JobManager registration Future didn't become ready.");
    +		}
    +
    +		final AllocationID allocationID = new AllocationID();
    +		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
    +
    +		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
    +		Future<SlotRequestRegistered> slotRequestFuture =
    +			resourceManager.getSelf().requestSlot(slotRequest);
    --- End diff --
    
    Why don't you call `requestSlot` directly on the `ResourceManager`? Additionally, I think you could use the `TestingSerialRpcService`, which would allow you to get rid of all the asynchronous operations and waiting.
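
The general idea behind a serial test service can be sketched with plain JDK classes: if calls run on the caller's thread, the returned future is already complete and the test needs no `Await`. This is not Flink's `TestingSerialRpcService`; `SerialCallSketch` and its `requestSlot` method are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical sketch of synchronous call dispatch for tests. */
final class SerialCallSketch {
    // Runs each task on the calling thread, so the work is done when execute() returns.
    static final Executor DIRECT = Runnable::run;

    /** Simulates an RPC whose handler runs synchronously on the caller's thread. */
    static CompletableFuture<String> requestSlot(String allocationId) {
        return CompletableFuture.supplyAsync(() -> "registered:" + allocationId, DIRECT);
    }
}
```

Because the executor is direct, a test can assert on the result immediately after the call returns.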



[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77970827
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -52,15 +58,28 @@
      * </ul>
      */
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
    -	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +
    +	private final Logger LOG = LoggerFactory.getLogger(getClass());
    --- End diff --
    
    Good point. Will use that one instead.



[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77651201
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.resourcemanager.slotmanager;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.SlotID;
    +import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
    +import org.apache.flink.runtime.highavailability.NonHaServices;
    +import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
    +import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
    +import org.apache.flink.runtime.resourcemanager.ResourceManager;
    +import org.apache.flink.runtime.resourcemanager.SlotRequest;
    +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
    +import org.apache.flink.runtime.rpc.TestingRpcService;
    +import org.apache.flink.runtime.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.taskexecutor.SlotStatus;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutor;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Duration;
    +
    +import java.util.Collections;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.timeout;
    +import static org.mockito.Mockito.verify;
    +
    +public class SlotProtocolTest {
    +
    +	private static TestingRpcService testRpcService;
    +
    +	@BeforeClass
    +	public static void beforeClass() {
    +		testRpcService = new TestingRpcService();
    +
    +	}
    +
    +	@AfterClass
    +	public static void afterClass() {
    +		testRpcService.stopService();
    +		testRpcService = null;
    +	}
    +
    +	@Before
    +	public void beforeTest(){
    +		testRpcService.clearGateways();
    +	}
    +
    +	/**
    +	 * Tests whether
    +	 * 1) SlotRequest is routed to the SlotManager
    +	 * 2) SlotRequest leads to a container allocation
    +	 * 3) SlotRequest is confirmed
    +	 * 4) Slot becomes available and TaskExecutor gets a SlotRequest
    +	 */
    +	@Test
    +	public void testSlotsUnavailableRequest() throws Exception {
    +		final String rmAddress = "/rm1";
    +		final String jmAddress = "/jm1";
    +		final JobID jobID = new JobID();
    +
    +		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
    +
    +
    +		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
    +		ResourceManager resourceManager =
    +			new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
    +		resourceManager.start();
    +
    +		Future<RegistrationResponse> registrationFuture =
    +			resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
    +		try {
    +			Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
    +		} catch (Exception e) {
    +			Assert.fail("JobManager registration Future didn't become ready.");
    +		}
    +
    +		final AllocationID allocationID = new AllocationID();
    +		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
    +
    +		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
    +		Future<SlotRequestRegistered> slotRequestFuture =
    +			resourceManager.getSelf().requestSlot(slotRequest);
    --- End diff --
    
    But you're not relying on asynchronous execution in this test. I think this kind of unit test should be as lightweight as possible. Thus, one should not start actor systems if that's not really necessary. I think in this test, it is not necessary.



[GitHub] flink issue #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allocation ...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2463
  
    Merged to continue development in other places. Thanks for the comments!



[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77943543
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -52,15 +58,28 @@
      * </ul>
      */
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
    -	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +
    +	private final Logger LOG = LoggerFactory.getLogger(getClass());
    --- End diff --
    
    There is a protected log field in RpcEndpoint. Why not use that instead?



[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77651399
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -52,15 +58,28 @@
      * </ul>
      */
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
    -	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +
    +	private final Logger LOG = LoggerFactory.getLogger(getClass());
    --- End diff --
    
    But then this field should probably be marked as `protected` instead of `private`.


---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r78194763
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java ---
    @@ -15,11 +15,27 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.runtime.highavailability;
     
    -package org.apache.flink.runtime.resourcemanager;
    +import java.util.UUID;
     
    -import java.io.Serializable;
    +/**
    + * Registry class to keep track of the current leader ID.
    + */
    +public class LeaderIdRegistry {
    --- End diff --
    
    These are valid points, I will change the code to use the `LeaderRetrievalListener` instead.


---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77628600
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -52,15 +58,28 @@
      * </ul>
      */
     public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
    -	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
    +
    +	private final Logger LOG = LoggerFactory.getLogger(getClass());
    --- End diff --
    
    No particular reason other than I want to make sure future subclasses log with the correct class name.
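
A minimal sketch of the point made in the comment above, assuming hypothetical classes `BaseEndpoint` and `CustomResourceManager` (neither is Flink code) and using `java.util.logging` instead of the SLF4J logger from the diff so that it has no dependencies. A logger created from `getClass()` in a base class is named after the runtime subclass:

```java
import java.util.logging.Logger;

// Hypothetical stand-in for an RpcEndpoint-style base class; not Flink code.
class BaseEndpoint {
    // getClass() resolves to the runtime type, so every subclass
    // automatically gets a logger carrying its own class name.
    protected final Logger log = Logger.getLogger(getClass().getName());

    String loggerName() {
        return log.getName();
    }
}

// Hypothetical subclass; it inherits the field and needs no logger of its own.
class CustomResourceManager extends BaseEndpoint {
}
```

Declaring the field `protected` in the base class, as suggested in the review, lets subclasses reuse it while still logging under their own name.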


---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77522737
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -131,9 +149,16 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public SlotRequestRegistered requestSlot(SlotRequest slotRequest) {
    +		final JobID jobId = slotRequest.getJobId();
    +		final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
    +
    +		if (jobMasterGateway != null) {
    +			return slotManager.requestSlot(slotRequest);
    +		} else {
    +			LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
    +			return null;
    --- End diff --
    
    Not sure whether we should return `null` here, a negative `SlotRequestRegistered` response or throw an exception which will be handled by the caller. Why did you choose `null`?


---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77630351
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.resourcemanager.slotmanager;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.SlotID;
    +import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
    +import org.apache.flink.runtime.highavailability.NonHaServices;
    +import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
    +import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
    +import org.apache.flink.runtime.resourcemanager.ResourceManager;
    +import org.apache.flink.runtime.resourcemanager.SlotRequest;
    +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
    +import org.apache.flink.runtime.rpc.TestingRpcService;
    +import org.apache.flink.runtime.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.taskexecutor.SlotStatus;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutor;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Duration;
    +
    +import java.util.Collections;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.timeout;
    +import static org.mockito.Mockito.verify;
    +
    +public class SlotProtocolTest {
    +
    +	private static TestingRpcService testRpcService;
    +
    +	@BeforeClass
    +	public static void beforeClass() {
    +		testRpcService = new TestingRpcService();
    +
    +	}
    +
    +	@AfterClass
    +	public static void afterClass() {
    +		testRpcService.stopService();
    +		testRpcService = null;
    +	}
    +
    +	@Before
    +	public void beforeTest(){
    +		testRpcService.clearGateways();
    +	}
    +
    +	/**
    +	 * Tests whether
    +	 * 1) SlotRequest is routed to the SlotManager
    +	 * 2) SlotRequest leads to a container allocation
    +	 * 3) SlotRequest is confirmed
    +	 * 4) Slot becomes available and TaskExecutor gets a SlotRequest
    +	 */
    +	@Test
    +	public void testSlotsUnavailableRequest() throws Exception {
    +		final String rmAddress = "/rm1";
    +		final String jmAddress = "/jm1";
    +		final JobID jobID = new JobID();
    +
    +		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
    +
    +
    +		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
    +		ResourceManager resourceManager =
    +			new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
    +		resourceManager.start();
    +
    +		Future<RegistrationResponse> registrationFuture =
    +			resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
    +		try {
    +			Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
    +		} catch (Exception e) {
    +			Assert.fail("JobManager registration Future didn't become ready.");
    +		}
    +
    +		final AllocationID allocationID = new AllocationID();
    +		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
    +
    +		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
    +		Future<SlotRequestRegistered> slotRequestFuture =
    +			resourceManager.getSelf().requestSlot(slotRequest);
    --- End diff --
    
    I suppose I could do that. `TestingSerialRpcService` has its drawbacks (as of now) because it enforces deterministic, serial execution, which can potentially hide bugs.


---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm closed the pull request at:

    https://github.com/apache/flink/pull/2463


---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77629249
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -131,9 +149,16 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
     	 * @return Slot assignment
     	 */
     	@RpcMethod
    -	public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -		System.out.println("SlotRequest: " + slotRequest);
    -		return new SlotAssignment();
    +	public SlotRequestRegistered requestSlot(SlotRequest slotRequest) {
    +		final JobID jobId = slotRequest.getJobId();
    +		final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
    +
    +		if (jobMasterGateway != null) {
    +			return slotManager.requestSlot(slotRequest);
    +		} else {
    +			LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
    +			return null;
    --- End diff --
    
    The rationale here was to simply ignore this request because the JobManager is not registered. You're right, it is probably better to reply with a meaningful answer.
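
One way the "meaningful answer" could look, as a sketch only. The names `SlotRequestReply` and `SlotRequestHandler` are hypothetical, job IDs are plain strings for brevity, and the `SlotRequestRegistered` class in the pull request is not known to carry an accepted/rejected flag. The idea is to return an explicit rejection instead of `null`, so the caller does not have to guess why no slot was registered:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical reply type carrying an explicit outcome; not the PR's SlotRequestRegistered.
final class SlotRequestReply {
    final boolean accepted;
    final String reason;

    private SlotRequestReply(boolean accepted, String reason) {
        this.accepted = accepted;
        this.reason = reason;
    }

    static SlotRequestReply registered() {
        return new SlotRequestReply(true, "");
    }

    static SlotRequestReply rejected(String reason) {
        return new SlotRequestReply(false, reason);
    }
}

// Hypothetical handler mirroring the shape of ResourceManager#requestSlot.
final class SlotRequestHandler {
    // Maps a job ID to the address of its registered JobMaster.
    private final Map<String, String> jobMasterAddresses = new ConcurrentHashMap<>();

    void registerJobMaster(String jobId, String address) {
        jobMasterAddresses.put(jobId, address);
    }

    SlotRequestReply requestSlot(String jobId) {
        if (jobMasterAddresses.containsKey(jobId)) {
            return SlotRequestReply.registered();
        }
        // Never null: the caller always receives an answer it can act on.
        return SlotRequestReply.rejected("No JobMaster registered for job " + jobId);
    }
}
```

Throwing an exception that the caller handles, the third option raised in the review, would be an equally explicit alternative.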


---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77650617
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.resourcemanager.slotmanager;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.SlotID;
    +import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
    +import org.apache.flink.runtime.highavailability.NonHaServices;
    +import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
    +import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
    +import org.apache.flink.runtime.resourcemanager.ResourceManager;
    +import org.apache.flink.runtime.resourcemanager.SlotRequest;
    +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
    +import org.apache.flink.runtime.rpc.TestingRpcService;
    +import org.apache.flink.runtime.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.taskexecutor.SlotStatus;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutor;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Duration;
    +
    +import java.util.Collections;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.timeout;
    +import static org.mockito.Mockito.verify;
    +
    +public class SlotProtocolTest {
    +
    +	private static TestingRpcService testRpcService;
    +
    +	@BeforeClass
    +	public static void beforeClass() {
    +		testRpcService = new TestingRpcService();
    +
    +	}
    +
    +	@AfterClass
    +	public static void afterClass() {
    +		testRpcService.stopService();
    +		testRpcService = null;
    +	}
    +
    +	@Before
    +	public void beforeTest(){
    +		testRpcService.clearGateways();
    +	}
    +
    +	/**
    +	 * Tests whether
    +	 * 1) SlotRequest is routed to the SlotManager
    +	 * 2) SlotRequest leads to a container allocation
    +	 * 3) SlotRequest is confirmed
    +	 * 4) Slot becomes available and TaskExecutor gets a SlotRequest
    +	 */
    +	@Test
    +	public void testSlotsUnavailableRequest() throws Exception {
    +		final String rmAddress = "/rm1";
    +		final String jmAddress = "/jm1";
    +		final JobID jobID = new JobID();
    +
    +		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
    +
    +
    +		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
    +		ResourceManager resourceManager =
    +			new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
    +		resourceManager.start();
    +
    +		Future<RegistrationResponse> registrationFuture =
    +			resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
    +		try {
    +			Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
    +		} catch (Exception e) {
    +			Assert.fail("JobManager registration Future didn't become ready.");
    +		}
    +
    +		final AllocationID allocationID = new AllocationID();
    +		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
    +
    +		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
    +		Future<SlotRequestRegistered> slotRequestFuture =
    +			resourceManager.getSelf().requestSlot(slotRequest);
    +
    +		// 1) SlotRequest is routed to the SlotManager
    +		verify(slotManager, timeout(5000)).requestSlot(slotRequest);
    +
    +		// 2) SlotRequest leads to a container allocation
    +		verify(slotManager, timeout(5000)).allocateContainer(resourceProfile);
    +
    +		// 3) SlotRequest is confirmed
    +		Assert.assertEquals(
    +			Await.result(slotRequestFuture, Duration.create(5, TimeUnit.SECONDS)).getAllocationID(),
    +			allocationID);
    +
    +		Assert.assertFalse(slotManager.isAllocated(allocationID));
    +
    +		// slot becomes available
    +		final String tmAddress = "/tm1";
    +		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
    +		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
    +
    +		final ResourceID resourceID = ResourceID.generate();
    +		final SlotID slotID = new SlotID(resourceID, 0);
    +
    +		final SlotStatus slotStatus =
    +			new SlotStatus(slotID, resourceProfile, taskExecutorGateway);
    +		final SlotReport slotReport =
    +			new SlotReport(Collections.singletonList(slotStatus), resourceID);
    +		// register slot at SlotManager
    +		slotManager.updateSlotStatus(slotReport);
    +
    +		// 4) Slot becomes available and TaskExecutor gets a SlotRequest
    +		verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class));
    +	}
    +
    +	/**
    +	 * Tests whether
    +	 * 1) a SlotRequest is routed to the SlotManager
    +	 * 2) a SlotRequest leads to an allocation of a registered slot
    +	 * 3) a SlotRequest is confirmed
    +	 * 4) a SlotRequest is routed to the TaskExecutor
    +	 */
    +	@Test
    +	public void testSlotAvailableRequest() throws Exception {
    +		final String rmAddress = "/rm1";
    +		final String jmAddress = "/jm1";
    +		final String tmAddress = "/tm1";
    +		final JobID jobID = new JobID();
    +
    +		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
    +
    +		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
    +		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
    +
    +		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
    +		ResourceManager resourceManager =
    +			new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
    +		resourceManager.start();
    +
    +		Future<RegistrationResponse> registrationFuture =
    +			resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
    +		try {
    +			Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
    +		} catch (Exception e) {
    +			Assert.fail("JobManager registration Future didn't become ready.");
    +		}
    +
    +		final ResourceID resourceID = ResourceID.generate();
    +		final AllocationID allocationID = new AllocationID();
    +		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
    +		final SlotID slotID = new SlotID(resourceID, 0);
    +
    +		final SlotStatus slotStatus =
    +			new SlotStatus(slotID, resourceProfile, taskExecutorGateway);
    +		final SlotReport slotReport =
    +			new SlotReport(Collections.singletonList(slotStatus), resourceID);
    +		// register slot at SlotManager
    +		slotManager.updateSlotStatus(slotReport);
    +
    +		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
    +		Future<SlotRequestRegistered> slotRequestFuture =
    +			resourceManager.getSelf().requestSlot(slotRequest);
    +
    +		// 1) a SlotRequest is routed to the SlotManager
    +		verify(slotManager, timeout(5000)).requestSlot(slotRequest);
    +
    +		// 2) a SlotRequest leads to an allocation of a registered slot
    +		Assert.assertTrue(slotManager.isAllocated(slotID));
    +		Assert.assertTrue(slotManager.isAllocated(allocationID));
    +
    +		// 3) a SlotRequest is confirmed
    +		Assert.assertEquals(
    +			Await.result(slotRequestFuture, Duration.create(5, TimeUnit.SECONDS)).getAllocationID(),
    +			allocationID);
    +
    +		// 4) a SlotRequest is routed to the TaskExecutor
    +		verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class));
    --- End diff --
    
    Of course, you're waiting here. You wait until either the verify times out or the asynchronous call has been completed.


---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77783497
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java ---
    @@ -15,11 +15,27 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.runtime.highavailability;
     
    -package org.apache.flink.runtime.resourcemanager;
    +import java.util.UUID;
     
    -import java.io.Serializable;
    +/**
    + * Registry class to keep track of the current leader ID.
    + */
    +public class LeaderIdRegistry {
    --- End diff --
    
    What exactly do you mean? The class is thread-safe and documented (though documentation can be improved). There is no need for locking. Do you mean marking the leaderSessionID `volatile`? It should be fine if leader changes propagate lazily.
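
For illustration, the `volatile` variant mentioned in the comment above could look like the following sketch. `VolatileLeaderIdHolder` is a hypothetical name, and this is not the `LeaderIdRegistry` from the pull request, whose body is not shown in the quoted diff:

```java
import java.util.UUID;

// Hypothetical holder for the current leader ID; not the PR's LeaderIdRegistry.
final class VolatileLeaderIdHolder {
    // volatile: a write by the thread that learns about a new leader is
    // visible to every later read in other threads, without any locking.
    private volatile UUID leaderId;

    UUID getLeaderId() {
        return leaderId;
    }

    void setLeaderId(UUID leaderId) {
        this.leaderId = leaderId;
    }
}
```

Without `volatile`, the Java memory model gives no guarantee about when, or whether, another thread observes the updated value, which is the trade-off behind the "propagate lazily" remark.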


---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77769044
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) {
     	 * RPC's main thread to avoid race condition).
     	 *
     	 * @param request The detailed request of the slot
    +	 * @return SlotRequestRegistered The confirmation message to be send to the caller
     	 */
    -	public void requestSlot(final SlotRequest request) {
    +	public SlotRequestRegistered requestSlot(final SlotRequest request) {
    +		final AllocationID allocationId = request.getAllocationId();
     		if (isRequestDuplicated(request)) {
    -			LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
    -			return;
    +			LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
    +			return null;
     		}
     
     		// try to fulfil the request with current free slots
    -		ResourceSlot slot = chooseSlotToUse(request, freeSlots);
    +		final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
     		if (slot != null) {
     			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
    -				request.getAllocationId(), request.getJobId());
    +				allocationId, request.getJobId());
     
     			// record this allocation in bookkeeping
    -			allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
    +			allocationMap.addAllocation(slot.getSlotId(), allocationId);
     
     			// remove selected slot from free pool
     			freeSlots.remove(slot.getSlotId());
     
    -			// TODO: send slot request to TaskManager
    +			slot.getTaskExecutorGateway()
    +				.requestSlot(allocationId, leaderIdRegistry.getLeaderID());
    --- End diff --
    
    There are three possible responses from the taskExecutor:
    1. Ack: the taskExecutor gives the slot to the specified jobMaster as expected.
    2. Decline: the slot is already occupied by another AllocationID.
    3. Timeout: the request or response message was lost, or the network transfer was slow.
    In the first case, the SlotManager does not need to do anything. In the second and third cases, however, the SlotManager needs to first verify and clear all previous allocation information for this slot request, and then try to find a suitable slot for the request again. I think we should add logic to handle these three possible responses.
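
The three cases above can be sketched roughly as follows. `AllocationBookkeeping` and its `SlotReply` enum are hypothetical, IDs are plain strings, and the real SlotManager bookkeeping (`allocationMap`, `freeSlots`) is more involved than this:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified bookkeeping; not the PR's SlotManager.
final class AllocationBookkeeping {

    // The three possible outcomes of a slot request sent to a TaskExecutor.
    enum SlotReply { ACK, DECLINE, TIMEOUT }

    // Maps an allocation ID to the slot ID it was assigned to.
    private final Map<String, String> slotByAllocation = new HashMap<>();

    void addAllocation(String slotId, String allocationId) {
        slotByAllocation.put(allocationId, slotId);
    }

    boolean isAllocated(String allocationId) {
        return slotByAllocation.containsKey(allocationId);
    }

    /**
     * Applies the TaskExecutor's reply to the bookkeeping.
     *
     * @return true if the request has to be retried with another slot
     */
    boolean onTaskExecutorReply(String allocationId, SlotReply reply) {
        switch (reply) {
            case ACK:
                // The slot was handed over as expected; nothing to do.
                return false;
            case DECLINE:
            case TIMEOUT:
                // Clear the stale allocation so the request can be matched again.
                slotByAllocation.remove(allocationId);
                return true;
            default:
                throw new IllegalStateException("Unknown reply: " + reply);
        }
    }
}
```

A caller would re-run its slot selection whenever `onTaskExecutorReply` returns `true`.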


---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77524239
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java ---
    @@ -46,13 +47,21 @@
     	/** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */
     	private final JobID jobID;
     
    -	public SlotStatus(SlotID slotID, ResourceProfile profiler) {
    -		this(slotID, profiler, null, null);
    +	/** Gateway to the TaskManager which reported the SlotStatus */
    +	private final TaskExecutorGateway taskExecutorGateway;
    --- End diff --
    
    The `SlotStatus` is no longer serializable with this field. Where does the `SlotStatus` come from? If it's coming from the `TaskExecutor`, then the `taskExecutorGateway` has to be retrieved on the `ResourceManager` side.
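
A rough sketch of the alternative described above, in which the status message stays serializable and the gateway is looked up on the receiving side. All names here (`SerializableSlotStatus`, `ExecutorGateway`, `GatewayRegistry`) are hypothetical and the types are heavily simplified compared to the Flink classes:

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical status message: only plain, serializable identifiers travel over the wire.
final class SerializableSlotStatus implements Serializable {
    private static final long serialVersionUID = 1L;

    final String resourceId;
    final int slotNumber;

    SerializableSlotStatus(String resourceId, int slotNumber) {
        this.resourceId = resourceId;
        this.slotNumber = slotNumber;
    }
}

// Hypothetical gateway handle; deliberately not Serializable.
interface ExecutorGateway {
    String address();
}

// Hypothetical registry kept on the ResourceManager side.
final class GatewayRegistry {
    private final Map<String, ExecutorGateway> gatewaysByResourceId = new HashMap<>();

    void register(String resourceId, ExecutorGateway gateway) {
        gatewaysByResourceId.put(resourceId, gateway);
    }

    // Resolves the gateway from the resource ID carried in the status message.
    ExecutorGateway resolve(SerializableSlotStatus status) {
        ExecutorGateway gateway = gatewaysByResourceId.get(status.resourceId);
        if (gateway == null) {
            throw new IllegalStateException("Unknown TaskExecutor: " + status.resourceId);
        }
        return gateway;
    }
}
```

The status object can then be sent between processes, while the non-serializable gateway never leaves the side that created it.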


---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77630918
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.resourcemanager.slotmanager;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.SlotID;
    +import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
    +import org.apache.flink.runtime.highavailability.NonHaServices;
    +import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
    +import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
    +import org.apache.flink.runtime.resourcemanager.ResourceManager;
    +import org.apache.flink.runtime.resourcemanager.SlotRequest;
    +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
    +import org.apache.flink.runtime.rpc.TestingRpcService;
    +import org.apache.flink.runtime.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.taskexecutor.SlotStatus;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutor;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Duration;
    +
    +import java.util.Collections;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.timeout;
    +import static org.mockito.Mockito.verify;
    +
    +public class SlotProtocolTest {
    +
    +	private static TestingRpcService testRpcService;
    +
    +	@BeforeClass
    +	public static void beforeClass() {
    +		testRpcService = new TestingRpcService();
    +
    +	}
    +
    +	@AfterClass
    +	public static void afterClass() {
    +		testRpcService.stopService();
    +		testRpcService = null;
    +	}
    +
    +	@Before
    +	public void beforeTest(){
    +		testRpcService.clearGateways();
    +	}
    +
    +	/**
    +	 * Tests whether
    +	 * 1) SlotRequest is routed to the SlotManager
    +	 * 2) SlotRequest leads to a container allocation
    +	 * 3) SlotRequest is confirmed
    +	 * 4) Slot becomes available and TaskExecutor gets a SlotRequest
    +	 */
    +	@Test
    +	public void testSlotsUnavailableRequest() throws Exception {
    +		final String rmAddress = "/rm1";
    +		final String jmAddress = "/jm1";
    +		final JobID jobID = new JobID();
    +
    +		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
    +
    +
    +		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
    +		ResourceManager resourceManager =
    +			new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
    +		resourceManager.start();
    +
    +		Future<RegistrationResponse> registrationFuture =
    +			resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
    +		try {
    +			Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
    +		} catch (Exception e) {
    +			Assert.fail("JobManager registration Future didn't become ready.");
    +		}
    +
    +		final AllocationID allocationID = new AllocationID();
    +		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
    +
    +		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
    +		Future<SlotRequestRegistered> slotRequestFuture =
    +			resourceManager.getSelf().requestSlot(slotRequest);
    +
    +		// 1) SlotRequest is routed to the SlotManager
    +		verify(slotManager, timeout(5000)).requestSlot(slotRequest);
    +
    +		// 2) SlotRequest leads to a container allocation
    +		verify(slotManager, timeout(5000)).allocateContainer(resourceProfile);
    +
    +		// 3) SlotRequest is confirmed
    +		Assert.assertEquals(
    +			Await.result(slotRequestFuture, Duration.create(5, TimeUnit.SECONDS)).getAllocationID(),
    +			allocationID);
    +
    +		Assert.assertFalse(slotManager.isAllocated(allocationID));
    +
    +		// slot becomes available
    +		final String tmAddress = "/tm1";
    +		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
    +		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
    +
    +		final ResourceID resourceID = ResourceID.generate();
    +		final SlotID slotID = new SlotID(resourceID, 0);
    +
    +		final SlotStatus slotStatus =
    +			new SlotStatus(slotID, resourceProfile, taskExecutorGateway);
    +		final SlotReport slotReport =
    +			new SlotReport(Collections.singletonList(slotStatus), resourceID);
    +		// register slot at SlotManager
    +		slotManager.updateSlotStatus(slotReport);
    +
    +		// 4) Slot becomes available and TaskExecutor gets a SlotRequest
    +		verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class));
    +	}
    +
    +	/**
    +	 * Tests whether
    +	 * 1) a SlotRequest is routed to the SlotManager
    +	 * 2) a SlotRequest leads to an allocation of a registered slot
    +	 * 3) a SlotRequest is confirmed
    +	 * 4) a SlotRequest is routed to the TaskExecutor
    +	 */
    +	@Test
    +	public void testSlotAvailableRequest() throws Exception {
    +		final String rmAddress = "/rm1";
    +		final String jmAddress = "/jm1";
    +		final String tmAddress = "/tm1";
    +		final JobID jobID = new JobID();
    +
    +		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
    +
    +		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
    +		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
    +
    +		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
    +		ResourceManager resourceManager =
    +			new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
    +		resourceManager.start();
    +
    +		Future<RegistrationResponse> registrationFuture =
    +			resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
    +		try {
    +			Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
    +		} catch (Exception e) {
    +			Assert.fail("JobManager registration Future didn't become ready.");
    +		}
    +
    +		final ResourceID resourceID = ResourceID.generate();
    +		final AllocationID allocationID = new AllocationID();
    +		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
    +		final SlotID slotID = new SlotID(resourceID, 0);
    +
    +		final SlotStatus slotStatus =
    +			new SlotStatus(slotID, resourceProfile, taskExecutorGateway);
    +		final SlotReport slotReport =
    +			new SlotReport(Collections.singletonList(slotStatus), resourceID);
    +		// register slot at SlotManager
    +		slotManager.updateSlotStatus(slotReport);
    +
    +		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
    +		Future<SlotRequestRegistered> slotRequestFuture =
    +			resourceManager.getSelf().requestSlot(slotRequest);
    +
    +		// 1) a SlotRequest is routed to the SlotManager
    +		verify(slotManager, timeout(5000)).requestSlot(slotRequest);
    +
    +		// 2) a SlotRequest leads to an allocation of a registered slot
    +		Assert.assertTrue(slotManager.isAllocated(slotID));
    +		Assert.assertTrue(slotManager.isAllocated(allocationID));
    +
    +		// 3) a SlotRequest is confirmed
    +		Assert.assertEquals(
    +			Await.result(slotRequestFuture, Duration.create(5, TimeUnit.SECONDS)).getAllocationID(),
    +			allocationID);
    +
    +		// 4) a SlotRequest is routed to the TaskExecutor
    +		verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class));
    --- End diff --
    
    We're not really waiting here; the calls execute asynchronously, and the 5-second timeouts are only upper bounds. The whole test completes in less than 250 milliseconds.
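
A minimal sketch of why a generous timeout does not slow down an asynchronous test, assuming the reply arrives quickly. It uses a plain `CompletableFuture` instead of the Scala futures and Mockito `timeout(...)` from the diff, and the class and method names are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class TimeoutUpperBoundSketch {

    /**
     * Waits for an asynchronously produced reply with a 5-second timeout
     * and returns how long the wait actually took, in milliseconds.
     */
    public static long elapsedMillisUntilResult() throws Exception {
        // The reply is produced on another thread, like an asynchronous RPC answer.
        CompletableFuture<String> reply =
            CompletableFuture.supplyAsync(() -> "slot-confirmed");

        long start = System.nanoTime();
        // The timeout is only an upper bound: get() returns as soon as the reply is there.
        reply.get(5, TimeUnit.SECONDS);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("waited " + elapsedMillisUntilResult() + " ms");
    }
}
```

The measured wait depends on the machine, but it is bounded by the time the reply takes to arrive, not by the timeout.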


---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77650487
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.resourcemanager.slotmanager;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.SlotID;
    +import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
    +import org.apache.flink.runtime.highavailability.NonHaServices;
    +import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
    +import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
    +import org.apache.flink.runtime.resourcemanager.ResourceManager;
    +import org.apache.flink.runtime.resourcemanager.SlotRequest;
    +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
    +import org.apache.flink.runtime.rpc.TestingRpcService;
    +import org.apache.flink.runtime.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.taskexecutor.SlotStatus;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutor;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Duration;
    +
    +import java.util.Collections;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.timeout;
    +import static org.mockito.Mockito.verify;
    +
    +public class SlotProtocolTest {
    --- End diff --
    
    `SlotProtocolTest` should extend `TestLogger`.


---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77629858
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java ---
    @@ -46,13 +47,21 @@
     	/** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */
     	private final JobID jobID;
     
    -	public SlotStatus(SlotID slotID, ResourceProfile profiler) {
    -		this(slotID, profiler, null, null);
    +	/** Gateway to the TaskManager which reported the SlotStatus */
    +	private final TaskExecutorGateway taskExecutorGateway;
    --- End diff --
    
    It comes with the SlotReport from the TaskExecutor. Yes, it breaks Serializable. I will change the code so that SlotStatus contains the String address instead.
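
A minimal sketch of the idea, not the PR's code: a status object that carries the TaskExecutor's address as a `String` stays serializable, whereas one holding a live gateway reference generally does not. `AddressedSlotStatus` and its fields are hypothetical stand-ins for `SlotStatus`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SlotStatusSketch {

    /** Hypothetical stand-in for SlotStatus: stores an address, not a gateway. */
    public static final class AddressedSlotStatus implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String slotId;
        private final String taskExecutorAddress;

        public AddressedSlotStatus(String slotId, String taskExecutorAddress) {
            this.slotId = slotId;
            this.taskExecutorAddress = taskExecutorAddress;
        }

        public String getSlotId() {
            return slotId;
        }

        public String getTaskExecutorAddress() {
            return taskExecutorAddress;
        }
    }

    /** Serializes and deserializes the status, as a remote transport would. */
    public static AddressedSlotStatus roundTrip(AddressedSlotStatus status)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(status);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (AddressedSlotStatus) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        AddressedSlotStatus copy = roundTrip(new AddressedSlotStatus("slot-0", "/tm1"));
        System.out.println(copy.getTaskExecutorAddress());
    }
}
```

The receiver can then resolve the address to a gateway on its own side, so no gateway object has to travel with the report.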


---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77628362
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java ---
    @@ -15,11 +15,27 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.runtime.highavailability;
     
    -package org.apache.flink.runtime.resourcemanager;
    +import java.util.UUID;
     
    -import java.io.Serializable;
    +/**
    + * Registry class to keep track of the current leader ID.
    + */
    +public class LeaderIdRegistry {
    --- End diff --
    
    The registry exists so that it can be passed on to components that need to retrieve the current leader UUID. Passing on only a single reference wouldn't work.
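
A minimal sketch of the reasoning, under the assumption that the leader ID can change after components have been constructed. A component that is handed a shared holder sees later updates; a component that is handed the UUID itself keeps the old value. `LeaderIdHolder` and `Component` are hypothetical names and are not the PR's `LeaderIdRegistry` implementation:

```java
import java.util.UUID;

public class LeaderIdSketch {

    /** Hypothetical mutable holder for the current leader ID. */
    public static final class LeaderIdHolder {
        // volatile so that readers on other threads see the latest ID
        private volatile UUID leaderId;

        public UUID getLeaderId() {
            return leaderId;
        }

        public void setLeaderId(UUID leaderId) {
            this.leaderId = leaderId;
        }
    }

    /** Hypothetical component that looks up the leader ID each time it needs it. */
    public static final class Component {
        private final LeaderIdHolder holder;

        public Component(LeaderIdHolder holder) {
            this.holder = holder;
        }

        public UUID currentLeader() {
            return holder.getLeaderId();
        }
    }

    public static void main(String[] args) {
        LeaderIdHolder holder = new LeaderIdHolder();
        holder.setLeaderId(UUID.randomUUID());

        Component component = new Component(holder);
        UUID snapshot = holder.getLeaderId(); // a plain copy of the reference

        holder.setLeaderId(UUID.randomUUID()); // leadership changes

        System.out.println("component is current: "
            + component.currentLeader().equals(holder.getLeaderId()));
        System.out.println("snapshot is current: "
            + snapshot.equals(holder.getLeaderId()));
    }
}
```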


---

[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77524466
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.resourcemanager.slotmanager;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.AllocationID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.clusterframework.types.SlotID;
    +import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
    +import org.apache.flink.runtime.highavailability.NonHaServices;
    +import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
    +import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
    +import org.apache.flink.runtime.resourcemanager.ResourceManager;
    +import org.apache.flink.runtime.resourcemanager.SlotRequest;
    +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
    +import org.apache.flink.runtime.rpc.TestingRpcService;
    +import org.apache.flink.runtime.taskexecutor.SlotReport;
    +import org.apache.flink.runtime.taskexecutor.SlotStatus;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutor;
    +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Duration;
    +
    +import java.util.Collections;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.timeout;
    +import static org.mockito.Mockito.verify;
    +
    +public class SlotProtocolTest {
    +
    +	private static TestingRpcService testRpcService;
    +
    +	@BeforeClass
    +	public static void beforeClass() {
    +		testRpcService = new TestingRpcService();
    +
    +	}
    +
    +	@AfterClass
    +	public static void afterClass() {
    +		testRpcService.stopService();
    +		testRpcService = null;
    +	}
    +
    +	@Before
    +	public void beforeTest(){
    +		testRpcService.clearGateways();
    +	}
    +
    +	/**
    +	 * Tests whether
    +	 * 1) SlotRequest is routed to the SlotManager
    +	 * 2) SlotRequest leads to a container allocation
    +	 * 3) SlotRequest is confirmed
    +	 * 4) Slot becomes available and TaskExecutor gets a SlotRequest
    +	 */
    +	@Test
    +	public void testSlotsUnavailableRequest() throws Exception {
    +		final String rmAddress = "/rm1";
    +		final String jmAddress = "/jm1";
    +		final JobID jobID = new JobID();
    +
    +		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
    +
    +
    +		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
    +		ResourceManager resourceManager =
    +			new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
    +		resourceManager.start();
    +
    +		Future<RegistrationResponse> registrationFuture =
    +			resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
    +		try {
    +			Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
    +		} catch (Exception e) {
    +			Assert.fail("JobManager registration Future didn't become ready.");
    +		}
    +
    +		final AllocationID allocationID = new AllocationID();
    +		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
    +
    +		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
    +		Future<SlotRequestRegistered> slotRequestFuture =
    +			resourceManager.getSelf().requestSlot(slotRequest);
    +
    +		// 1) SlotRequest is routed to the SlotManager
    +		verify(slotManager, timeout(5000)).requestSlot(slotRequest);
    +
    +		// 2) SlotRequest leads to a container allocation
    +		verify(slotManager, timeout(5000)).allocateContainer(resourceProfile);
    +
    +		// 3) SlotRequest is confirmed
    +		Assert.assertEquals(
    +			Await.result(slotRequestFuture, Duration.create(5, TimeUnit.SECONDS)).getAllocationID(),
    +			allocationID);
    +
    +		Assert.assertFalse(slotManager.isAllocated(allocationID));
    +
    +		// slot becomes available
    +		final String tmAddress = "/tm1";
    +		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
    +		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
    +
    +		final ResourceID resourceID = ResourceID.generate();
    +		final SlotID slotID = new SlotID(resourceID, 0);
    +
    +		final SlotStatus slotStatus =
    +			new SlotStatus(slotID, resourceProfile, taskExecutorGateway);
    +		final SlotReport slotReport =
    +			new SlotReport(Collections.singletonList(slotStatus), resourceID);
    +		// register slot at SlotManager
    +		slotManager.updateSlotStatus(slotReport);
    +
    +		// 4) Slot becomes available and TaskExecutor gets a SlotRequest
    +		verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class));
    +	}
    +
    +	/**
    +	 * Tests whether
    +	 * 1) a SlotRequest is routed to the SlotManager
    +	 * 2) a SlotRequest leads to an allocation of a registered slot
    +	 * 3) a SlotRequest is confirmed
    +	 * 4) a SlotRequest is routed to the TaskExecutor
    +	 */
    +	@Test
    +	public void testSlotAvailableRequest() throws Exception {
    +		final String rmAddress = "/rm1";
    +		final String jmAddress = "/jm1";
    +		final String tmAddress = "/tm1";
    +		final JobID jobID = new JobID();
    +
    +		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
    +
    +		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
    +		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
    +
    +		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
    +		ResourceManager resourceManager =
    +			new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
    +		resourceManager.start();
    +
    +		Future<RegistrationResponse> registrationFuture =
    +			resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
    +		try {
    +			Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS));
    +		} catch (Exception e) {
    +			Assert.fail("JobManager registration Future didn't become ready.");
    +		}
    +
    +		final ResourceID resourceID = ResourceID.generate();
    +		final AllocationID allocationID = new AllocationID();
    +		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
    +		final SlotID slotID = new SlotID(resourceID, 0);
    +
    +		final SlotStatus slotStatus =
    +			new SlotStatus(slotID, resourceProfile, taskExecutorGateway);
    +		final SlotReport slotReport =
    +			new SlotReport(Collections.singletonList(slotStatus), resourceID);
    +		// register slot at SlotManager
    +		slotManager.updateSlotStatus(slotReport);
    +
    +		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
    +		Future<SlotRequestRegistered> slotRequestFuture =
    +			resourceManager.getSelf().requestSlot(slotRequest);
    +
    +		// 1) a SlotRequest is routed to the SlotManager
    +		verify(slotManager, timeout(5000)).requestSlot(slotRequest);
    +
    +		// 2) a SlotRequest leads to an allocation of a registered slot
    +		Assert.assertTrue(slotManager.isAllocated(slotID));
    +		Assert.assertTrue(slotManager.isAllocated(allocationID));
    +
    +		// 3) a SlotRequest is confirmed
    +		Assert.assertEquals(
    +			Await.result(slotRequestFuture, Duration.create(5, TimeUnit.SECONDS)).getAllocationID(),
    +			allocationID);
    +
    +		// 4) a SlotRequest is routed to the TaskExecutor
    +		verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class));
    --- End diff --
    
    I think the test should also work with the `TestingSerialRpcService`, which has the advantage of getting rid of the waiting.
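
A minimal sketch of why serial execution removes the need to wait, assuming that a serial RPC service handles each call in the caller's thread before returning. That assumption is only modeled here with a same-thread `Executor`; the sketch does not use `TestingSerialRpcService` itself, and `Endpoint` is a hypothetical name:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

public class SerialExecutionSketch {

    /** Executor that runs every task immediately in the calling thread. */
    public static final Executor SAME_THREAD = Runnable::run;

    /** Hypothetical endpoint that handles requests on the executor it is given. */
    public static final class Endpoint {
        private final Executor executor;
        private final AtomicBoolean handled = new AtomicBoolean(false);

        public Endpoint(Executor executor) {
            this.executor = executor;
        }

        /** Submits the request; when it is handled depends on the executor. */
        public void request() {
            executor.execute(() -> handled.set(true));
        }

        public boolean wasHandled() {
            return handled.get();
        }
    }

    public static void main(String[] args) {
        Endpoint endpoint = new Endpoint(SAME_THREAD);
        endpoint.request();
        // With same-thread execution the effect is visible right after the call,
        // so a test can assert directly instead of polling with a timeout.
        System.out.println("handled: " + endpoint.wasHandled());
    }
}
```

With an asynchronous executor the same assertion would need a timeout or some other synchronization, which is what the `timeout(5000)` verifications in the diff provide.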


---