You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@myriad.apache.org by da...@apache.org on 2016/08/30 17:18:22 UTC
[3/3] incubator-myriad git commit: Implementation of MYRIAD-229,
MYRIAD-237, MYRIAD-238,
MYRIAD-225 JIRA: [MYRIAD-225] https://issues.apache.org/jira/browse/MYRIAD-225
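[MYRIAD-229] https://issues.apache.org/jira/browse/MYRIAD-229 [MYRIAD-237] https://issues.apache.org/jira/browse/MYRIAD-237 [MYRIAD-238] https://issues.apache.org/jira/browse/MYRIAD-238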
Implementation of MYRIAD-229, MYRIAD-237, MYRIAD-238, MYRIAD-225
JIRA:
[MYRIAD-225] https://issues.apache.org/jira/browse/MYRIAD-225
[MYRIAD-229] https://issues.apache.org/jira/browse/MYRIAD-229
[MYRIAD-237] https://issues.apache.org/jira/browse/MYRIAD-237
[MYRIAD-238] https://issues.apache.org/jira/browse/MYRIAD-238
Pull Request:
Closes #91
Author: hokiegeek2 <ho...@gmail.com>
Date: Wed Aug 17 15:21:56 2016 -0400
Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/577c30b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/577c30b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/577c30b1
Branch: refs/heads/master
Commit: 577c30b1abdaa30313251c887c014aebac3fd93c
Parents: 7aea259
Author: hokiegeek2 <ho...@gmail.com>
Authored: Wed Aug 17 15:21:56 2016 -0400
Committer: darinj <da...@apache.org>
Committed: Tue Aug 30 13:08:36 2016 -0400
----------------------------------------------------------------------
build.gradle | 1 -
gradle/spock.gradle | 46 ---
.../src/main/java/org/apache/myriad/Main.java | 18 +-
.../java/org/apache/myriad/MyriadModule.java | 2 +-
.../myriad/api/SchedulerStateResource.java | 2 +-
.../configuration/NodeManagerConfiguration.java | 18 +-
.../myriad/scheduler/MyriadOperations.java | 9 +-
.../org/apache/myriad/scheduler/Rebalancer.java | 4 +-
.../apache/myriad/scheduler/SchedulerUtils.java | 2 +-
.../apache/myriad/scheduler/TaskFactory.java | 2 +-
.../apache/myriad/scheduler/TaskTerminator.java | 4 +-
.../org/apache/myriad/scheduler/TaskUtils.java | 8 +-
.../handlers/StatusUpdateEventHandler.java | 2 +-
.../scheduler/fgs/NMHeartBeatHandler.java | 67 +++--
.../scheduler/fgs/OfferLifecycleManager.java | 67 +++--
.../scheduler/fgs/YarnNodeCapacityManager.java | 10 +-
.../scheduler/yarn/MyriadFairScheduler.java | 5 +
.../org/apache/myriad/state/SchedulerState.java | 14 +-
.../MyriadFileSystemRMStateStoreTest.java | 22 +-
.../org/apache/myriad/BaseConfigurableTest.java | 77 ++++-
.../org/apache/myriad/MyriadTestModule.java | 32 +-
.../org/apache/myriad/TestObjectFactory.java | 290 +++++++++++++------
.../myriad/api/ArtifactsResourceTest.java | 17 ++
.../myriad/api/SchedulerStateResourceTest.java | 17 ++
.../configuration/MyriadConfigurationTest.java | 5 +-
.../myriad/health/HealthCheckUtilsTest.java | 17 ++
.../health/MesosDriverHealthCheckTest.java | 17 ++
.../myriad/scheduler/MyriadOperationsTest.java | 97 ++++---
.../myriad/scheduler/SchedulerUtilsSpec.groovy | 90 ------
.../myriad/scheduler/SchedulerUtilsTest.java | 89 ++++++
.../apache/myriad/scheduler/TestTaskUtils.java | 3 +-
.../constraints/LikeConstraintSpec.groovy | 93 ------
.../constraints/LikeConstraintTest.java | 86 ++++++
.../myriad/scheduler/fgs/FGSTestBaseSpec.groovy | 175 -----------
.../scheduler/fgs/NMHeartBeatHandlerSpec.groovy | 114 --------
.../scheduler/fgs/NMHeartBeatHandlerTest.java | 240 +++++++++++++++
.../myriad/scheduler/fgs/NodeStoreTest.java | 23 +-
.../apache/myriad/scheduler/fgs/NodeTest.java | 89 ++++++
.../fgs/OfferLifeCycleManagerTest.java | 22 +-
.../fgs/YarnNodeCapacityManagerSpec.groovy | 143 ---------
.../fgs/YarnNodeCapacityManagerTest.java | 149 ++++++----
.../org/apache/myriad/state/ClusterTest.java | 17 ++
.../org/apache/myriad/state/MockDispatcher.java | 28 +-
.../org/apache/myriad/state/MockFuture.java | 17 ++
.../java/org/apache/myriad/state/MockRMApp.java | 17 ++
.../org/apache/myriad/state/MockRMContext.java | 25 +-
.../org/apache/myriad/state/MockRMNode.java | 17 ++
.../java/org/apache/myriad/state/MockState.java | 17 ++
.../org/apache/myriad/state/MockVariable.java | 17 ++
.../apache/myriad/state/MyriadStateTest.java | 17 ++
.../org/apache/myriad/state/NodeTaskTest.java | 17 ++
.../apache/myriad/state/SchedulerStateTest.java | 159 +++++++---
.../state/utils/ByteBufferSupportTest.java | 17 ++
.../webapp/HttpConnectorProviderTest.java | 17 ++
.../myriad/webapp/MyriadWebServerTest.java | 17 ++
.../resources/myriad-config-test-default.yml | 39 +--
56 files changed, 1590 insertions(+), 1036 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index b48cfc9..a551ba0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -57,7 +57,6 @@ subprojects {
apply plugin: 'java'
apply plugin: 'application'
- apply from: "$rootDir/gradle/spock.gradle"
apply from: "$rootDir/gradle/quality.gradle"
sourceCompatibility = '1.7'
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/gradle/spock.gradle
----------------------------------------------------------------------
diff --git a/gradle/spock.gradle b/gradle/spock.gradle
deleted file mode 100644
index fc974ec..0000000
--- a/gradle/spock.gradle
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-// used for unit tests
-apply plugin: 'groovy'
-
-def spockVersion = '1.0-groovy-2.4'
-def powermockVersion = "1.6.1"
-
-dependencies {
-
- testCompile "org.codehaus.groovy:groovy-all:2.4.1"
- testCompile "org.spockframework:spock-core:$spockVersion"
-
- testCompile 'cglib:cglib-nodep:2.2.2' // need to mock classes
-
- // useful to mock out statics and final classes in Java.
- testCompile "org.powermock:powermock-module-junit4:$powermockVersion"
- testCompile "org.powermock:powermock-module-junit4-rule:$powermockVersion"
- testCompile "org.powermock:powermock-classloading-xstream:$powermockVersion"
- testCompile "org.powermock:powermock-api-mockito:$powermockVersion"
-}
-
-// for spock to live in test java tree
-sourceSets {
- test {
- groovy { srcDir 'src/test/java' }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
index 8c028f1..463c4c5 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
@@ -79,18 +79,6 @@ public class Main {
private static Injector injector;
- /**
- * Main is the bootstrap class for the Myriad scheduler, managing the lifecycles of
- * the following components:
- *
- * 1. MyriadDriverManager
- * 2. MyriadWebServer
- * 3. TaskTerminator
- * 4. HealthCheckRegistry
- *
- * Main uses the Guice Injector framework to manage the Myriad object graph and is
- * configured by myriad-config-default.yml
- */
public static void initialize(Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext,
InterceptorRegistry registry) throws Exception {
MyriadModule myriadModule = new MyriadModule("myriad-config-default.yml", hadoopConf, yarnScheduler, rmContext, registry);
@@ -217,19 +205,19 @@ public class Main {
SchedulerState schedulerState = injector.getInstance(SchedulerState.class);
Set<org.apache.myriad.state.NodeTask> launchedNMTasks = new HashSet<>();
- launchedNMTasks.addAll(schedulerState.getPendingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX));
+ launchedNMTasks.addAll(schedulerState.getPendingTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX));
if (!launchedNMTasks.isEmpty()) {
LOGGER.info("{} NM(s) in pending state. Not launching additional NMs", launchedNMTasks.size());
return;
}
- launchedNMTasks.addAll(schedulerState.getStagingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX));
+ launchedNMTasks.addAll(schedulerState.getStagingTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX));
if (!launchedNMTasks.isEmpty()) {
LOGGER.info("{} NM(s) in staging state. Not launching additional NMs", launchedNMTasks.size());
return;
}
- launchedNMTasks.addAll(schedulerState.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX));
+ launchedNMTasks.addAll(schedulerState.getActiveTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX));
if (!launchedNMTasks.isEmpty()) {
LOGGER.info("{} NM(s) in active state. Not launching additional NMs", launchedNMTasks.size());
return;
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
index bb560a4..4bcb628 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
@@ -102,7 +102,7 @@ public class MyriadModule extends AbstractModule {
bind(NMHeartBeatHandler.class).asEagerSingleton();
MapBinder<String, TaskFactory> mapBinder = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class);
- mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON);
+ mapBinder.addBinding(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON);
Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations();
for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) {
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
index ae21c69..2050199 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
@@ -51,7 +51,7 @@ public class SchedulerStateResource {
@GET
public GetSchedulerStateResponse getState() {
return new GetSchedulerStateResponse(toStringCollection(state.getPendingTaskIds()), toStringCollection(
- state.getStagingTaskIds()), toStringCollection(state.getActiveTaskIds()), toStringCollection(state.getKillableTasks()));
+ state.getStagingTaskIds()), toStringCollection(state.getActiveTaskIds()), toStringCollection(state.getKillableTaskIds()));
}
private Collection<String> toStringCollection(Collection<Protos.TaskID> collection) {
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
index 56ea43d..79a8301 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
@@ -56,8 +56,13 @@ public class NodeManagerConfiguration {
/**
* Default NodeManager Mesos task prefix
*/
- public static final String NM_TASK_PREFIX = "nm";
-
+ public static final String DEFAULT_NM_TASK_PREFIX = "nm";
+
+ /**
+ * Default max CPU cores for NodeManager JVM
+ */
+ public static final double DEFAULT_NM_MAX_CPUS = 24;
+
/**
* Translates to -Xmx for the NodeManager JVM.
*/
@@ -85,7 +90,10 @@ public class NodeManagerConfiguration {
*/
@JsonProperty
private Boolean cgroups;
-
+
+ @JsonProperty
+ private Double maxCpus;
+
private Double generateNodeManagerMemory() {
return (NodeManagerConfiguration.DEFAULT_JVM_MAX_MEMORY_MB) * (1 + NodeManagerConfiguration.JVM_OVERHEAD);
}
@@ -117,4 +125,8 @@ public class NodeManagerConfiguration {
public boolean getCgroups() {
return Optional.fromNullable(cgroups).or(DEFAULT_NM_CGROUPS);
}
+
+ public Double getMaxCpus() {
+ return Optional.fromNullable(maxCpus).or(DEFAULT_NM_MAX_CPUS);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
index fb1f1bb..13e83cd 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
@@ -40,6 +40,7 @@ import org.apache.myriad.webapp.MyriadWebServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
@@ -49,7 +50,6 @@ import com.google.inject.Inject;
public class MyriadOperations {
private static final Logger LOGGER = LoggerFactory.getLogger(MyriadOperations.class);
private final SchedulerState schedulerState;
-
private MyriadConfiguration cfg;
private NodeScaleDownPolicy nodeScaleDownPolicy;
private MyriadDriverManager driverManager;
@@ -69,12 +69,17 @@ public class MyriadOperations {
myriadStateStore = (MyriadStateStore) rmContext.getStateStore();
}
}
+
+ @VisibleForTesting
+ protected SchedulerState getSchedulerState() {
+ return schedulerState;
+ }
public void flexUpCluster(ServiceResourceProfile serviceResourceProfile, int instances, Constraint constraint) {
Collection<NodeTask> nodes = new HashSet<>();
for (int i = 0; i < instances; i++) {
NodeTask nodeTask = new NodeTask(serviceResourceProfile, constraint);
- nodeTask.setTaskPrefix(NodeManagerConfiguration.NM_TASK_PREFIX);
+ nodeTask.setTaskPrefix(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX);
nodes.add(nodeTask);
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java
index 3c3ce79..aa09f89 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java
@@ -46,8 +46,8 @@ public class Rebalancer implements Runnable {
@Override
public void run() {
- final Set<Protos.TaskID> activeIds = schedulerState.getActiveTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX);
- final Set<Protos.TaskID> pendingIds = schedulerState.getPendingTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX);
+ final Set<Protos.TaskID> activeIds = schedulerState.getActiveTaskIds(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX);
+ final Set<Protos.TaskID> pendingIds = schedulerState.getPendingTaskIds(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX);
LOGGER.info("Active {}, Pending {}", activeIds.size(), pendingIds.size());
if (activeIds.size() < 1 && pendingIds.size() < 1) {
myriadOperations.flexUpCluster(profileManager.get("small"), 1, null);
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
index 693ae63..583be2f 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
@@ -59,7 +59,7 @@ public class SchedulerUtils {
* @return
*/
public static boolean isEligibleForFineGrainedScaling(String hostName, SchedulerState state) {
- for (NodeTask activeNMTask : state.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)) {
+ for (NodeTask activeNMTask : state.getActiveTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX)) {
if (activeNMTask.getProfile().getCpus() == 0 &&
activeNMTask.getProfile().getMemory() == 0 &&
activeNMTask.getHostname().equals(hostName)) {
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
index 7e63e0d..8c806c1 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
@@ -150,7 +150,7 @@ public abstract class TaskFactory {
* Simple helper to convert Mesos Range Resource to a list of longs.
*/
protected List<Long> rangesConverter(List<Protos.Resource> rangeResources) {
- List<Long> ret = new ArrayList();
+ List<Long> ret = new ArrayList<Long>();
for (Protos.Resource range : rangeResources) {
ret.add(range.getRanges().getRange(0).getBegin());
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
index 4110b37..e695b40 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
@@ -62,12 +62,12 @@ public class TaskTerminator implements Runnable {
@Override
public void run() {
//If there are 1..n killable tasks, proceed; otherwise, simply return
- if (CollectionUtils.isNotEmpty(schedulerState.getKillableTasks())) {
+ if (CollectionUtils.isNotEmpty(schedulerState.getKillableTaskIds())) {
/*
* Clone the killable task collection, iterate through all tasks, and
* process any pending and/or non-pending tasks
*/
- Set<TaskID> killableTasks = Sets.newHashSet(schedulerState.getKillableTasks());
+ Set<TaskID> killableTasks = Sets.newHashSet(schedulerState.getKillableTaskIds());
Status driverStatus = driverManager.getDriverStatus();
//TODO (hokiegeek2) Can the DriverManager be restarted? If not, should the ResourceManager stop?
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
index 4bd60bc..b26bdaf 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
@@ -46,7 +46,11 @@ public class TaskUtils {
public double getNodeManagerMemory() {
return cfg.getNodeManagerConfiguration().getJvmMaxMemoryMB();
}
-
+
+ public double getNodeManagerMaxCpus() {
+ return cfg.getNodeManagerConfiguration().getMaxCpus();
+ }
+
public double getNodeManagerCpus() {
return cfg.getNodeManagerConfiguration().getCpus();
}
@@ -80,7 +84,7 @@ public class TaskUtils {
*/
public Iterable<Protos.Resource> getScalarResource(Protos.Offer offer, String name, Double value, Double used) {
String role = cfg.getFrameworkRole();
- List<Protos.Resource> resources = new ArrayList<>();
+ List<Protos.Resource> resources = new ArrayList<Protos.Resource>();
double resourceDifference = 0; //used to determine the resource difference of value and the resources requested from role *
//Find role by name, must loop through resources
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
index 079df4b..b787b48 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
@@ -124,6 +124,6 @@ public class StatusUpdateEventHandler implements EventHandler<StatusUpdateEvent>
LOGGER.info("Removed {} task with id {}", stopReason, taskId);
}
private boolean taskIsKillable(TaskID taskId) {
- return schedulerState.getKillableTasks().contains(taskId);
+ return schedulerState.getKillableTaskIds().contains(taskId);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
index 86bbc8c..e16e8b4 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.Offer;
+import org.apache.myriad.configuration.NodeManagerConfiguration;
import org.apache.myriad.scheduler.MyriadDriver;
import org.apache.myriad.scheduler.SchedulerUtils;
import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor;
@@ -56,11 +57,12 @@ public class NMHeartBeatHandler extends BaseInterceptor {
private final OfferLifecycleManager offerLifecycleMgr;
private final NodeStore nodeStore;
private final SchedulerState state;
+ private final NodeManagerConfiguration conf;
@Inject
public NMHeartBeatHandler(InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, MyriadDriver myriadDriver,
YarnNodeCapacityManager yarnNodeCapacityMgr, OfferLifecycleManager offerLifecycleMgr,
- NodeStore nodeStore, SchedulerState state) {
+ NodeStore nodeStore, SchedulerState state, NodeManagerConfiguration conf) {
if (registry != null) {
registry.register(this);
@@ -72,6 +74,7 @@ public class NMHeartBeatHandler extends BaseInterceptor {
this.offerLifecycleMgr = offerLifecycleMgr;
this.nodeStore = nodeStore;
this.state = state;
+ this.conf = conf;
}
@Override
@@ -88,9 +91,11 @@ public class NMHeartBeatHandler extends BaseInterceptor {
public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
switch (event.getType()) {
case STARTED:
+ // Since the RMNode was just started, it should not have a non-zero capacity
RMNode rmNode = context.getRMNodes().get(event.getNodeId());
- Resource totalCapability = rmNode.getTotalCapability();
- if (totalCapability.getMemory() != 0 || totalCapability.getVirtualCores() != 0) {
+
+ if (isNonZeroCapacityNode(rmNode)) {
+ Resource totalCapability = rmNode.getTotalCapability();
logger.warn(
"FineGrainedScaling feature got invoked for a NM with non-zero capacity. Host: {}, Mem: {}, CPU: {}. Setting the " +
"NM's capacity to (0G,0CPU)", rmNode.getHostName(), totalCapability.getMemory(), totalCapability.getVirtualCores());
@@ -109,6 +114,12 @@ public class NMHeartBeatHandler extends BaseInterceptor {
}
@VisibleForTesting
+ protected boolean isNonZeroCapacityNode(RMNode node) {
+ Resource resource = node.getTotalCapability();
+ return (resource.getMemory() != 0 || resource.getVirtualCores() != 0);
+ }
+
+ @VisibleForTesting
protected void handleStatusUpdate(RMNodeEvent event, RMContext context) {
if (!(event instanceof RMNodeStatusEvent)) {
logger.error("{} not an instance of {}", event.getClass().getName(), RMNodeStatusEvent.class.getName());
@@ -124,25 +135,42 @@ public class NMHeartBeatHandler extends BaseInterceptor {
host.snapshotRunningContainers();
}
- // New capacity of the node =
- // resources under use on the node (due to previous offers) +
- // new resources offered by mesos for the node
- yarnNodeCapacityMgr.setNodeCapacity(rmNode, Resources.add(getResourcesUnderUse(statusEvent), getNewResourcesOfferedByMesos(
- hostName)));
+ /*
+ * Set the new node capacity which is the sum of the current node resources plus those offered by Mesos.
+ * If the sum is greater than the max capacity of the node, reject the offer.
+ */
+ Resource offeredResources = getNewResourcesOfferedByMesos(hostName);
+ Resource currentResources = getResourcesUnderUse(statusEvent);
+
+ if (offerWithinResourceLimits(currentResources, offeredResources)) {
+ yarnNodeCapacityMgr.setNodeCapacity(rmNode, Resources.add(currentResources, offeredResources));
+ logger.info("Updated resources for {} with {} cores and {} memory", rmNode.getNode().getName(),
+ offeredResources.getVirtualCores(), offeredResources.getMemory());
+ } else {
+ logger.info("Did not update {} with {} cores and {} memory, over max cpu cores and/or max memory",
+ rmNode.getNode().getName(), offeredResources.getVirtualCores(), offeredResources.getMemory());
+ }
}
-
- private Resource getNewResourcesOfferedByMesos(String hostname) {
+
+ @VisibleForTesting
+ protected boolean offerWithinResourceLimits(Resource currentResources, Resource offeredResources) {
+ int newMemory = currentResources.getMemory() + offeredResources.getMemory();
+ int newCores = currentResources.getVirtualCores() + offeredResources.getVirtualCores();
+
+ return (newMemory <= conf.getJvmMaxMemoryMB() && newCores <= conf.getMaxCpus());
+ }
+
+ @VisibleForTesting
+ protected Resource getNewResourcesOfferedByMesos(String hostname) {
OfferFeed feed = offerLifecycleMgr.getOfferFeed(hostname);
- if (feed == null) {
- logger.debug("No offer feed for: {}", hostname);
- return Resource.newInstance(0, 0);
- }
List<Offer> offers = new ArrayList<>();
Protos.Offer offer;
+
while ((offer = feed.poll()) != null) {
- offers.add(offer);
+ offers.add(offer);
offerLifecycleMgr.markAsConsumed(offer);
}
+
Resource fromMesosOffers = OfferUtils.getYarnResourcesFromMesosOffers(offers);
if (logger.isDebugEnabled()) {
@@ -153,10 +181,11 @@ public class NMHeartBeatHandler extends BaseInterceptor {
return fromMesosOffers;
}
- private Resource getResourcesUnderUse(RMNodeStatusEvent statusEvent) {
+ @VisibleForTesting
+ protected Resource getResourcesUnderUse(RMNodeStatusEvent statusEvent) {
Resource usedResources = Resource.newInstance(0, 0);
for (ContainerStatus status : statusEvent.getContainers()) {
- if (status.getState() == ContainerState.NEW || status.getState() == ContainerState.RUNNING) {
+ if (containerInUse(status)) {
RMContainer rmContainer = yarnScheduler.getRMContainer(status.getContainerId());
// (sdaingade) This check is needed as RMContainer information may not be populated
// immediately after a RM restart.
@@ -167,4 +196,8 @@ public class NMHeartBeatHandler extends BaseInterceptor {
}
return usedResources;
}
+
+ private boolean containerInUse(ContainerStatus status) {
+ return (status.getState() == ContainerState.NEW || status.getState() == ContainerState.RUNNING);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java
index e4cec83..9698fe7 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
/**
* Manages the Mesos offers tracked by Myriad.
@@ -57,10 +58,31 @@ public class OfferLifecycleManager {
this.myriadDriver = myriadDriver;
}
- public OfferFeed getOfferFeed(String hostname) {
- return offerFeedMap.get(hostname);
+ /**
+ * Retrieves the OfferFeed for a host and, if null, creates a new OfferFeed
+ * for a host that has either been added to, or rejoined, the Mesos cluster.
+ *
+ * @param hostname
+ * @return feed
+ */
+ @VisibleForTesting
+ protected OfferFeed getOfferFeed(String hostname) {
+ OfferFeed feed = offerFeedMap.get(hostname);
+ if (feed == null) {
+ feed = new OfferFeed();
+ offerFeedMap.put(hostname, feed);
+ }
+ return feed;
}
-
+
+ protected Optional<Node> getOfferNode(String host) {
+ return Optional.fromNullable(nodeStore.getNode(host));
+ }
+
+ protected Optional<Offer> getOffer(OfferFeed feed) {
+ return Optional.fromNullable(feed.poll());
+ }
+
public void declineOffer(Protos.Offer offer) {
myriadDriver.getDriver().declineOffer(offer.getId());
LOGGER.debug("Declined offer {}", offer.getId());
@@ -69,16 +91,12 @@ public class OfferLifecycleManager {
public void addOffers(Protos.Offer... offers) {
for (Protos.Offer offer : offers) {
String hostname = offer.getHostname();
- Node node = nodeStore.getNode(hostname);
- if (node != null) {
- OfferFeed feed = offerFeedMap.get(hostname);
- if (feed == null) {
- feed = new OfferFeed();
- offerFeedMap.put(hostname, feed);
- }
+
+ Optional<Node> optNode = getOfferNode(hostname);
+ if (optNode.isPresent()) {
+ OfferFeed feed = getOfferFeed(hostname);
feed.add(offer);
-
- node.setSlaveId(offer.getSlaveId());
+ optNode.get().setSlaveId(offer.getSlaveId());
LOGGER.debug("addResourceOffers: caching offer for host {}, offer id {}", hostname, offer.getId().getValue());
} else {
@@ -98,6 +116,17 @@ public class OfferLifecycleManager {
consumedOffer.add(offer);
}
+ @VisibleForTesting
+ protected ConsumedOffer getConsumedOffer(String hostname) {
+ ConsumedOffer cOffer = consumedOfferMap.get(hostname);
+ if (cOffer == null) {
+ cOffer = new ConsumedOffer();
+ consumedOfferMap.put(hostname, cOffer);
+ }
+
+ return cOffer;
+ }
+
public ConsumedOffer drainConsumedOffer(String hostname) {
return consumedOfferMap.remove(hostname);
}
@@ -105,18 +134,14 @@ public class OfferLifecycleManager {
public void declineOutstandingOffers(String hostname) {
int numOutStandingOffers = 0;
OfferFeed offerFeed = getOfferFeed(hostname);
- Offer offer;
- while (offerFeed != null && (offer = offerFeed.poll()) != null) {
- declineOffer(offer);
+ Optional<Offer> optOffer;
+
+ while ((optOffer = getOffer(offerFeed)).isPresent()) {
+ declineOffer(optOffer.get());
numOutStandingOffers++;
}
if (numOutStandingOffers > 0) {
LOGGER.info("Declined {} outstanding offers for host {}", numOutStandingOffers, hostname);
}
}
-
- @VisibleForTesting
- public ConsumedOffer getConsumedOffer(String hostname) {
- return consumedOfferMap.get(hostname);
- }
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index 8f7c6f5..52cdfeb 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -18,14 +18,12 @@
*/
package org.apache.myriad.scheduler.fgs;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+
import javax.inject.Inject;
import org.apache.hadoop.yarn.api.records.Container;
@@ -59,6 +57,10 @@ import org.apache.myriad.state.SchedulerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
/**
* Manages the capacity exposed by NodeManager. It uses the offers available
* from Mesos to inflate the node capacity and lets ResourceManager make the
@@ -316,7 +318,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
// as this is now cached in the NodeTask object in scheduler state.
Protos.ExecutorInfo executorInfo = node.getExecInfo();
if (executorInfo == null) {
- executorInfo = Protos.ExecutorInfo.newBuilder(state.getNodeTask(offer.getSlaveId(), NodeManagerConfiguration.NM_TASK_PREFIX)
+ executorInfo = Protos.ExecutorInfo.newBuilder(state.getNodeTask(offer.getSlaveId(), NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX)
.getExecutorInfo()).setFrameworkId(offer.getFrameworkId()).build();
node.setExecInfo(executorInfo);
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
index 5251f19..b81b881 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor;
import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor;
@@ -101,5 +102,9 @@ public class MyriadFairScheduler extends FairScheduler {
super.handle(event);
this.yarnSchedulerInterceptor.afterSchedulerEventHandled(event);
}
+
+ public void addNode(FSSchedulerNode node) {
+ this.nodes.put(node.getNodeID(), node);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java
index 8b5fb51..7ee2b62 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java
@@ -169,10 +169,10 @@ public class SchedulerState {
* Return a list of TaskIDs corresponding to all killable tasks
* @return
*/
- public synchronized Set<Protos.TaskID> getKillableTasks() {
+ public synchronized Set<Protos.TaskID> getKillableTaskIds() {
Set<Protos.TaskID> returnSet = new HashSet<>();
for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) {
- returnSet.addAll(entry.getValue().getKillableTasks());
+ returnSet.addAll(entry.getValue().getKillableTaskIds());
}
return returnSet;
}
@@ -183,9 +183,9 @@ public class SchedulerState {
* @param taskPrefix
* @return
*/
- public synchronized Set<Protos.TaskID> getKillableTasks(String taskPrefix) {
+ public synchronized Set<Protos.TaskID> getKillableTaskIds(String taskPrefix) {
SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
- return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getKillableTasks());
+ return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getKillableTaskIds());
}
public synchronized void removeTask(Protos.TaskID taskId) {
@@ -384,7 +384,7 @@ public class SchedulerState {
try {
StoreContext sc = new StoreContext(frameworkId, tasks, getPendingTaskIds(), getStagingTaskIds(), getActiveTaskIds(),
- getLostTaskIds(), getKillableTasks());
+ getLostTaskIds(), getKillableTaskIds());
stateStore.storeMyriadState(sc);
} catch (Exception e) {
LOGGER.error("Failed to update scheduler state to state store", e);
@@ -411,7 +411,7 @@ public class SchedulerState {
LOGGER.debug("State Store state includes frameworkId: {}, pending tasks count: {}, staging tasks count: {} " +
"active tasks count: {}, lost tasks count: {}, and killable tasks count: {}", frameworkId.getValue(),
this.getPendingTaskIds().size(), this.getStagingTaskIds().size(), this.getActiveTaskIds().size(),
- this.getLostTaskIds().size(), this.getKillableTasks().size());
+ this.getLostTaskIds().size(), this.getKillableTaskIds().size());
}
} catch (Exception e) {
LOGGER.error("Failed to read scheduler state from state store", e);
@@ -545,7 +545,7 @@ public class SchedulerState {
return Collections.unmodifiableSet(this.lostTasks);
}
- public synchronized Set<Protos.TaskID> getKillableTasks() {
+ public synchronized Set<Protos.TaskID> getKillableTaskIds() {
return Collections.unmodifiableSet(this.killableTasks);
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java b/myriad-scheduler/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java
index a0a9ed1..df3e35b 100644
--- a/myriad-scheduler/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java
@@ -6,7 +6,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.myriad.state.MockDispatcher;
+import org.apache.myriad.TestObjectFactory;
import org.apache.myriad.state.MockRMApp;
import org.junit.Test;
@@ -17,13 +17,16 @@ public class MyriadFileSystemRMStateStoreTest {
@Test
public void testInit() throws Exception {
- Configuration conf = getConfiguration();
+ Configuration conf = new Configuration();
+ conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///" + "/tmp/myriad-file-system-rm-state-store-test");
MyriadFileSystemRMStateStore store = new MyriadFileSystemRMStateStore();
assertTrue(store.isInState(STATE.NOTINITED));
store.init(conf);
assertTrue(store.isInState(STATE.INITED));
- store.startInternal();
+ store.start();
+ assertTrue(store.isInState(STATE.STARTED));
store.close();
+ assertTrue(store.isInState(STATE.STOPPED));
}
@Test
@@ -51,19 +54,8 @@ public class MyriadFileSystemRMStateStoreTest {
}
private MyriadFileSystemRMStateStore getInitializedStore() throws Exception {
- Configuration conf = getConfiguration();
- MyriadFileSystemRMStateStore store = new MyriadFileSystemRMStateStore();
- store.init(conf);
- store.startInternal();
- store.loadState();
+ MyriadFileSystemRMStateStore store = TestObjectFactory.getStateStore(new Configuration(), "/tmp/myriad-file-system-rm-state-store-test");
store.loadMyriadState();
- store.setRMDispatcher(new MockDispatcher());
return store;
}
-
- private Configuration getConfiguration() {
- Configuration conf = new Configuration();
- conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///tmp/");
- return conf;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java
index e5c3f57..13b97de 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java
@@ -1,6 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.myriad;
+import java.io.File;
+import java.net.URL;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.myriad.configuration.MyriadConfiguration;
+import org.junit.After;
import org.junit.Before;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -15,15 +38,63 @@ public class BaseConfigurableTest {
protected MyriadConfiguration cfg;
protected MyriadConfiguration cfgWithRole;
protected MyriadConfiguration cfgWithDocker;
+ protected String baseStateStoreDirectory = StringUtils.EMPTY;
+ /**
+ * This is normally overridden in derived classes. Be sure to invoke this implementation;
+ * otherwise, cfg, cfgWithRole, and cfgWithDocker will all be null.
+ *
+ * @throws Exception
+ */
@Before
public void setUp() throws Exception {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
- cfg = mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-test-default.yml"),
+ cfg = mapper.readValue(getConfURL("myriad-config-test-default.yml"),
MyriadConfiguration.class);
- cfgWithRole = mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-test-default-with-framework-role.yml"),
+ cfgWithRole = mapper.readValue(getConfURL("myriad-config-test-default-with-framework-role.yml"),
MyriadConfiguration.class);
- cfgWithDocker = mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-test-default-with-docker-info.yml"),
+ cfgWithDocker = mapper.readValue(getConfURL("myriad-config-test-default-with-docker-info.yml"),
MyriadConfiguration.class);
}
+
+ /**
+ * Deletes the directories and files that back the MyriadFileSystemRMStateStore to ensure there
+ * is no stale state within the MyriadFileSystemRMStateStore that could result in race conditions
+ * depending upon how the unit tests are executed.
+ *
+ * @throws Exception
+ */
+ protected void resetStoreState() throws Exception {
+ checkConfiguration();
+ File rootFile = new File(baseStateStoreDirectory + "/FSRMStateRoot/RMMyriadRoot");
+ //Delete directory if present and recursively create directory path
+ FileUtils.deleteDirectory(rootFile);
+ FileUtils.forceMkdir(rootFile);
+
+ File storeFile = new File(rootFile.getAbsolutePath() + "/MyriadState");
+
+ if (!storeFile.createNewFile()) {
+ throw new IllegalStateException(rootFile.getAbsolutePath() + "/MyriadState could not be created");
+ }
+ }
+
+ /**
+ * Confirms the configuration of object graph is correct
+ *
+ * @throws IllegalStateException
+ */
+ protected void checkConfiguration() throws IllegalStateException {
+ if (StringUtils.isEmpty(baseStateStoreDirectory)) {
+ throw new IllegalStateException("The baseStateStoreDirectory must be set, preferably in overridden setUp method");
+ }
+ }
+
+ private URL getConfURL(String file) {
+ return Thread.currentThread().getContextClassLoader().getResource(file);
+ }
+
+ @After
+ public void cleanUp() throws Exception {
+ FileUtils.deleteDirectory(new File(baseStateStoreDirectory));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java b/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java
index dc7a00b..4652f1c 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java
@@ -18,15 +18,27 @@
package org.apache.myriad;
-import com.fasterxml.jackson.databind.*;
-import com.fasterxml.jackson.dataformat.yaml.*;
-import com.google.inject.*;
-import com.google.inject.multibindings.*;
-import java.io.*;
-import java.util.*;
-import org.apache.myriad.configuration.*;
-import org.apache.myriad.scheduler.*;
-import org.slf4j.*;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.configuration.NodeManagerConfiguration;
+import org.apache.myriad.configuration.ServiceConfiguration;
+import org.apache.myriad.scheduler.ExecutorCommandLineGenerator;
+import org.apache.myriad.scheduler.NMExecutorCommandLineGenerator;
+import org.apache.myriad.scheduler.NMTaskFactory;
+import org.apache.myriad.scheduler.ServiceTaskFactory;
+import org.apache.myriad.scheduler.TaskFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.MapBinder;
/**
* AbstractModule extension for UnitTests
@@ -57,7 +69,7 @@ public class MyriadTestModule extends AbstractModule {
bind(MyriadConfiguration.class).toInstance(cfg);
MapBinder<String, TaskFactory> mapBinder = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class);
- mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON);
+ mapBinder.addBinding(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON);
Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations();
for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) {
String taskFactoryClass = entry.getValue().getTaskFactoryImplName().orNull();
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java b/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java
index 9117e3b..43014fa 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java
@@ -1,40 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.myriad;
+import java.util.HashMap;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeHealthStatusPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MyriadFileSystemRMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
import org.apache.mesos.Protos.FrameworkID;
import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.Value.Type;
+import org.apache.mesos.SchedulerDriver;
import org.apache.myriad.configuration.MyriadConfiguration;
-import org.apache.myriad.policy.LeastAMNodesFirstPolicy;
+import org.apache.myriad.scheduler.ExtendedResourceProfile;
import org.apache.myriad.scheduler.MockSchedulerDriver;
import org.apache.myriad.scheduler.MyriadDriver;
import org.apache.myriad.scheduler.MyriadDriverManager;
-import org.apache.myriad.scheduler.MyriadOperations;
+import org.apache.myriad.scheduler.NMProfile;
+import org.apache.myriad.scheduler.ServiceResourceProfile;
+import org.apache.myriad.scheduler.constraints.LikeConstraint;
import org.apache.myriad.scheduler.yarn.MyriadCapacityScheduler;
+import org.apache.myriad.scheduler.yarn.MyriadFairScheduler;
import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor;
import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
import org.apache.myriad.state.MockDispatcher;
import org.apache.myriad.state.MockRMContext;
-import org.apache.myriad.state.MockRMNode;
+import org.apache.myriad.state.MyriadStateStore;
+import org.apache.myriad.state.NodeTask;
import org.apache.myriad.state.SchedulerState;
import org.apache.myriad.webapp.HttpConnectorProvider;
import org.apache.myriad.webapp.MyriadWebServer;
@@ -44,28 +81,165 @@ import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHandler;
import org.mortbay.jetty.servlet.ServletHolder;
+import com.google.common.collect.Lists;
import com.google.inject.servlet.GuiceFilter;
/**
- * Factory for common standard and mock objects utilized for JUnit tests
+ * Factory for common objects utilized over 1..n JUnit tests
*/
public class TestObjectFactory {
- public static SchedulerState getSchedulerState(MyriadConfiguration cfg) throws Exception {
- Configuration conf = new Configuration();
- SchedulerState state = new SchedulerState(TestObjectFactory.getStateStore(conf, false));
+
+ /**
+ * Returns a new RMContainer corresponding to the RMNode and RMContext. The RMContainer is the
+ * ResourceManager's view of an application container per the Hadoop docs
+ *
+ * @param node
+ * @param context
+ * @param appId
+ * @param cores
+ * @param memory
+ * @return RMContainer
+ */
+ public static RMContainer getRMContainer(RMNode node, RMContext context, int appId, int cores, int memory) {
+ ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(123456789, 1), 1), appId);
+
+ Container container = Container.newInstance(containerId, node.getNodeID(), node.getHttpAddress(),
+ Resources.createResource(memory, cores), null, null);
+ return new RMContainerImpl(container, containerId.getApplicationAttemptId(), node.getNodeID(), "user1", context);
+ }
+
+ public static RMNodeStatusEvent getRMStatusEvent(RMNode node) {
+ NodeId id = node.getNodeID();
+ NodeHealthStatus hStatus = NodeHealthStatusPBImpl.newInstance(true, "HEALTHY", System.currentTimeMillis());
+ List<ContainerStatus> cStatus = Lists.newArrayList(getContainerStatus(node));
+ List<ApplicationId> keepAliveIds = Lists.newArrayList(getApplicationId(node.getHttpPort()));
+ NodeHeartbeatResponse response = new NodeHeartbeatResponsePBImpl();
+
+ return new RMNodeStatusEvent(id, hStatus, cStatus, keepAliveIds, response);
+ }
+
+ private static ContainerStatus getContainerStatus(RMNode node) {
+ ContainerStatus status = new ContainerStatusPBImpl();
+ return status;
+ }
+
+ private static ApplicationId getApplicationId(int id) {
+ return ApplicationId.newInstance(System.currentTimeMillis(), id);
+ }
+ /**
+ * Returns a ServiceResourceProfile or ExtendedResourceProfile object depending upon
+ * whether the execCores and execMemory parameters are null or non-null, respectively
+ *
+ * @param profileName
+ * @param cores
+ * @param memory
+ * @param execCores
+ * @param execMemory
+ * @return ServiceResourceProfile if execCores and execMemory are null, ExtendedResourceProfile otherwise
+ */
+ public static ServiceResourceProfile getServiceResourceProfile(String profileName, Double cores, Double memory,
+ Long execCores, Long execMemory) {
+ if (isExtendedResource(execCores, execMemory)) {
+ NMProfile nmProfile = new NMProfile(profileName, execCores, execMemory);
+ return new ExtendedResourceProfile(nmProfile, cores, memory, new HashMap<String, Long>());
+ }
+ return new ServiceResourceProfile(profileName, cores, memory, new HashMap<String, Long>());
+ }
+
+ private static boolean isExtendedResource(Long execCores, Long execMemory) {
+ return execCores != null && execMemory != null;
+ }
+
+ /**
+ * Returns a NodeTask with either a ServiceResourceProfile or an ExtendedResourceProfile,
+ * depending upon whether execCores and execMemory are null or non-null, respectively
+ *
+ * @param profileName
+ * @param hostName
+ * @param cores
+ * @param memory
+ * @param execCores
+ * @param execMemory
+ * @return NodeTask
+ */
+ public static NodeTask getNodeTask(String profileName, String hostName, Double cores, Double memory,
+ Long execCores, Long execMemory) {
+ NodeTask task = new NodeTask(getServiceResourceProfile(profileName, cores, memory, execCores, execMemory),
+ new LikeConstraint(hostName, "host-[0-9]*.example.com"));
+ task.setHostname(hostName);
+ task.setTaskPrefix("nm");
+ task.setSlaveId(SlaveID.newBuilder().setValue(profileName + "-" + hostName).build());
+ task.setExecutorInfo(ExecutorInfo.newBuilder().setExecutorId(ExecutorID.newBuilder().setValue("exec")).
+ setCommand(org.apache.mesos.Protos.CommandInfo.newBuilder().setValue("command")).build());
+ return task;
+ }
+
+ /**
+ * Returns a NodeTask given a ServiceResourceProfile and hostname
+ *
+ * @param hostName
+ * @param profile
+ * @return
+ */
+ public static NodeTask getNodeTask(String hostName, ServiceResourceProfile profile) {
+ NodeTask task = new NodeTask(profile, new LikeConstraint(hostName, "host-[0-9]*.example.com"));
+ task.setHostname(hostName);
+ task.setTaskPrefix("nm");
+ task.setSlaveId(SlaveID.newBuilder().setValue(profile.getName() + "-" + hostName).build());
+ task.setExecutorInfo(ExecutorInfo.newBuilder().setExecutorId(ExecutorID.newBuilder().setValue("exec")).
+ setCommand(org.apache.mesos.Protos.CommandInfo.newBuilder().setValue("command")).build());
+ return task;
+ }
+
+ public static RMNode getRMNode(String host, int port, Resource resource) {
+ NodeId id = NodeId.newInstance(host, port);
+ RMContext context = new MockRMContext();
+ return new RMNodeImpl(id, context, id.getHost(), id.getPort(), id.getPort(), new NodeBase(host, "/tmp"), resource, "version-one");
+ }
+
+ public static RMNode getRMNode(String host, int port, int memory, int cores) {
+ Resource resource = Resource.newInstance(memory, cores);
+ return getRMNode(host, port, resource);
+ }
+
+ public static Dispatcher getMockDispatcher() {
+ return new MockDispatcher();
+ }
+
+ public static SchedulerState getSchedulerState(MyriadConfiguration cfg, String baseDir) throws Exception {
+ MyriadStateStore store = TestObjectFactory.getStateStore(new Configuration(), baseDir);
+ SchedulerState state = new SchedulerState(store);
state.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build());
return state;
}
- public static FileSystemRMStateStore getRMStateStore(Configuration conf) throws Exception {
- FileSystemRMStateStore store = new MyriadFileSystemRMStateStore();
- conf.set("yarn.resourcemanager.fs.state-store.uri", "/tmp");
- store.initInternal(conf);
- return store;
+ public static MyriadFairScheduler getMyriadFairScheduler(RMContext context) {
+ MyriadFairScheduler scheduler = new MyriadFairScheduler();
+ scheduler.setRMContext(context);
+ return scheduler;
}
-
+
+ public static SchedulerNode getSchedulerNode(String host, int port, int cores, int memory) {
+ RMNode node = TestObjectFactory.getRMNode(host, port, cores, memory);
+ return new FiCaSchedulerNode(node, false);
+ }
+
+ public static MyriadFairScheduler getYarnFairScheduler() {
+ RMContext context = new MockRMContext();
+ return getMyriadFairScheduler(context);
+ }
+
+ public static MyriadDriverManager getMyriadDriverManager(MyriadDriver driver) {
+ return new MyriadDriverManager(driver);
+ }
+
public static MyriadDriverManager getMyriadDriverManager() {
- return new MyriadDriverManager(new MyriadDriver(new MockSchedulerDriver()));
+ return getMyriadDriverManager(new MyriadDriver(new MockSchedulerDriver()));
+ }
+
+ public static MyriadDriver getMyriadDriver(SchedulerDriver driver) {
+ return new MyriadDriver(driver);
}
public static InterceptorRegistry getInterceptorRegistry() {
@@ -77,7 +251,7 @@ public class TestObjectFactory {
return scheduler;
}
- private static Server getJettyServer() {
+ public static Server getJettyServer() {
Server server = new Server();
ServletHandler context = new ServletHandler();
ServletHolder holder = new ServletHolder(DefaultServlet.class);
@@ -97,76 +271,24 @@ public class TestObjectFactory {
return new MyriadWebServer(server, connector, new GuiceFilter());
}
- public static MyriadFileSystemRMStateStore getStateStore(Configuration conf, boolean loadState) throws Exception {
- conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///tmp/");
+ public static MyriadFileSystemRMStateStore getStateStore(Configuration conf, String baseDir) throws Exception {
+ conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///" + baseDir);
MyriadFileSystemRMStateStore store = new MyriadFileSystemRMStateStore();
store.init(conf);
store.start();
- if (loadState) {
- store.loadState();
- }
+ store.loadState();
store.setRMDispatcher(new MockDispatcher());
return store;
}
-
- public static Offer getOffer(String host, String slaveId, String frameworkId, String offerId) {
+
+ public static Offer getOffer(String host, String slaveId, String frameworkId, String offerId, double cpuCores, double memory) {
Protos.SlaveID sid = SlaveID.newBuilder().setValue(slaveId).build();
Protos.FrameworkID fid = FrameworkID.newBuilder().setValue(frameworkId).build();
- return Protos.Offer.newBuilder().setHostname(host).setId(OfferID.newBuilder().setValue(offerId)).setSlaveId(sid).setFrameworkId(fid).build();
- }
-
- public static RMContext getRMContext(Configuration conf) throws Exception {
- conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///tmp/");
- MockRMContext context = null;
- Dispatcher dispatcher = new MockDispatcher();
-
- RMApplicationHistoryWriter rmApplicationHistoryWriter = new RMApplicationHistoryWriter();
- AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(dispatcher);
- AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(dispatcher);
- RMDelegationTokenSecretManager delegationTokenSecretManager = new RMDelegationTokenSecretManager(1, 1, 1, 1, context);
-
- context = new MockRMContext();
- context.setStateStore(TestObjectFactory.getStateStore(conf, false));
- context.setAmLivelinessMonitor(amLivelinessMonitor);
- context.setAmFinishingMonitor(amFinishingMonitor);
- context.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
- context.setRMDelegationTokenSecretManager(delegationTokenSecretManager);
- return context;
- }
-
- public static MyriadOperations getMyriadOperations(MyriadConfiguration cfg) throws Exception {
- AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> scheduler = TestObjectFactory.getYarnScheduler();
- SchedulerState sState = TestObjectFactory.getSchedulerState(cfg);
- sState.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build());
-
- MyriadDriverManager manager = TestObjectFactory.getMyriadDriverManager();
- MyriadWebServer webServer = TestObjectFactory.getMyriadWebServer(cfg);
- CompositeInterceptor registry = new CompositeInterceptor();
- LeastAMNodesFirstPolicy policy = new LeastAMNodesFirstPolicy(registry, scheduler, sState);
- return new MyriadOperations(cfg, sState, policy, manager, webServer, TestObjectFactory.getRMContext(new Configuration()));
- }
-
- public static SchedulerNode getSchedulerNode(NodeId nodeId, int vCores, int memory) {
- RMNode node = getMockRMNode(nodeId, vCores, memory);
- return new FiCaSchedulerNode(node, true);
- }
-
- public static RMNode getMockRMNode(NodeId nodeId, int vCores, int memory) {
- MockRMNode node = new MockRMNode(nodeId, NodeState.NEW, new NodeBase("/tmp"));
- node.setCommandPort(8041);
- node.setHostName("0.0.0.0");
- node.setHttpPort(8042);
- node.setRackName("r01n07");
- node.setHttpAddress("localhost:8042");
- node.setTotalCapability(getResource(vCores, memory));
-
- return node;
- }
-
- public static Resource getResource(int vCores, int memory) {
- Resource resource = new ResourcePBImpl();
- resource.setVirtualCores(vCores);
- resource.setMemory(memory);
- return resource;
+ Protos.Value.Scalar cores = Protos.Value.Scalar.newBuilder().setValue(cpuCores).build();
+ Protos.Value.Scalar mem = Protos.Value.Scalar.newBuilder().setValue(memory).build();
+ Protos.Resource cpuResource = Protos.Resource.newBuilder().setName("cpus").setScalar(cores).setType(Type.SCALAR).build();
+ Protos.Resource memResource = Protos.Resource.newBuilder().setName("mem").setScalar(mem).setType(Type.SCALAR).build();
+ return Protos.Offer.newBuilder().setHostname(host).setId(OfferID.newBuilder().setValue(offerId)).
+ setSlaveId(sid).setFrameworkId(fid).addResources(cpuResource).addResources(memResource).build();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java
index 5d7bb75..b30e2a7 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.myriad.api;
import static org.junit.Assert.assertEquals;
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java
index e0eda0f..7965f02 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.myriad.api;
import static org.junit.Assert.assertNotNull;
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java
index 562d128..9aef9a9 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java
@@ -89,8 +89,9 @@ public class MyriadConfigurationTest extends BaseConfigurableTest {
NodeManagerConfiguration config = cfg.getNodeManagerConfiguration();
assertFalse(config.getCgroups());
- assertEquals(new Double(0.2), config.getCpus());
- assertEquals(new Double(1024.0), config.getJvmMaxMemoryMB());
+ assertEquals(new Double(0.8), config.getCpus());
+ assertEquals(new Double(2048.0), config.getJvmMaxMemoryMB());
+ assertEquals(new Double(4.0), config.getMaxCpus());
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java
index e403f90..4adc6e5 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.myriad.health;
import java.net.ServerSocket;
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java
index cebf2c7..7d1a786 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.myriad.health;
import static org.junit.Assert.assertEquals;
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java
index 29087e7..cd4683a 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java
@@ -19,10 +19,17 @@ package org.apache.myriad.scheduler;
import static org.junit.Assert.assertEquals;
+import java.util.HashMap;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.mesos.Protos.FrameworkID;
import org.apache.myriad.BaseConfigurableTest;
import org.apache.myriad.TestObjectFactory;
@@ -31,95 +38,99 @@ import org.apache.myriad.policy.LeastAMNodesFirstPolicy;
import org.apache.myriad.scheduler.constraints.Constraint;
import org.apache.myriad.scheduler.constraints.LikeConstraint;
import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor;
+import org.apache.myriad.state.MockDispatcher;
+import org.apache.myriad.state.MockRMContext;
import org.apache.myriad.state.SchedulerState;
import org.apache.myriad.webapp.MyriadWebServer;
import org.junit.Before;
import org.junit.Test;
-import java.util.TreeMap;
-
/**
* Unit tests for MyriadOperations class
*/
public class MyriadOperationsTest extends BaseConfigurableTest {
ServiceResourceProfile small;
Constraint constraint = new LikeConstraint("localhost", "host-[0-9]*.example.com");
- MyriadWebServer webServer;
- private SchedulerState getSchedulerState() throws Exception {
- SchedulerState state = TestObjectFactory.getSchedulerState(this.cfg);
- state.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build());
- return state;
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ this.baseStateStoreDirectory = "/tmp/myriad-operations-test";
+ generateProfiles();
}
-
- private MyriadOperations getMyriadOperations(SchedulerState state) throws Exception {
- MyriadDriverManager manager = TestObjectFactory.getMyriadDriverManager();
+ private MyriadOperations initialize() throws Exception {
+ resetStoreState();
+ SchedulerState sState = TestObjectFactory.getSchedulerState(cfg, "tmp/myriad-operations-test");
+ sState.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build());
AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> scheduler = TestObjectFactory.getYarnScheduler();
+ MyriadDriverManager manager = TestObjectFactory.getMyriadDriverManager();
+ MyriadWebServer webServer = TestObjectFactory.getMyriadWebServer(cfg);
CompositeInterceptor registry = new CompositeInterceptor();
- LeastAMNodesFirstPolicy policy = new LeastAMNodesFirstPolicy(registry, scheduler, state);
+ LeastAMNodesFirstPolicy policy = new LeastAMNodesFirstPolicy(registry, scheduler, sState);
+
manager.startDriver();
- return new MyriadOperations(cfg, state, policy, manager, webServer, TestObjectFactory.getRMContext(new Configuration()));
+ return new MyriadOperations(cfg, sState, policy, manager, webServer, generateRMContext(scheduler));
}
-
- @Before
- public void setUp() throws Exception {
- super.setUp();
- webServer = TestObjectFactory.getMyriadWebServer(cfg);
- generateProfiles();
+ private void generateProfiles() {
+ small = new ServiceResourceProfile("small", new Double(0.1), new Double(512.0), new HashMap<String, Long>());
}
- private void generateProfiles() {
- TreeMap<String, Long> ports = new TreeMap<>();
- small = new ServiceResourceProfile("small", 0.1, 512.0, ports);
+ private RMContext generateRMContext(AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> scheduler) throws Exception {
+ Configuration conf = new Configuration();
+ MockRMContext context = null;
+ Dispatcher dispatcher = new MockDispatcher();
+
+ RMApplicationHistoryWriter rmApplicationHistoryWriter = new RMApplicationHistoryWriter();
+ AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(dispatcher);
+ AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(dispatcher);
+ RMDelegationTokenSecretManager delegationTokenSecretManager = new RMDelegationTokenSecretManager(1, 1, 1, 1, context);
+
+ context = new MockRMContext();
+ context.setStateStore(TestObjectFactory.getStateStore(conf, "tmp/myriad-operations-test"));
+ context.setAmLivelinessMonitor(amLivelinessMonitor);
+ context.setAmFinishingMonitor(amFinishingMonitor);
+ context.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
+ context.setRMDelegationTokenSecretManager(delegationTokenSecretManager);
+ return context;
}
@Test
public void testFlexUpAndFlexDownCluster() throws Exception {
- SchedulerState sState = this.getSchedulerState();
- MyriadOperations ops = this.getMyriadOperations(sState);
- assertEquals(0, sState.getPendingTaskIds().size());
+ MyriadOperations ops = initialize();
+ assertEquals(0, ops.getSchedulerState().getPendingTaskIds().size());
ops.flexUpCluster(small, 1, constraint);
- assertEquals(1, sState.getPendingTaskIds().size());
+ assertEquals(1, ops.getSchedulerState().getPendingTaskIds().size());
ops.flexDownCluster(small, constraint, 1);
- assertEquals(0, sState.getPendingTaskIds().size());
+ assertEquals(0, ops.getSchedulerState().getPendingTaskIds().size());
}
-
+
@Test
public void testFlexUpAndFlexDownService() throws Exception {
- SchedulerState sState = this.getSchedulerState();
- MyriadOperations ops = this.getMyriadOperations(sState);
+ MyriadOperations ops = initialize();
ops.flexUpAService(1, "jobhistory");
- assertEquals(1, sState.getPendingTasksByType("jobhistory").size());
+ assertEquals(1, ops.getSchedulerState().getPendingTasksByType("jobhistory").size());
ops.flexDownAService(1, "jobhistory");
- assertEquals(0, sState.getPendingTasksByType("jobhistory").size());
+ assertEquals(0, ops.getSchedulerState().getPendingTasksByType("jobhistory").size());
}
@Test(expected = MyriadBadConfigurationException.class)
public void testFlexUpAServiceOverMaxInstances() throws Exception {
- SchedulerState sState = this.getSchedulerState();
- MyriadOperations ops = this.getMyriadOperations(sState);
- /*
- * There is 1 jobhhistory task loaded from configuration file, so flexing up
- * by two should result in MyriadBadConfigurationException
- */
- ops.flexUpAService(2, "jobhistory");
+ MyriadOperations ops = initialize();
+ ops.flexUpAService(3, "jobhistory");
}
@Test
public void testGetFlexibleInstances() throws Exception {
- SchedulerState sState = this.getSchedulerState();
- MyriadOperations ops = this.getMyriadOperations(sState);
- assertEquals(0, ops.getFlexibleInstances("jobhistory").intValue());
+ MyriadOperations ops = initialize();
ops.flexUpAService(1, "jobhistory");
assertEquals(1, ops.getFlexibleInstances("jobhistory").intValue());
}
@Test
public void testShutdownCluster() throws Exception {
- SchedulerState sState = this.getSchedulerState();
- MyriadOperations ops = this.getMyriadOperations(sState);
+ MyriadOperations ops = initialize();
ops.shutdownFramework();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy
deleted file mode 100644
index 6555f04..0000000
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.myriad.scheduler
-
-import org.apache.mesos.Protos
-import org.apache.myriad.configuration.NodeManagerConfiguration
-import org.apache.myriad.state.NodeTask
-import org.apache.myriad.state.SchedulerState
-import spock.lang.Specification
-
-/**
- *
- * @author kensipe
- */
-class SchedulerUtilsSpec extends Specification {
-
- def "is unique host name"() {
- given:
- def offer = Mock(Protos.OfferOrBuilder)
- offer.getHostname() >> "hostname"
-
- expect:
- returnValue == SchedulerUtils.isUniqueHostname(offer, launchTask, tasks)
-
- where:
- tasks | launchTask | returnValue
- [] | null | true
- null | null | true
- createNodeTaskList("hostname") | createNodeTask("hostname") | false
- createNodeTaskList("missinghost") | createNodeTask("hostname") | true
- createNodeTaskList("missinghost1", "missinghost2") | createNodeTask("missinghost3") | true
- createNodeTaskList("missinghost1", "hostname") | createNodeTask("hostname") | false
-
- }
-
- def "is eligible for Fine Grained Scaling"() {
- given:
- def state = Mock(SchedulerState)
- def tasks = []
- def fgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("zero", 0, 0), 1.0, 2.0, new HashMap<String, Long>()), null)
- def cgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("low", 2, 4096), 1.0, 2.0, new HashMap<String, Long>()), null)
- fgsNMTask.setHostname("test_fgs_hostname")
- cgsNMTask.setHostname("test_cgs_hostname")
- tasks << fgsNMTask << cgsNMTask
- state.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX) >> tasks
-
- expect:
- returnValue == SchedulerUtils.isEligibleForFineGrainedScaling(hostName, state)
-
- where:
- hostName | returnValue
- "test_fgs_hostname" | true
- "test_cgs_hostname" | false
- "blah" | false
- "" | false
- null | false
- }
-
- ArrayList<NodeTask> createNodeTaskList(String... hostnames) {
- def list = []
- hostnames.each { hostname ->
- list << createNodeTask(hostname)
- }
- return list
- }
-
-
- NodeTask createNodeTask(String hostname) {
- def node = new NodeTask(new ExtendedResourceProfile(new NMProfile("", 1, 1), 1.0, 1.0, new HashMap<String, Long>()), null)
- node.hostname = hostname
- node.taskPrefix = "nm"
- node
- }
-}