You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:11 UTC

[04/50] [abbrv] flink git commit: [FLINK-5810] [flip-6] Introduce a hardened slot manager

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 579ca3a..6a0bd87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -47,13 +47,13 @@ import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 import org.hamcrest.Matchers;
@@ -102,7 +102,6 @@ public class TaskExecutorITCase extends TestLogger {
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
 			Time.milliseconds(500L),
 			Time.milliseconds(500L));
-		SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
 		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
 			testingHAServices,
 			rpcService.getScheduledExecutor(),
@@ -121,6 +120,11 @@ public class TaskExecutorITCase extends TestLogger {
 		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(resourceProfile), new TimerService<AllocationID>(scheduledExecutorService, 100L));
 		final JobManagerTable jobManagerTable = new JobManagerTable();
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
+		final SlotManager slotManager = new SlotManager(
+			rpcService.getScheduledExecutor(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime());
 
 		ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager(
 			rpcService,
@@ -129,7 +133,7 @@ public class TaskExecutorITCase extends TestLogger {
 			resourceManagerConfiguration,
 			testingHAServices,
 			heartbeatServices,
-			slotManagerFactory,
+			slotManager,
 			metricRegistry,
 			jobLeaderIdService,
 			testingFatalErrorHandler);
@@ -168,7 +172,7 @@ public class TaskExecutorITCase extends TestLogger {
 		rpcService.registerGateway(jmAddress, jmGateway);
 
 		final AllocationID allocationId = new AllocationID();
-		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile);
+		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, jmAddress);
 		final SlotOffer slotOffer = new SlotOffer(allocationId, 0, resourceProfile);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 1d1840e..d3d4d43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -65,10 +65,8 @@ import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -101,6 +99,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.*;
 
 public class TaskExecutorTest extends TestLogger {
@@ -703,10 +702,7 @@ public class TaskExecutorTest extends TestLogger {
 			resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerLeaderId);
 
 			// request slots from the task manager under the given allocation id
-			TMSlotRequestReply reply = taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, resourceManagerLeaderId);
-
-			// this is hopefully successful :-)
-			assertTrue(reply instanceof TMSlotRequestRegistered);
+			taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, resourceManagerLeaderId);
 
 			// now inform the task manager about the new job leader
 			jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, jobManagerLeaderId);
@@ -820,7 +816,11 @@ public class TaskExecutorTest extends TestLogger {
 			// been properly started.
 			jobLeaderService.addJob(jobId, jobManagerAddress);
 
-			verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), eq(registrationId), eq(new SlotID(resourceId, 1)));
+			verify(resourceManagerGateway).notifySlotAvailable(
+				eq(resourceManagerLeaderId),
+				eq(registrationId),
+				eq(new SlotID(resourceId, 1)),
+				eq(allocationId2));
 
 			assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1));
 			assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2));
@@ -903,15 +903,19 @@ public class TaskExecutorTest extends TestLogger {
 
 			// test that allocating a slot works
 			final SlotID slotID = new SlotID(resourceID, 0);
-			TMSlotRequestReply tmSlotRequestReply = taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
-			assertTrue(tmSlotRequestReply instanceof TMSlotRequestRegistered);
+			taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
 
 			// TODO: Figure out the concrete allocation behaviour between RM and TM. Maybe we don't need the SlotID...
 			// test that we can't allocate slots which are blacklisted due to pending confirmation of the RM
 			final SlotID unconfirmedFreeSlotID = new SlotID(resourceID, 1);
-			TMSlotRequestReply tmSlotRequestReply2 =
+
+			try {
 				taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
-			assertTrue(tmSlotRequestReply2 instanceof TMSlotRequestRejected);
+
+				fail("The slot request should have failed.");
+			} catch (SlotAllocationException e) {
+				// expected
+			}
 
 			// re-register
 			verify(rmGateway1).registerTaskExecutor(
@@ -920,9 +924,7 @@ public class TaskExecutorTest extends TestLogger {
 
 			// now we should be successful because the slots status has been synced
 			// test that we can't allocate slots which are blacklisted due to pending confirmation of the RM
-			TMSlotRequestReply tmSlotRequestReply3 =
-				taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
-			assertTrue(tmSlotRequestReply3 instanceof TMSlotRequestRegistered);
+			taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
 
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
@@ -1100,7 +1102,11 @@ public class TaskExecutorTest extends TestLogger {
 			// acknowledge the offered slots
 			offerResultFuture.complete(Collections.singleton(offer1));
 
-			verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), eq(registrationId), eq(new SlotID(resourceId, 1)));
+			verify(resourceManagerGateway).notifySlotAvailable(
+				eq(resourceManagerLeaderId),
+				eq(registrationId),
+				eq(new SlotID(resourceId, 1)),
+				any(AllocationID.class));
 
 			assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1));
 			assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2));

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index f58faf2..03b5172 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -28,11 +28,13 @@ import com.google.common.util.concurrent.MoreExecutors
 import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions, TaskManagerOptions}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.concurrent.{ScheduledExecutor, ScheduledExecutorServiceAdapter}
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
@@ -78,6 +80,10 @@ object TestingUtils {
   }
 
   def getDefaultTestingActorSystemConfig = testConfig
+
+  def infiniteTime: Time = {
+    Time.milliseconds(Long.MaxValue) // largest timeout expressible as a Long number of milliseconds
+  }
   
 
   def startTestingCluster(numSlots: Int, numTMs: Int = 1,
@@ -114,6 +120,12 @@ object TestingUtils {
     }
   }
 
+  def defaultScheduledExecutor: ScheduledExecutor = {
+    // Wrap the executor returned by defaultExecutor so that callers
+    // receive it through the ScheduledExecutor interface.
+    new ScheduledExecutorServiceAdapter(defaultExecutor)
+  }
+
   /** Returns an [[ExecutionContext]] which uses the current thread to execute the runnable.
     *
     * @return Direct [[ExecutionContext]] which executes runnables directly

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 65d12b5..a6b66d7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -208,7 +208,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 			resourceManagerConfiguration,
 			haServices,
 			heartbeatServices,
-			resourceManagerRuntimeServices.getSlotManagerFactory(),
+			resourceManagerRuntimeServices.getSlotManager(),
 			metricRegistry,
 			resourceManagerRuntimeServices.getJobLeaderIdService(),
 			this);

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 74359c8..63e6a4c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -28,12 +28,13 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -115,7 +116,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 			ResourceManagerConfiguration resourceManagerConfiguration,
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
-			SlotManagerFactory slotManagerFactory,
+			SlotManager slotManager,
 			MetricRegistry metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler) {
@@ -126,7 +127,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 			resourceManagerConfiguration,
 			highAvailabilityServices,
 			heartbeatServices,
-			slotManagerFactory,
+			slotManager,
 			metricRegistry,
 			jobLeaderIdService,
 			fatalErrorHandler);
@@ -223,6 +224,11 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 	}
 
 	@Override
+	public void stopWorker(InstanceID instanceId) {
+		// TODO: Implement stopping the worker; until then this is a no-op and instanceId is ignored.
+	}
+
+	@Override
 	protected ResourceID workerStarted(ResourceID resourceID) {
 		return resourceID;
 	}