You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:11 UTC
[04/50] [abbrv] flink git commit: [FLINK-5810] [flip-6] Introduce a hardened slot manager
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 579ca3a..6a0bd87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -47,13 +47,13 @@ import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
@@ -102,7 +102,6 @@ public class TaskExecutorITCase extends TestLogger {
ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
Time.milliseconds(500L),
Time.milliseconds(500L));
- SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
testingHAServices,
rpcService.getScheduledExecutor(),
@@ -121,6 +120,11 @@ public class TaskExecutorITCase extends TestLogger {
final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(resourceProfile), new TimerService<AllocationID>(scheduledExecutorService, 100L));
final JobManagerTable jobManagerTable = new JobManagerTable();
final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
+ final SlotManager slotManager = new SlotManager(
+ rpcService.getScheduledExecutor(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime());
ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager(
rpcService,
@@ -129,7 +133,7 @@ public class TaskExecutorITCase extends TestLogger {
resourceManagerConfiguration,
testingHAServices,
heartbeatServices,
- slotManagerFactory,
+ slotManager,
metricRegistry,
jobLeaderIdService,
testingFatalErrorHandler);
@@ -168,7 +172,7 @@ public class TaskExecutorITCase extends TestLogger {
rpcService.registerGateway(jmAddress, jmGateway);
final AllocationID allocationId = new AllocationID();
- final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile);
+ final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, jmAddress);
final SlotOffer slotOffer = new SlotOffer(allocationId, 0, resourceProfile);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 1d1840e..d3d4d43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -65,10 +65,8 @@ import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -101,6 +99,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;
public class TaskExecutorTest extends TestLogger {
@@ -703,10 +702,7 @@ public class TaskExecutorTest extends TestLogger {
resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerLeaderId);
// request slots from the task manager under the given allocation id
- TMSlotRequestReply reply = taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, resourceManagerLeaderId);
-
- // this is hopefully successful :-)
- assertTrue(reply instanceof TMSlotRequestRegistered);
+ taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, resourceManagerLeaderId);
// now inform the task manager about the new job leader
jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, jobManagerLeaderId);
@@ -820,7 +816,11 @@ public class TaskExecutorTest extends TestLogger {
// been properly started.
jobLeaderService.addJob(jobId, jobManagerAddress);
- verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), eq(registrationId), eq(new SlotID(resourceId, 1)));
+ verify(resourceManagerGateway).notifySlotAvailable(
+ eq(resourceManagerLeaderId),
+ eq(registrationId),
+ eq(new SlotID(resourceId, 1)),
+ eq(allocationId2));
assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1));
assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2));
@@ -903,15 +903,19 @@ public class TaskExecutorTest extends TestLogger {
// test that allocating a slot works
final SlotID slotID = new SlotID(resourceID, 0);
- TMSlotRequestReply tmSlotRequestReply = taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
- assertTrue(tmSlotRequestReply instanceof TMSlotRequestRegistered);
+ taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
// TODO: Figure out the concrete allocation behaviour between RM and TM. Maybe we don't need the SlotID...
// test that we can't allocate slots which are blacklisted due to pending confirmation of the RM
final SlotID unconfirmedFreeSlotID = new SlotID(resourceID, 1);
- TMSlotRequestReply tmSlotRequestReply2 =
+
+ try {
taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
- assertTrue(tmSlotRequestReply2 instanceof TMSlotRequestRejected);
+
+ fail("The slot request should have failed.");
+ } catch (SlotAllocationException e) {
+ // expected
+ }
// re-register
verify(rmGateway1).registerTaskExecutor(
@@ -920,9 +924,7 @@ public class TaskExecutorTest extends TestLogger {
// now we should be successful because the slots status has been synced
// test that we can't allocate slots which are blacklisted due to pending confirmation of the RM
- TMSlotRequestReply tmSlotRequestReply3 =
- taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
- assertTrue(tmSlotRequestReply3 instanceof TMSlotRequestRegistered);
+ taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
// check if a concurrent error occurred
testingFatalErrorHandler.rethrowError();
@@ -1100,7 +1102,11 @@ public class TaskExecutorTest extends TestLogger {
// acknowledge the offered slots
offerResultFuture.complete(Collections.singleton(offer1));
- verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), eq(registrationId), eq(new SlotID(resourceId, 1)));
+ verify(resourceManagerGateway).notifySlotAvailable(
+ eq(resourceManagerLeaderId),
+ eq(registrationId),
+ eq(new SlotID(resourceId, 1)),
+ any(AllocationID.class));
assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1));
assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2));
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index f58faf2..03b5172 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -28,11 +28,13 @@ import com.google.common.util.concurrent.MoreExecutors
import com.typesafe.config.ConfigFactory
import grizzled.slf4j.Logger
import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions, TaskManagerOptions}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.client.JobClient
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.concurrent.{ScheduledExecutor, ScheduledExecutorServiceAdapter}
import org.apache.flink.runtime.highavailability.HighAvailabilityServices
import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
import org.apache.flink.runtime.jobgraph.JobGraph
@@ -78,6 +80,10 @@ object TestingUtils {
}
def getDefaultTestingActorSystemConfig = testConfig
+
+ def infiniteTime: Time = {
+ Time.milliseconds(Long.MaxValue);
+ }
def startTestingCluster(numSlots: Int, numTMs: Int = 1,
@@ -114,6 +120,12 @@ object TestingUtils {
}
}
+ def defaultScheduledExecutor: ScheduledExecutor = {
+ val scheduledExecutorService = defaultExecutor
+
+ new ScheduledExecutorServiceAdapter(scheduledExecutorService)
+ }
+
/** Returns an [[ExecutionContext]] which uses the current thread to execute the runnable.
*
* @return Direct [[ExecutionContext]] which executes runnables directly
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 65d12b5..a6b66d7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -208,7 +208,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
resourceManagerConfiguration,
haServices,
heartbeatServices,
- resourceManagerRuntimeServices.getSlotManagerFactory(),
+ resourceManagerRuntimeServices.getSlotManager(),
metricRegistry,
resourceManagerRuntimeServices.getJobLeaderIdService(),
this);
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 74359c8..63e6a4c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -28,12 +28,13 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -115,7 +116,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
- SlotManagerFactory slotManagerFactory,
+ SlotManager slotManager,
MetricRegistry metricRegistry,
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
@@ -126,7 +127,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
- slotManagerFactory,
+ slotManager,
metricRegistry,
jobLeaderIdService,
fatalErrorHandler);
@@ -223,6 +224,11 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
}
@Override
+ public void stopWorker(InstanceID instanceId) {
+ // TODO: Implement to stop the worker
+ }
+
+ @Override
protected ResourceID workerStarted(ResourceID resourceID) {
return resourceID;
}