You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@myriad.apache.org by da...@apache.org on 2016/08/12 13:33:00 UTC
[3/3] incubator-myriad git commit: Refactor which addresses Myriad
213, 214,
and 136 in the process. -Refactored ExecutorCommandLineGenerator classes to
use this class (resolves Myriad-214 in the process). -Refactor TaskFactory
classes as necessary to wo
Refactor which addresses Myriad 213, 214, and 136 in the process.
-Refactored ExecutorCommandLineGenerator classes to use this class (resolves Myriad-214 in the process).
-Refactor TaskFactory classes as necessary to work with this. This adds and removes several methods in TaskUtils to get port Resources.
-Created a ResourceOfferContainer, moving the work of checking offers and creating the resources from them out of the ResourceOffersEventHandler and TaskFactory classes.
-Remove NMPorts and Ports classes.
JIRA:
[MYRIAD-136] https://issues.apache.org/jira/browse/MYRIAD-136
[MYRIAD-213] https://issues.apache.org/jira/browse/MYRIAD-213
[MYRIAD-214] https://issues.apache.org/jira/browse/MYRIAD-214
Pull Request:
Closes #79
Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/7aea259c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/7aea259c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/7aea259c
Branch: refs/heads/master
Commit: 7aea259cf231655bdbd033f61584eec7715fc97f
Parents: df4cbc0
Author: DarinJ <da...@apache.org>
Authored: Thu Jun 9 10:08:46 2016 -0400
Committer: darinj <da...@apache.org>
Committed: Fri Aug 12 09:27:14 2016 -0400
----------------------------------------------------------------------
build.gradle | 4 +
.../myriad/executor/MyriadExecutorDefaults.java | 23 --
.../src/main/java/org/apache/myriad/Main.java | 47 ++-
.../java/org/apache/myriad/MyriadModule.java | 29 +-
.../configuration/MyriadConfiguration.java | 15 +-
.../configuration/NodeManagerConfiguration.java | 25 +-
.../configuration/ServiceConfiguration.java | 6 +-
.../scheduler/DownloadNMExecutorCLGenImpl.java | 90 ------
.../scheduler/ExecutorCommandLineGenerator.java | 148 ++++++++-
.../scheduler/ExtendedResourceProfile.java | 27 +-
.../myriad/scheduler/MyriadOperations.java | 2 +-
.../myriad/scheduler/NMExecutorCLGenImpl.java | 176 -----------
.../NMExecutorCommandLineGenerator.java | 150 +++++++++
.../org/apache/myriad/scheduler/NMPorts.java | 78 -----
.../apache/myriad/scheduler/NMTaskFactory.java | 74 +++++
.../java/org/apache/myriad/scheduler/Ports.java | 26 --
.../scheduler/ServiceCommandLineGenerator.java | 86 +++++-
.../scheduler/ServiceResourceProfile.java | 30 +-
.../scheduler/ServiceTaskConstraints.java | 49 ---
.../myriad/scheduler/ServiceTaskFactory.java | 76 +++++
.../scheduler/ServiceTaskFactoryImpl.java | 253 ---------------
.../myriad/scheduler/TaskConstraints.java | 35 ---
.../scheduler/TaskConstraintsManager.java | 48 ---
.../apache/myriad/scheduler/TaskFactory.java | 304 +++++++------------
.../org/apache/myriad/scheduler/TaskUtils.java | 86 +-----
.../handlers/ResourceOffersEventHandler.java | 175 ++---------
.../scheduler/resource/RangeResource.java | 218 +++++++++++++
.../resource/ResourceOfferContainer.java | 207 +++++++++++++
.../scheduler/resource/ScalarResource.java | 88 ++++++
.../scheduler/yarn/MyriadFairScheduler.java | 1 -
.../org/apache/myriad/MyriadTestModule.java | 14 +-
.../myriad/api/SchedulerStateResourceTest.java | 9 +-
.../myriad/scheduler/MockSchedulerDriver.java | 17 ++
.../myriad/scheduler/MyriadDriverTest.java | 17 ++
.../myriad/scheduler/MyriadOperationsTest.java | 22 +-
.../myriad/scheduler/NMProfileManagerTest.java | 17 ++
.../myriad/scheduler/SchedulerUtilsSpec.groovy | 6 +-
.../scheduler/ServiceResourceProfileTest.java | 24 +-
.../myriad/scheduler/TMSTaskFactoryImpl.java | 9 +-
.../scheduler/TaskConstraintsManagerTest.java | 32 --
.../myriad/scheduler/TestNMTaskFactory.java | 72 +++++
.../myriad/scheduler/TestRandomPorts.java | 203 -------------
.../scheduler/TestServiceCommandLine.java | 97 ++++--
.../scheduler/TestServiceTaskFactory.java | 77 +++++
.../apache/myriad/scheduler/TestTaskUtils.java | 37 +--
.../fgs/YarnNodeCapacityManagerTest.java | 4 +-
.../myriad/scheduler/offer/OfferBuilder.java | 117 +++++++
.../resource/TestResourceOfferContainer.java | 166 ++++++++++
.../org/apache/myriad/state/ClusterTest.java | 9 +-
.../org/apache/myriad/state/NodeTaskTest.java | 9 +-
.../apache/myriad/state/SchedulerStateTest.java | 5 +-
.../state/utils/ByteBufferSupportTest.java | 5 +-
...iad-config-test-default-with-docker-info.yml | 1 +
.../resources/myriad-config-test-default.yml | 32 +-
54 files changed, 1928 insertions(+), 1649 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 55e8d85..b48cfc9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -21,6 +21,10 @@ allprojects {
apply plugin: 'eclipse'
}
+tasks.withType(JavaCompile) {
+ options.compilerArgs << "-Xlint:unchecked" << "-Werror"
+}
+
buildscript {
repositories {
jcenter()
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java b/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java
index c7e4515..250c812 100644
--- a/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java
+++ b/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java
@@ -36,29 +36,6 @@ public class MyriadExecutorDefaults {
"org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor";
/**
- * YARN class to help handle LCE resources
- */
- public static final String KEY_YARN_NM_LCE_RH_CLASS = "yarn.nodemanager.linux-container-executor.resources-handler.class";
-
- public static final String VAL_YARN_NM_LCE_RH_CLASS = "org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler";
-
- public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
-
- public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT = "yarn.nodemanager.linux-container-executor.cgroups.mount";
-
- public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH = "yarn.nodemanager.linux-container-executor.cgroups.mount-path";
-
- public static final String KEY_YARN_NM_LCE_GROUP = "yarn.nodemanager.linux-container-executor.group";
-
- public static final String KEY_YARN_NM_LCE_PATH = "yarn.nodemanager.linux-container-executor.path";
-
- public static final String KEY_YARN_HOME = "yarn.home";
-
- public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores";
-
- public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb";
-
- /**
* Allot 10% more memory to account for JVM overhead.
*/
public static final double JVM_OVERHEAD = 0.1;
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
index 0615ebd..8c028f1 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
@@ -18,13 +18,17 @@
*/
package org.apache.myriad;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.health.HealthCheckRegistry;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.collections.MapUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -43,9 +47,6 @@ import org.apache.myriad.scheduler.NMProfile;
import org.apache.myriad.scheduler.Rebalancer;
import org.apache.myriad.scheduler.ServiceProfileManager;
import org.apache.myriad.scheduler.ServiceResourceProfile;
-import org.apache.myriad.scheduler.ServiceTaskConstraints;
-import org.apache.myriad.scheduler.TaskConstraintsManager;
-import org.apache.myriad.scheduler.TaskFactory;
import org.apache.myriad.scheduler.TaskTerminator;
import org.apache.myriad.scheduler.TaskUtils;
import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
@@ -55,21 +56,15 @@ import org.apache.myriad.webapp.WebAppGuiceModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.codahale.metrics.JmxReporter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.health.HealthCheckRegistry;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
/**
* Main is the bootstrap class for the Myriad scheduler, managing the lifecycles of
* the following components:
- *
+ *
* 1. MyriadDriverManager
* 2. MyriadWebServer
* 3. TaskTerminator
* 4. HealthCheckRegistry
- *
+ *
* Main uses the Guice Injector framework to manage the Myriad object graph and is
* configured by myriad-config-default.yml
*/
@@ -87,36 +82,36 @@ public class Main {
/**
* Main is the bootstrap class for the Myriad scheduler, managing the lifecycles of
* the following components:
- *
+ *
* 1. MyriadDriverManager
* 2. MyriadWebServer
* 3. TaskTerminator
* 4. HealthCheckRegistry
- *
+ *
* Main uses the Guice Injector framework to manage the Myriad object graph and is
* configured by myriad-config-default.yml
*/
public static void initialize(Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext,
- InterceptorRegistry registry) throws Exception {
+ InterceptorRegistry registry) throws Exception {
MyriadModule myriadModule = new MyriadModule("myriad-config-default.yml", hadoopConf, yarnScheduler, rmContext, registry);
MesosModule mesosModule = new MesosModule();
injector = Guice.createInjector(myriadModule, mesosModule, new WebAppGuiceModule());
new Main().run(injector.getInstance(MyriadConfiguration.class));
}
-
+
// TODO (Kannan Rajah) Hack to get injector in unit test.
public static Injector getInjector() {
return injector;
}
- /**
+ /**
* Initializes the Myriad object graph via MyriadConfiguration and starts
* the Mesos interface (MyriadDriverManager) as well as all Myriad services
- *
+ *
*@param cfg MyriadConfiguration
* @throws Exception
- */
+ */
public void run(MyriadConfiguration cfg) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Bindings: " + injector.getAllBindings());
@@ -171,8 +166,6 @@ public class Main {
private void initProfiles(Injector injector) {
LOGGER.info("Initializing Profiles");
ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class);
- TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class);
- taskConstraintsManager.addTaskConstraints(NodeManagerConfiguration.NM_TASK_PREFIX, new TaskFactory.NMTaskConstraints());
Map<String, Map<String, String>> profiles = injector.getInstance(MyriadConfiguration.class).getProfiles();
TaskUtils taskUtils = injector.getInstance(TaskUtils.class);
if (MapUtils.isNotEmpty(profiles)) {
@@ -181,10 +174,8 @@ public class Main {
if (MapUtils.isNotEmpty(profiles) && profileResourceMap.containsKey("cpu") && profileResourceMap.containsKey("mem")) {
Long cpu = Long.parseLong(profileResourceMap.get("cpu"));
Long mem = Long.parseLong(profileResourceMap.get("mem"));
-
- ServiceResourceProfile serviceProfile = new ExtendedResourceProfile(new NMProfile(profile.getKey(), cpu, mem), taskUtils.getExecutorCpus(), taskUtils.getExecutorMemory(),
- taskUtils.getNodeManagerCpus(), taskUtils.getNodeManagerMemory());
-
+ ServiceResourceProfile serviceProfile = new ExtendedResourceProfile(new NMProfile(profile.getKey(), cpu, mem),
+ taskUtils.getNodeManagerCpus(), taskUtils.getNodeManagerMemory(), taskUtils.getNodeManagerPorts());
profileManager.add(serviceProfile);
} else {
LOGGER.error("Invalid definition for profile: " + profile.getKey());
@@ -259,7 +250,6 @@ public class Main {
private void initServiceConfigurations(MyriadConfiguration cfg, Injector injector) {
LOGGER.info("Initializing initServiceConfigurations");
ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class);
- TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class);
Map<String, ServiceConfiguration> servicesConfigs = injector.getInstance(MyriadConfiguration.class).getServiceConfigurations();
@@ -268,9 +258,8 @@ public class Main {
ServiceConfiguration config = entry.getValue();
final Double cpu = config.getCpus();
final Double mem = config.getJvmMaxMemoryMB();
-
- profileManager.add(new ServiceResourceProfile(taskPrefix, cpu, mem));
- taskConstraintsManager.addTaskConstraints(taskPrefix, new ServiceTaskConstraints(cfg, taskPrefix));
+ final Map<String, Long> ports = config.getPorts();
+ profileManager.add(new ServiceResourceProfile(taskPrefix, cpu, mem, ports));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
index 8748dcb..bb560a4 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
@@ -32,22 +32,19 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.myriad.configuration.MyriadConfiguration;
-import org.apache.myriad.configuration.MyriadExecutorConfiguration;
import org.apache.myriad.configuration.NodeManagerConfiguration;
import org.apache.myriad.configuration.ServiceConfiguration;
import org.apache.myriad.policy.LeastAMNodesFirstPolicy;
import org.apache.myriad.policy.NodeScaleDownPolicy;
-import org.apache.myriad.scheduler.DownloadNMExecutorCLGenImpl;
import org.apache.myriad.scheduler.ExecutorCommandLineGenerator;
import org.apache.myriad.scheduler.MyriadDriverManager;
-import org.apache.myriad.scheduler.NMExecutorCLGenImpl;
+import org.apache.myriad.scheduler.NMExecutorCommandLineGenerator;
import org.apache.myriad.scheduler.NMTaskFactoryAnnotation;
import org.apache.myriad.scheduler.ReconcileService;
import org.apache.myriad.scheduler.ServiceProfileManager;
-import org.apache.myriad.scheduler.ServiceTaskFactoryImpl;
-import org.apache.myriad.scheduler.TaskConstraintsManager;
+import org.apache.myriad.scheduler.ServiceTaskFactory;
import org.apache.myriad.scheduler.TaskFactory;
-import org.apache.myriad.scheduler.TaskFactory.NMTaskFactoryImpl;
+import org.apache.myriad.scheduler.NMTaskFactory;
import org.apache.myriad.scheduler.fgs.NMHeartBeatHandler;
import org.apache.myriad.scheduler.fgs.NodeStore;
import org.apache.myriad.scheduler.fgs.OfferLifecycleManager;
@@ -96,17 +93,16 @@ public class MyriadModule extends AbstractModule {
bind(ReconcileService.class).in(Scopes.SINGLETON);
bind(HttpConnectorProvider.class).in(Scopes.SINGLETON);
bind(MyriadWebServer.class).in(Scopes.SINGLETON);
- bind(TaskConstraintsManager.class).in(Scopes.SINGLETON);
- // add special binding between TaskFactory and NMTaskFactoryImpl to ease up
+ // add special binding between TaskFactory and NMTaskFactory to ease up
// usage of TaskFactory
- bind(TaskFactory.class).annotatedWith(NMTaskFactoryAnnotation.class).to(NMTaskFactoryImpl.class);
+ bind(TaskFactory.class).annotatedWith(NMTaskFactoryAnnotation.class).to(NMTaskFactory.class);
bind(YarnNodeCapacityManager.class).in(Scopes.SINGLETON);
bind(NodeStore.class).in(Scopes.SINGLETON);
bind(OfferLifecycleManager.class).in(Scopes.SINGLETON);
bind(NMHeartBeatHandler.class).asEagerSingleton();
MapBinder<String, TaskFactory> mapBinder = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class);
- mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactoryImpl.class).in(Scopes.SINGLETON);
+ mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON);
Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations();
for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) {
@@ -119,8 +115,8 @@ public class MyriadModule extends AbstractModule {
LOGGER.error("ClassNotFoundException", e);
}
} else {
- //TODO (kjyost) Confirm if this else statement and logic should still be here
- mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON);
+ //TODO (hokiegeek2) Confirm if this else statement and logic should still be here
+ mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactory.class).in(Scopes.SINGLETON);
}
}
@@ -163,14 +159,7 @@ public class MyriadModule extends AbstractModule {
@Provides
@Singleton
ExecutorCommandLineGenerator providesCLIGenerator(MyriadConfiguration cfg) {
- ExecutorCommandLineGenerator cliGenerator = null;
- MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration();
- if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
- cliGenerator = new DownloadNMExecutorCLGenImpl(cfg, myriadExecutorConfiguration.getNodeManagerUri().get());
- } else {
- cliGenerator = new NMExecutorCLGenImpl(cfg);
- }
- return cliGenerator;
+ return new NMExecutorCommandLineGenerator(cfg);
}
protected MyriadConfiguration generateMyriadConfiguration(String configFile) {
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java
index fa8dca2..8a6e5f3 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java
@@ -110,7 +110,12 @@ public class MyriadConfiguration {
public static final String DEFAULT_ZK_SERVERS = "localhost:2181";
public static final String DEFAULT_CGROUP_PATH = "/sys/fs/cgroup";
-
+
+ /**
+ * By default cgroups are turned off.
+ */
+ public static final Boolean DEFAULT_CGROUPS_ENABLED = false;
+
public static final Map<String, ServiceConfiguration> EMPTY_SERVICE_CONFIGURATION = Collections.emptyMap();
@JsonProperty
@@ -194,6 +199,8 @@ public class MyriadConfiguration {
@JsonProperty
private String cgroupPath;
+ @JsonProperty
+ private Boolean cgroupEnabled;
public MyriadConfiguration() {
}
@@ -301,4 +308,8 @@ public class MyriadConfiguration {
public String getCGroupPath() {
return Optional.fromNullable(cgroupPath).or(DEFAULT_CGROUP_PATH);
}
-}
\ No newline at end of file
+
+ public Boolean isCgroupEnabled() {
+ return Optional.fromNullable(cgroupEnabled).or(DEFAULT_CGROUPS_ENABLED);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
index e60a5d5..56ea43d 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
@@ -21,10 +21,18 @@ package org.apache.myriad.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
+import java.util.Map;
+import java.util.TreeMap;
+
/**
* YARN NodeManager Configuration
*/
public class NodeManagerConfiguration {
+
+ public static final String KEY_NM_ADDRESS = "myriad.yarn.nodemanager.address";
+ public static final String KEY_NM_LOCALIZER_ADDRESS = "myriad.yarn.nodemanager.localizer.address";
+ public static final String KEY_NM_WEBAPP_ADDRESS = "myriad.yarn.nodemanager.webapp.address";
+ public static final String KEY_NM_SHUFFLE_PORT = "myriad.mapreduce.shuffle.port";
/**
* Allot 10% more memory to account for JVM overhead.
*/
@@ -68,7 +76,10 @@ public class NodeManagerConfiguration {
*/
@JsonProperty
private String jvmOpts;
-
+
+ @JsonProperty
+ private Map<String, Long> ports;
+
/**
* Determines if cgroups are enabled for the NodeManager
*/
@@ -91,6 +102,18 @@ public class NodeManagerConfiguration {
return Optional.fromNullable(cpus).or(DEFAULT_NM_CPUS);
}
+ public synchronized Map<String, Long> getPorts() {
+ if (ports == null) {
+ //Good idea to have deterministic order
+ ports = new TreeMap<>();
+ ports.put(KEY_NM_ADDRESS, 0L);
+ ports.put(KEY_NM_WEBAPP_ADDRESS, 0L);
+ ports.put(KEY_NM_LOCALIZER_ADDRESS, 0L);
+ ports.put(KEY_NM_SHUFFLE_PORT, 0L);
+ }
+ return ports;
+ }
+
public boolean getCgroups() {
return Optional.fromNullable(cgroups).or(DEFAULT_NM_CGROUPS);
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java
index 0f733a9..0b00bca 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java
@@ -19,6 +19,7 @@
package org.apache.myriad.configuration;
import java.util.Map;
+import java.util.TreeMap;
import org.hibernate.validator.constraints.NotEmpty;
@@ -86,6 +87,7 @@ public class ServiceConfiguration {
protected Integer maxInstances;
@JsonProperty
+ @NotEmpty
protected String command;
@JsonProperty
@@ -123,8 +125,8 @@ public class ServiceConfiguration {
return envSettings;
}
- public Optional<Map<String, Long>> getPorts() {
- return Optional.fromNullable(ports);
+ public Map<String, Long> getPorts() {
+ return Optional.fromNullable(ports).or(new TreeMap<String, Long>());
}
public Optional<Integer> getMaxInstances() {
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
deleted file mode 100644
index 74deda3..0000000
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.myriad.scheduler;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import org.apache.myriad.configuration.MyriadConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation assumes NM binaries will be downloaded
- */
-public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl {
-
- private static final Logger LOGGER = LoggerFactory.
- getLogger(DownloadNMExecutorCLGenImpl.class);
-
- private final String nodeManagerUri;
-
- public DownloadNMExecutorCLGenImpl(MyriadConfiguration cfg, String nodeManagerUri) {
- super(cfg);
- this.nodeManagerUri = nodeManagerUri;
- }
-
- @Override
- public String generateCommandLine(ServiceResourceProfile profile, Ports ports) {
- StringBuilder cmdLine = new StringBuilder();
- LOGGER.info("Using remote distribution");
- generateEnvironment(profile, (NMPorts) ports);
- appendDistroExtractionCommands(cmdLine);
- appendCgroupsCmds(cmdLine);
- appendYarnHomeExport(cmdLine);
- appendUserSudo(cmdLine);
- appendEnvForNM(cmdLine);
- cmdLine.append(YARN_NM_CMD);
- return cmdLine.toString();
- }
-
- protected void appendDistroExtractionCommands(StringBuilder cmdLine) {
- /*
- TODO(darinj): Overall this is messier than I'd like. We can't let mesos untar the distribution, since
- it will change the permissions. Instead we simply download the tarball and execute tar -xvpf. We also
- pull the config from the resource manager and put them in the conf dir. This is also why we need
- frameworkSuperUser. This will be refactored after Mesos-1790 is resolved.
- */
-
- //TODO(DarinJ) support other compression, as this is a temp fix for Mesos 1760 may not get to it.
- //Extract tarball keeping permissions, necessary to keep HADOOP_HOME/bin/container-executor suidbit set.
- appendSudo(cmdLine);
- cmdLine.append("tar -zxpf ").append(getFileName(nodeManagerUri));
- //Place the hadoop config where in the HADOOP_CONF_DIR where it will be read by the NodeManager
- //The url for the resource manager config is: http(s)://hostname:port/conf so fetcher.cpp downloads the
- //config file to conf, It's an xml file with the parameters of yarn-site.xml, core-site.xml and hdfs.xml.
- cmdLine.append(" && ");
- appendSudo(cmdLine);
- cmdLine.append(" cp conf ");
- cmdLine.append(cfg.getYarnEnvironment().get("YARN_HOME"));
- cmdLine.append("/etc/hadoop/yarn-site.xml;");
- }
-
- private static String getFileName(String uri) {
- int lastSlash = uri.lastIndexOf('/');
- if (lastSlash == -1) {
- return uri;
- } else {
- String fileName = uri.substring(lastSlash + 1);
- Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName), "URI should not have a slash at the end");
- return fileName;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java
index 82782f2..c2a1c68 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java
@@ -19,11 +19,153 @@
package org.apache.myriad.scheduler;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.mesos.Protos;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.configuration.MyriadExecutorConfiguration;
+import org.apache.myriad.configuration.ServiceConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
/**
* Interface to plugin multiple implementations for executor command generation
*/
-public interface ExecutorCommandLineGenerator {
- String generateCommandLine(ServiceResourceProfile profile, Ports ports);
+public abstract class ExecutorCommandLineGenerator {
+
+ protected static final Logger LOGGER = LoggerFactory.getLogger(NMExecutorCommandLineGenerator.class);
+
+ public static final String KEY_YARN_RM_HOSTNAME = "yarn.resourcemanager.hostname";
+ public static final String KEY_YARN_HOME = "yarn.home";
+
+ protected static final String ALL_LOCAL_IPV4ADDR = "0.0.0.0:";
+
+ protected static final String PROPERTY_FORMAT = " -D%s=%s ";
+ protected static final String CMD_FORMAT = "echo \"%1$s\" && %1$s";
+
+ protected Protos.CommandInfo staticCommandInfo;
+
+ protected MyriadConfiguration myriadConfiguration;
+ protected MyriadExecutorConfiguration myriadExecutorConfiguration;
+ protected YarnConfiguration yarnConfiguration = new YarnConfiguration();
+
+ abstract Protos.CommandInfo generateCommandLine(ServiceResourceProfile profile, ServiceConfiguration serviceConfiguration, Collection<Long> ports);
+
+ protected void appendDistroExtractionCommands(StringBuilder cmdLine) {
+ /*
+ TODO(darinj): Overall this is messier than I'd like. We can't let mesos untar the distribution, since
+ it will change the permissions. Instead we simply download the tarball and execute tar -xvpf. We also
+ pull the config from the resource manager and put them in the yarnConfiguration dir. This is also why we need
+ frameworkSuperUser. This will be refactored after Mesos-1790 is resolved.
+ */
+
+ //TODO(DarinJ) support other compression, as this is a temp fix for Mesos 1760 may not get to it.
+ if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
+ //Extract tarball keeping permissions, necessary to keep HADOOP_HOME/bin/container-executor suidbit set.
+ //If SudoUser is not enabled, LinuxExecutor will not work
+ appendSudo(cmdLine);
+ cmdLine.append("tar -zxpf ").append(getFileName(myriadExecutorConfiguration.getNodeManagerUri().get()));
+ cmdLine.append(" && ");
+ //Place the hadoop config in the HADOOP_CONF_DIR where it will be read by the NodeManager
+ //The url for the resource manager config is: http(s)://hostname:port/yarnConfiguration so fetcher.cpp downloads the
+ //config file to yarnConfiguration, It's an xml file with the parameters of yarn-site.xml, core-site.xml and hdfs.xml.
+ if (!myriadExecutorConfiguration.getConfigUri().isPresent()) {
+ appendSudo(cmdLine);
+ cmdLine.append(" cp yarnConfiguration ");
+ cmdLine.append(myriadConfiguration.getYarnEnvironment().get("YARN_HOME"));
+ cmdLine.append("/etc/hadoop/yarn-site.xml && ");
+ }
+ }
+ }
+
+ protected void addJavaOpt(StringBuilder opts, String propertyName, String propertyValue) {
+ String envOpt = String.format(PROPERTY_FORMAT, propertyName, propertyValue);
+ opts.append(envOpt);
+ }
+
+ protected void appendSudo(StringBuilder cmdLine) {
+ if (myriadConfiguration.getFrameworkSuperUser().isPresent()) {
+ cmdLine.append(" sudo ");
+ }
+ }
+
+ protected void appendUserSudo(StringBuilder cmdLine) {
+ if (myriadConfiguration.getFrameworkSuperUser().isPresent()) {
+ cmdLine.append(" sudo -E -u ");
+ cmdLine.append(myriadConfiguration.getFrameworkUser().get());
+ cmdLine.append(" -H ");
+ }
+ }
+
+ public String getConfigurationUrl() {
+ String httpPolicy = yarnConfiguration.get(TaskFactory.YARN_HTTP_POLICY);
+ String address = StringUtils.EMPTY;
+ if (httpPolicy != null && httpPolicy.equals(TaskFactory.YARN_HTTP_POLICY_HTTPS_ONLY)) {
+ address = yarnConfiguration.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS);
+ if (StringUtils.isEmpty(address)) {
+ address = yarnConfiguration.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8090";
+ }
+ return "https://" + address + "/yarnConfiguration";
+ } else {
+ address = yarnConfiguration.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_ADDRESS);
+ if (StringUtils.isEmpty(address)) {
+ address = yarnConfiguration.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8088";
+ }
+ return "http://" + address + "/yarnConfiguration";
+ }
+ }
+
+ private static String getFileName(String uri) {
+ int lastSlash = uri.lastIndexOf('/');
+ if (lastSlash == -1) {
+ return uri;
+ } else {
+ String fileName = uri.substring(lastSlash + 1);
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName), "URI should not have a slash at the end");
+ return fileName;
+ }
+ }
+
+ protected String getUser() {
+ if (myriadConfiguration.getFrameworkSuperUser().isPresent()) {
+ return myriadConfiguration.getFrameworkSuperUser().get();
+ } else {
+ return myriadConfiguration.getFrameworkUser().get();
+ }
+ }
- String getConfigurationUrl();
+ protected List<Protos.CommandInfo.URI> getUris() {
+ List<Protos.CommandInfo.URI> uris = new ArrayList<>();
+ if (myriadExecutorConfiguration.getJvmUri().isPresent()) {
+ final String jvmRemoteUri = myriadExecutorConfiguration.getJvmUri().get();
+ LOGGER.info("Getting JRE distribution from:" + jvmRemoteUri);
+ uris.add(Protos.CommandInfo.URI.newBuilder().setValue(jvmRemoteUri).build());
+ }
+ if (myriadExecutorConfiguration.getConfigUri().isPresent()) {
+ String configURI = myriadExecutorConfiguration.getConfigUri().get();
+ LOGGER.info("Getting Hadoop configuration from: {}", configURI);
+ uris.add(Protos.CommandInfo.URI.newBuilder().setValue(configURI).build());
+ } else if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
+ String configURI = getConfigurationUrl();
+ LOGGER.info("Getting Hadoop configuration from: {}", configURI);
+ uris.add(Protos.CommandInfo.URI.newBuilder().setValue(configURI).build());
+ }
+ if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
+ //Both FrameworkUser and FrameworkSuperuser to get all of the directory permissions correct.
+ if (!(myriadConfiguration.getFrameworkUser().isPresent() && myriadConfiguration.getFrameworkSuperUser().isPresent())) {
+ LOGGER.warn("Trying to use remote distribution, but frameworkUser and/or frameworkSuperUser not set!" +
+ "Some features may not work");
+ }
+ String nodeManagerUri = myriadExecutorConfiguration.getNodeManagerUri().get();
+ LOGGER.info("Getting Hadoop distribution from: {}", nodeManagerUri);
+ uris.add(Protos.CommandInfo.URI.newBuilder().setValue(nodeManagerUri).setExtract(false).build());
+ }
+ return uris;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java
index 6232258..59e2cb0 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java
@@ -20,6 +20,8 @@ package org.apache.myriad.scheduler;
import com.google.gson.Gson;
+import java.util.Map;
+
/**
* Extended ServiceResourceProfile for services that need to pass set of resources downstream
* currently the only such service is NodeManager
@@ -33,33 +35,12 @@ public class ExtendedResourceProfile extends ServiceResourceProfile {
* @param cpu
 * @param mem will throw NullPointerException if childProfile is null
*/
- public ExtendedResourceProfile(NMProfile childProfile, Double cpu, Double mem, Double execCpu, Double execMemory) {
- super(childProfile.getName(), cpu, mem, execCpu, execMemory);
-
+ public ExtendedResourceProfile(NMProfile childProfile, Double cpu, Double mem, Map<String, Long> ports) {
+ super(childProfile.getName(), cpu, mem, ports);
this.childProfile = childProfile;
this.className = ExtendedResourceProfile.class.getName();
}
- /**
- * @param childProfile - should be null
- * @param cpu
- * @param mem will throw NullPoiterException if childProfile is null
- */
- public ExtendedResourceProfile(NMProfile childProfile, Double cpu, Double mem) {
- super(childProfile.getName(), cpu, mem);
-
- this.childProfile = childProfile;
- this.className = ExtendedResourceProfile.class.getName();
- }
-
- public NMProfile getChildProfile() {
- return childProfile;
- }
-
- public void setChildProfile(NMProfile nmProfile) {
- this.childProfile = nmProfile;
- }
-
@Override
public String getName() {
return childProfile.getName();
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
index 99f7e78..fb1f1bb 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
@@ -136,7 +136,7 @@ public class MyriadOperations {
Collection<NodeTask> nodes = new HashSet<>();
for (int i = 0; i < instances; i++) {
- NodeTask nodeTask = new NodeTask(new ServiceResourceProfile(serviceName, cpu, mem), null);
+ NodeTask nodeTask = new NodeTask(new ServiceResourceProfile(serviceName, cpu, mem, auxTaskConf.getPorts()), null);
nodeTask.setTaskPrefix(serviceName);
nodes.add(nodeTask);
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java
deleted file mode 100644
index db945ec..0000000
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.myriad.scheduler;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.myriad.configuration.MyriadConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation assumes NM binaries already deployed
- */
-public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(NMExecutorCLGenImpl.class);
-
- public static final String ENV_YARN_NODEMANAGER_OPTS = "YARN_NODEMANAGER_OPTS";
- public static final String KEY_YARN_NM_CGROUPS_PATH = "yarn.nodemanager.cgroups.path";
- public static final String KEY_YARN_RM_HOSTNAME = "yarn.resourcemanager.hostname";
-
- /**
- * YARN class to help handle LCE resources
- */
- // TODO (mohit): Should it be configurable ?
- public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
- public static final String KEY_YARN_HOME = "yarn.home";
- public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores";
- public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb";
- public static final String YARN_NM_CMD = " $YARN_HOME/bin/yarn nodemanager";
- public static final String KEY_NM_ADDRESS = "myriad.yarn.nodemanager.address";
- public static final String KEY_NM_LOCALIZER_ADDRESS = "myriad.yarn.nodemanager.localizer.address";
- public static final String KEY_NM_WEBAPP_ADDRESS = "myriad.yarn.nodemanager.webapp.address";
- public static final String KEY_NM_SHUFFLE_PORT = "myriad.mapreduce.shuffle.port";
-
- private static final String ALL_LOCAL_IPV4ADDR = "0.0.0.0:";
- private static final String PROPERTY_FORMAT = "-D%s=%s";
-
- private Map<String, String> environment = new HashMap<>();
- protected MyriadConfiguration cfg;
- protected YarnConfiguration conf = new YarnConfiguration();
-
- public NMExecutorCLGenImpl(MyriadConfiguration cfg) {
- this.cfg = cfg;
- }
-
- @Override
- public String generateCommandLine(ServiceResourceProfile profile, Ports ports) {
- StringBuilder cmdLine = new StringBuilder();
-
- generateEnvironment(profile, (NMPorts) ports);
- appendCgroupsCmds(cmdLine);
- appendYarnHomeExport(cmdLine);
- appendEnvForNM(cmdLine);
- cmdLine.append(YARN_NM_CMD);
- return cmdLine.toString();
- }
-
- protected void generateEnvironment(ServiceResourceProfile profile, NMPorts ports) {
- //yarnEnvironemnt configuration from yaml file
- Map<String, String> yarnEnvironmentMap = cfg.getYarnEnvironment();
- if (yarnEnvironmentMap != null) {
- environment.putAll(yarnEnvironmentMap);
- }
-
- String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME);
- if (StringUtils.isNotEmpty(rmHostName)) {
- addYarnNodemanagerOpt(KEY_YARN_RM_HOSTNAME, rmHostName);
- }
-
- if (cfg.getNodeManagerConfiguration().getCgroups()) {
- addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_HIERARCHY, "mesos/$TASK_DIR");
- if (environment.containsKey("YARN_HOME")) {
- addYarnNodemanagerOpt(KEY_YARN_HOME, environment.get("YARN_HOME"));
- }
- }
- addYarnNodemanagerOpt(KEY_NM_RESOURCE_CPU_VCORES, Integer.toString(profile.getCpus().intValue()));
- addYarnNodemanagerOpt(KEY_NM_RESOURCE_MEM_MB, Integer.toString(profile.getMemory().intValue()));
- addYarnNodemanagerOpt(KEY_NM_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getRpcPort()).toString());
- addYarnNodemanagerOpt(KEY_NM_LOCALIZER_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getLocalizerPort()).toString());
- addYarnNodemanagerOpt(KEY_NM_WEBAPP_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getWebAppHttpPort()).toString());
- addYarnNodemanagerOpt(KEY_NM_SHUFFLE_PORT, Long.valueOf(ports.getShufflePort()).toString());
- }
-
- protected void appendEnvForNM(StringBuilder cmdLine) {
- cmdLine.append(" env ");
- for (Map.Entry<String, String> env : environment.entrySet()) {
- cmdLine.append(env.getKey()).append("=").append("\"").append(env.getValue()).append("\" ");
- }
- }
-
- protected void appendCgroupsCmds(StringBuilder cmdLine) {
- if (cfg.getFrameworkSuperUser().isPresent()) {
- cmdLine.append(" export TASK_DIR=`basename $PWD`&&");
- //The container executor script expects mount-path to exist and owned by the yarn user
- //See: https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html
- //If YARN ever moves to cgroup/mem it will be necessary to add a mem version.
- appendSudo(cmdLine);
- cmdLine.append("chown " + cfg.getFrameworkUser().get() + " ");
- cmdLine.append(cfg.getCGroupPath());
- cmdLine.append("/cpu/mesos/$TASK_DIR &&");
- } else {
- LOGGER.info("frameworkSuperUser not enabled ignoring cgroup configuration");
- }
- }
-
- protected void appendYarnHomeExport(StringBuilder cmdLine) {
- if (environment.containsKey("YARN_HOME")) {
- cmdLine.append(" export YARN_HOME=");
- cmdLine.append(environment.get("YARN_HOME"));
- cmdLine.append(";");
- }
- }
-
- protected void appendSudo(StringBuilder cmdLine) {
- if (cfg.getFrameworkSuperUser().isPresent()) {
- cmdLine.append(" sudo ");
- }
- }
-
- protected void appendUserSudo(StringBuilder cmdLine) {
- if (cfg.getFrameworkSuperUser().isPresent()) {
- cmdLine.append(" sudo -E -u ");
- cmdLine.append(cfg.getFrameworkUser().get());
- cmdLine.append(" -H ");
- }
- }
-
- protected void addYarnNodemanagerOpt(String propertyName, String propertyValue) {
- String envOpt = String.format(PROPERTY_FORMAT, propertyName, propertyValue);
- if (environment.containsKey(ENV_YARN_NODEMANAGER_OPTS)) {
- String existingOpts = environment.get(ENV_YARN_NODEMANAGER_OPTS);
- environment.put(ENV_YARN_NODEMANAGER_OPTS, existingOpts + " " + envOpt);
- } else {
- environment.put(ENV_YARN_NODEMANAGER_OPTS, envOpt);
- }
- }
-
- @Override
- public String getConfigurationUrl() {
- String httpPolicy = conf.get(TaskFactory.YARN_HTTP_POLICY);
- String address = StringUtils.EMPTY;
- if (httpPolicy != null && httpPolicy.equals(TaskFactory.YARN_HTTP_POLICY_HTTPS_ONLY)) {
- address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS);
- if (StringUtils.isEmpty(address)) {
- address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8090";
- }
- } else {
- address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_ADDRESS);
- if (StringUtils.isEmpty(address)) {
- address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8088";
- }
- }
-
- return "http://" + address + "/conf";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCommandLineGenerator.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCommandLineGenerator.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCommandLineGenerator.java
new file mode 100644
index 0000000..035da3f
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCommandLineGenerator.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.myriad.scheduler;
+
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+
+
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.mesos.Protos;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.configuration.ServiceConfiguration;
+
+import org.apache.mesos.Protos.CommandInfo;
+
+/**
+ * Implementation assumes NM binaries already deployed
+ */
+public class NMExecutorCommandLineGenerator extends ExecutorCommandLineGenerator {
+
+  /** Environment variable that carries the generated JVM options to the NodeManager. */
+  public static final String ENV_YARN_NODEMANAGER_OPTS = "YARN_NODEMANAGER_OPTS";
+  /** YARN property naming the cgroups hierarchy handed to the linux-container-executor. */
+  public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
+  public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores";
+  public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb";
+
+  public static final String YARN_NM_CMD = " $YARN_HOME/bin/yarn nodemanager";
+
+  /** Name of the entry in the configured YARN environment that holds the YARN home directory. */
+  private static final String ENV_YARN_HOME = "YARN_HOME";
+
+  /**
+   * Stores the configuration and pre-computes the part of the command shared by every task.
+   *
+   * @param cfg the Myriad configuration; its executor configuration is read here as well
+   */
+  public NMExecutorCommandLineGenerator(MyriadConfiguration cfg) {
+    this.myriadConfiguration = cfg;
+    this.myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration();
+    generateStaticCommandLine();
+  }
+
+  /**
+   * Builds the per-task command: the static command merged with the task environment and user.
+   *
+   * @param profile              supplies cpus, memory and the port property names
+   * @param serviceConfiguration not used by this generator
+   * @param ports                ports to bind, one per entry of {@code profile.getPorts()}
+   * @return the complete CommandInfo for one NodeManager
+   */
+  @Override
+  CommandInfo generateCommandLine(ServiceResourceProfile profile,
+                                  ServiceConfiguration serviceConfiguration, Collection<Long> ports) {
+    CommandInfo.Builder builder = CommandInfo.newBuilder();
+    builder.mergeFrom(staticCommandInfo);
+    builder.setEnvironment(generateEnvironment(profile, ports));
+    builder.setUser(getUser());
+    return builder.build();
+  }
+
+  /** Assembles the shell command line and URIs that do not vary between tasks. */
+  protected void generateStaticCommandLine() {
+    CommandInfo.Builder builder = CommandInfo.newBuilder();
+    StringBuilder cmdLine = new StringBuilder();
+    appendCgroupsCmds(cmdLine);
+    appendDistroExtractionCommands(cmdLine);
+    appendUserSudo(cmdLine);
+    cmdLine.append(YARN_NM_CMD);
+    builder.setValue(String.format(CMD_FORMAT, cmdLine.toString()));
+    builder.addAllUris(getUris());
+    staticCommandInfo = builder.build();
+  }
+
+  /**
+   * Builds the task environment: every configured YARN variable, followed by a
+   * YARN_NODEMANAGER_OPTS variable holding the generated -D options.
+   *
+   * @param profile supplies cpus, memory and the port property names
+   * @param ports   ports to bind, consumed in the iteration order of {@code profile.getPorts()}
+   * @return the environment for the NodeManager command
+   * @throws IllegalStateException if the number of ports differs from the number of port properties
+   */
+  protected Protos.Environment generateEnvironment(ServiceResourceProfile profile, Collection<Long> ports) {
+    Map<String, String> yarnEnv = myriadConfiguration.getYarnEnvironment();
+    if (yarnEnv == null) {
+      // Tolerate an unset YARN environment (assumed to surface as null - TODO confirm).
+      yarnEnv = Collections.emptyMap();
+    }
+    Protos.Environment.Builder builder = Protos.Environment.newBuilder();
+    builder.addAllVariables(Iterables.transform(yarnEnv.entrySet(), new Function<Map.Entry<String, String>, Protos.Environment.Variable>() {
+      public Protos.Environment.Variable apply(Map.Entry<String, String> x) {
+        return Protos.Environment.Variable.newBuilder().setName(x.getKey()).setValue(x.getValue()).build();
+      }
+    }));
+
+    StringBuilder yarnOpts = new StringBuilder();
+    String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME);
+    if (StringUtils.isNotEmpty(rmHostName)) {
+      addJavaOpt(yarnOpts, KEY_YARN_RM_HOSTNAME, rmHostName);
+    }
+
+    // The presence check and the lookup must use the same key. KEY_YARN_HOME is only the name of
+    // the -D option (inherited constant, presumably "yarn.home" - confirm in the superclass).
+    if (yarnEnv.containsKey(ENV_YARN_HOME)) {
+      addJavaOpt(yarnOpts, KEY_YARN_HOME, yarnEnv.get(ENV_YARN_HOME));
+    }
+
+    addJavaOpt(yarnOpts, KEY_NM_RESOURCE_CPU_VCORES, Integer.toString(profile.getCpus().intValue()));
+    addJavaOpt(yarnOpts, KEY_NM_RESOURCE_MEM_MB, Integer.toString(profile.getMemory().intValue()));
+
+    Map<String, Long> portsMap = profile.getPorts();
+    Preconditions.checkState(portsMap.size() == ports.size());
+    Iterator<Long> itr = ports.iterator();
+    for (String portProperty : portsMap.keySet()) {
+      // Take the port before branching so an unrecognised property cannot shift the
+      // ports assigned to the properties that follow it.
+      Long port = itr.next();
+      if (portProperty.endsWith("address")) {
+        addJavaOpt(yarnOpts, portProperty, ALL_LOCAL_IPV4ADDR + port);
+      } else if (portProperty.endsWith("port")) {
+        addJavaOpt(yarnOpts, portProperty, port.toString());
+      } else {
+        LOGGER.warn("{} property isn't an address or port!", portProperty);
+      }
+    }
+
+    // Options configured under YARN_NODEMANAGER_OPTS are appended after the generated ones.
+    if (yarnEnv.containsKey(ENV_YARN_NODEMANAGER_OPTS)) {
+      yarnOpts.append(" ").append(yarnEnv.get(ENV_YARN_NODEMANAGER_OPTS));
+    }
+    builder.addAllVariables(Collections.singleton(
+        Protos.Environment.Variable.newBuilder()
+            .setName(ENV_YARN_NODEMANAGER_OPTS)
+            .setValue(yarnOpts.toString()).build())
+    );
+    return builder.build();
+  }
+
+  /**
+   * Appends the shell commands that chown the task's cpu cgroup directory to the framework user
+   * and export the cgroup hierarchy through YARN_NODEMANAGER_OPTS. Appends nothing when cgroups
+   * are disabled; only logs a warning when cgroups are enabled but no frameworkSuperUser is set.
+   *
+   * @param cmdLine the command line under construction
+   */
+  protected void appendCgroupsCmds(StringBuilder cmdLine) {
+    if (!myriadConfiguration.isCgroupEnabled()) {
+      return;
+    }
+    if (!myriadConfiguration.getFrameworkSuperUser().isPresent()) {
+      LOGGER.warn("frameworkSuperUser not set, ignoring cgroup configuration; this will likely cause the nodemanager to crash");
+      return;
+    }
+    //These can't be set in the environment as they require commands to be run on the host
+    cmdLine.append(" export TASK_DIR=`cat /proc/self/cgroup | grep :cpu: | cut -d: -f3` &&");
+    //The container executor script expects mount-path to exist and owned by the yarn user
+    //See: https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html
+    //If YARN ever moves to cgroup/mem it will be necessary to add a mem version.
+    appendSudo(cmdLine);
+    // NOTE(review): frameworkUser is read without an isPresent() check - confirm it is always
+    // configured whenever frameworkSuperUser is.
+    cmdLine.append("chown ").append(myriadConfiguration.getFrameworkUser().get()).append(" ");
+    cmdLine.append(myriadConfiguration.getCGroupPath());
+    cmdLine.append("/cpu$TASK_DIR &&");
+    cmdLine.append(String.format("export %s=\"$%s -D%s=%s\" && ", ENV_YARN_NODEMANAGER_OPTS, ENV_YARN_NODEMANAGER_OPTS,
+        KEY_YARN_NM_LCE_CGROUPS_HIERARCHY, "$TASK_DIR"));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java
deleted file mode 100644
index 12490a7..0000000
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.myriad.scheduler;
-
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Helper class for dynamically assigning ports to nodemanager
- */
-public class NMPorts implements Ports {
- private static final String NM_RPC_PORT_KEY = "nm.rpc.port";
- private static final String NM_LOCALIZER_PORT_KEY = "nm.localizer.port";
- private static final String NM_WEBAPP_HTTP_PORT_KEY = "nm.webapp.http.port";
- private static final String NM_HTTP_SHUFFLE_PORT_KEY = "nm.http.shuffle.port";
-
- private static final String[] NM_PORT_KEYS =
- {NM_RPC_PORT_KEY, NM_LOCALIZER_PORT_KEY, NM_WEBAPP_HTTP_PORT_KEY, NM_HTTP_SHUFFLE_PORT_KEY};
-
- private Map<String, Long> portsMap = new HashMap<>(NM_PORT_KEYS.length);
-
- public NMPorts(Long[] ports) {
- Preconditions.checkState(ports.length == NM_PORT_KEYS.length, "NMPorts: array \"ports\" is of unexpected length");
- for (int i = 0; i < NM_PORT_KEYS.length; i++) {
- portsMap.put(NM_PORT_KEYS[i], ports[i]);
- }
- }
-
- public long getRpcPort() {
- return portsMap.get(NM_RPC_PORT_KEY);
- }
-
- public long getLocalizerPort() {
- return portsMap.get(NM_LOCALIZER_PORT_KEY);
- }
-
- public long getWebAppHttpPort() {
- return portsMap.get(NM_WEBAPP_HTTP_PORT_KEY);
- }
-
- public long getShufflePort() {
- return portsMap.get(NM_HTTP_SHUFFLE_PORT_KEY);
- }
-
- public static int expectedNumPorts() {
- return NM_PORT_KEYS.length;
- }
-
- /**
- * @return a string representation of NMPorts
- */
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder().append("{");
- for (String key : NM_PORT_KEYS) {
- sb.append(key).append(": ").append(portsMap.get(key).toString()).append(", ");
- }
- sb.replace(sb.length() - 2, sb.length(), "}");
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactory.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactory.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactory.java
new file mode 100644
index 0000000..6c7e209
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactory.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.myriad.scheduler;
+
+import com.google.inject.Inject;
+import org.apache.mesos.Protos;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.scheduler.resource.ResourceOfferContainer;
+import org.apache.myriad.state.NodeTask;
+
+import java.util.List;
+
+/**
+ * Creates Node Manager Tasks based upon Mesos offers
+ */
+public class NMTaskFactory extends TaskFactory {
+
+  /** Injected by Guice; every collaborator is passed straight to {@link TaskFactory}. */
+  @Inject
+  NMTaskFactory(MyriadConfiguration cfg, TaskUtils taskUtils, ExecutorCommandLineGenerator clGenerator) {
+    super(cfg, taskUtils, clGenerator);
+  }
+
+  /**
+   * Assembles the Mesos task for one NodeManager from the resources held by the offer container.
+   * Resources are taken in a fixed order: ports, then the executor's share (through
+   * {@link #getExecutorInfoForSlave}), then the task's own cpus and memory.
+   *
+   * @param resourceOfferContainer offer resources to consume from
+   * @param frameworkId            framework the executor id is derived from
+   * @param taskId                 id of the task being created
+   * @param nodeTask               supplies the resource profile of the task
+   * @return the fully populated TaskInfo
+   */
+  @Override
+  public Protos.TaskInfo createTask(ResourceOfferContainer resourceOfferContainer, Protos.FrameworkID frameworkId,
+                                    Protos.TaskID taskId, NodeTask nodeTask) {
+    ServiceResourceProfile profile = nodeTask.getProfile();
+    Double memForTask = profile.getAggregateMemory();
+    Double cpusForTask = profile.getAggregateCpu();
+    List<Protos.Resource> ports = resourceOfferContainer.consumePorts(profile.getPorts().values());
+    Protos.CommandInfo command = clGenerator.generateCommandLine(profile, null, rangesConverter(ports));
+    Protos.ExecutorInfo executor = getExecutorInfoForSlave(resourceOfferContainer, frameworkId, command);
+
+    Protos.TaskInfo.Builder task = Protos.TaskInfo.newBuilder();
+    task.setName(cfg.getFrameworkName() + "-" + taskId.getValue());
+    task.setTaskId(taskId);
+    task.setSlaveId(resourceOfferContainer.getSlaveId());
+    task.addAllResources(resourceOfferContainer.consumeCpus(cpusForTask));
+    task.addAllResources(resourceOfferContainer.consumeMem(memForTask));
+    task.addAllResources(ports);
+    task.setExecutor(executor);
+    return task.build();
+  }
+
+  /**
+   * Builds the executor description, consuming the executor's cpus and memory from the offer.
+   * The executor id concatenates the executor prefix, framework id, offer id and slave id.
+   * Container information is attached only when the configuration provides it.
+   *
+   * @param resourceOfferContainer offer resources to consume from
+   * @param frameworkId            framework the executor belongs to
+   * @param commandInfo            command the executor will run
+   * @return the fully populated ExecutorInfo
+   */
+  @Override
+  public Protos.ExecutorInfo getExecutorInfoForSlave(ResourceOfferContainer resourceOfferContainer,
+                                                     Protos.FrameworkID frameworkId, Protos.CommandInfo commandInfo) {
+    String idValue = EXECUTOR_PREFIX + frameworkId.getValue() + resourceOfferContainer.getOfferId()
+        + resourceOfferContainer.getSlaveId().getValue();
+    Protos.ExecutorID executorId = Protos.ExecutorID.newBuilder().setValue(idValue).build();
+
+    Protos.ExecutorInfo.Builder executor = Protos.ExecutorInfo.newBuilder();
+    executor.setCommand(commandInfo);
+    executor.setName(EXECUTOR_NAME);
+    executor.setExecutorId(executorId);
+    executor.addAllResources(resourceOfferContainer.consumeCpus(taskUtils.getExecutorCpus()));
+    executor.addAllResources(resourceOfferContainer.consumeMem(taskUtils.getExecutorMemory()));
+    if (cfg.getContainerInfo().isPresent()) {
+      executor.setContainer(getContainerInfo());
+    }
+    return executor.build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java
deleted file mode 100644
index 03150fb..0000000
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.myriad.scheduler;
-
-/**
- * Generic interface to represent ports
- */
-public interface Ports {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
index 8765226..083d54d 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
@@ -17,24 +17,94 @@
*/
package org.apache.myriad.scheduler;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.mesos.Protos;
import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.configuration.ServiceConfiguration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
/**
* CommandLineGenerator for any aux service launched by Myriad as binary distro
*/
-public class ServiceCommandLineGenerator extends DownloadNMExecutorCLGenImpl {
+public class ServiceCommandLineGenerator extends ExecutorCommandLineGenerator {
+
+ public static final String ENV_HADOOP_OPTS = "HADOOP_OPTS";
+ private String baseCmd;
- public ServiceCommandLineGenerator(MyriadConfiguration cfg, String nodeManagerUri) {
- super(cfg, nodeManagerUri);
+  /**
+   * Stores the configuration and pre-computes the task-independent part of the command.
+   *
+   * @param cfg the Myriad configuration; its executor configuration is read here as well
+   */
+  public ServiceCommandLineGenerator(MyriadConfiguration cfg) {
+    this.myriadConfiguration = cfg;
+    this.myriadExecutorConfiguration = this.myriadConfiguration.getMyriadExecutorConfiguration();
+    generateStaticCommandLine();
+  }
+
+
+  /**
+   * Pre-computes what every task shares: the CommandInfo carrying URIs and user, and the
+   * command prefix (distribution extraction plus sudo) stored in {@code baseCmd}.
+   */
+  protected void generateStaticCommandLine() {
+    staticCommandInfo = Protos.CommandInfo.newBuilder()
+        .addAllUris(getUris())
+        .setUser(getUser())
+        .build();
+
+    StringBuilder prefix = new StringBuilder();
+    appendDistroExtractionCommands(prefix);
+    appendUserSudo(prefix);
+    baseCmd = prefix.toString();
   }
@Override
- public String generateCommandLine(ServiceResourceProfile profile, Ports ports) {
- StringBuilder strB = new StringBuilder();
- appendDistroExtractionCommands(strB);
- appendUserSudo(strB);
- return strB.toString();
+  public Protos.CommandInfo generateCommandLine(ServiceResourceProfile profile,
+                                                ServiceConfiguration serviceConfiguration,
+                                                Collection<Long> ports) {
+    // The service's own command is appended to the shared prefix, then wrapped by CMD_FORMAT.
+    String serviceCmd = baseCmd + " " + serviceConfiguration.getCommand().get();
+    return Protos.CommandInfo.newBuilder()
+        .mergeFrom(staticCommandInfo)
+        .setValue(String.format(CMD_FORMAT, serviceCmd))
+        .setEnvironment(generateEnvironment(profile, ports))
+        .build();
+  }
+
+
+  /**
+   * Builds the task environment: every configured YARN variable, followed by a HADOOP_OPTS
+   * variable holding the generated -D options.
+   *
+   * @param serviceResourceProfile supplies the port property names
+   * @param ports                  ports to bind, consumed in the iteration order of the profile's ports
+   * @return the environment for the service command
+   * @throws IllegalStateException if the number of ports differs from the number of port properties
+   */
+  protected Protos.Environment generateEnvironment(ServiceResourceProfile serviceResourceProfile, Collection<Long> ports) {
+    Map<String, String> yarnEnv = myriadConfiguration.getYarnEnvironment();
+    if (yarnEnv == null) {
+      // Tolerate an unset YARN environment (assumed to surface as null - TODO confirm).
+      yarnEnv = Collections.emptyMap();
+    }
+    Protos.Environment.Builder builder = Protos.Environment.newBuilder();
+
+    builder.addAllVariables(Iterables.transform(yarnEnv.entrySet(), new Function<Map.Entry<String, String>, Protos.Environment.Variable>() {
+      public Protos.Environment.Variable apply(Map.Entry<String, String> x) {
+        return Protos.Environment.Variable.newBuilder().setName(x.getKey()).setValue(x.getValue()).build();
+      }
+    }));
+
+    StringBuilder hadoopOpts = new StringBuilder();
+    String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME);
+
+    if (StringUtils.isNotEmpty(rmHostName)) {
+      addJavaOpt(hadoopOpts, KEY_YARN_RM_HOSTNAME, rmHostName);
+    }
+
+    // The presence check and the lookup must use the same key. KEY_YARN_HOME is only the name of
+    // the -D option (inherited constant, presumably "yarn.home" - confirm in the superclass).
+    if (yarnEnv.containsKey("YARN_HOME")) {
+      addJavaOpt(hadoopOpts, KEY_YARN_HOME, yarnEnv.get("YARN_HOME"));
+    }
+
+    Map<String, Long> portsMap = serviceResourceProfile.getPorts();
+    Preconditions.checkState(portsMap.size() == ports.size());
+    Iterator<Long> itr = ports.iterator();
+    for (String portProperty : portsMap.keySet()) {
+      addJavaOpt(hadoopOpts, portProperty, ALL_LOCAL_IPV4ADDR + itr.next());
+    }
+
+    // Options configured under HADOOP_OPTS are appended after the generated ones.
+    if (yarnEnv.containsKey(ENV_HADOOP_OPTS)) {
+      hadoopOpts.append(" ").append(yarnEnv.get(ENV_HADOOP_OPTS));
+    }
+    builder.addAllVariables(Collections.singleton(
+        Protos.Environment.Variable.newBuilder()
+            .setName(ENV_HADOOP_OPTS)
+            .setValue(hadoopOpts.toString()).build())
+    );
+    return builder.build();
   }
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java
index 146a80c..147ed47 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java
@@ -24,11 +24,13 @@ import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import java.lang.reflect.Type;
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Resource Profile for any service
+ * Resource Profile for any service
*/
public class ServiceResourceProfile {
@@ -50,18 +52,14 @@ public class ServiceResourceProfile {
protected String className = ServiceResourceProfile.class.getName();
- public ServiceResourceProfile(String name, Double cpus, Double mem) {
- this.name = name;
- this.cpus = cpus;
- this.memory = mem;
- }
+ protected Map<String, Long> ports;
- public ServiceResourceProfile(String name, Double cpus, Double mem, Double execCpus, Double execMemory) {
+ public ServiceResourceProfile(String name, Double cpus, Double mem, Map<String, Long> ports) {
this.name = name;
this.cpus = cpus;
this.memory = mem;
- this.executorCpu = execCpus;
- this.executorMemory = execMemory;
+ this.ports = ports;
+ this.className = ServiceResourceProfile.class.getName();
}
public String getName() {
@@ -84,12 +82,8 @@ public class ServiceResourceProfile {
return cpus;
}
- public Double getExecutorCpu() {
- return executorCpu;
- }
-
- public Double getExecutorMemory() {
- return executorMemory;
+ public Map<String, Long> getPorts() {
+ return ports;
}
@Override
@@ -144,10 +138,10 @@ public class ServiceResourceProfile {
}
if (obj == null) {
return false;
- }
+ }
if (getClass() != obj.getClass()) {
return false;
- }
+ }
ServiceResourceProfile other = (ServiceResourceProfile) obj;
if (className == null) {
if (other.className != null) {
@@ -190,7 +184,7 @@ public class ServiceResourceProfile {
}
} else if (!name.equals(other.name)) {
return false;
- }
+ }
return true;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java
deleted file mode 100644
index 60d4c44..0000000
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.myriad.scheduler;
-
-import java.util.Map;
-import org.apache.myriad.configuration.MyriadConfiguration;
-import org.apache.myriad.configuration.ServiceConfiguration;
-
-/**
- * ServiceTaskConstraints is an implementation of TaskConstraints for a service
- * at this point constraints are on ports
- * Later on there may be other types of constraints added
- */
-public class ServiceTaskConstraints implements TaskConstraints {
-
- private int portsCount = 0;
-
- public ServiceTaskConstraints(MyriadConfiguration cfg, String taskPrefix) {
- Map<String, ServiceConfiguration> auxConfigs = cfg.getServiceConfigurations();
-
- ServiceConfiguration serviceConfig = auxConfigs.get(taskPrefix);
- if (serviceConfig != null) {
- if (serviceConfig.getPorts().isPresent()) {
- portsCount = serviceConfig.getPorts().get().size();
- }
- }
- }
-
- @Override
- public int portsCount() {
- return portsCount;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactory.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactory.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactory.java
new file mode 100644
index 0000000..57e104f
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactory.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.myriad.scheduler;
+
+import com.google.inject.Inject;
+import org.apache.mesos.Protos;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.configuration.ServiceConfiguration;
+import org.apache.myriad.scheduler.resource.ResourceOfferContainer;
+import org.apache.myriad.state.NodeTask;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Generic service task factory that creates a Mesos task for a service based solely on its
+ * configuration. The main properties of that configuration are:
+ * 1. command to run
+ * 2. Additional env. variables to set (serviceOpts)
+ * 3. ports to use with names of the properties
+ * 4. TODO (yufeldman) executor info
+ */
+public class ServiceTaskFactory extends TaskFactory {
+
+  /**
+   * Creates the factory.
+   *
+   * @param cfg         Myriad configuration the service definitions are read from
+   * @param taskUtils   task helper, handed to the parent factory
+   * @param clGenerator injected generator, handed to the parent factory; note that it is
+   *                    replaced right away by a {@link ServiceCommandLineGenerator}
+   */
+  @Inject
+  ServiceTaskFactory(MyriadConfiguration cfg, TaskUtils taskUtils, ExecutorCommandLineGenerator clGenerator) {
+    super(cfg, taskUtils, clGenerator);
+    // Services always use the service-specific generator, whatever was injected.
+    this.clGenerator = new ServiceCommandLineGenerator(cfg);
+  }
+
+  /**
+   * Builds the TaskInfo for a configured service, taking ports, cpus and memory from the offer container.
+   *
+   * @param resourceOfferContainer resources of the offer this task is launched on
+   * @param frameworkId            id of the framework (not used by this implementation)
+   * @param taskId                 id to assign to the new task
+   * @param nodeTask               task description; its task prefix selects the service configuration
+   * @return the fully built task
+   * @throws NullPointerException if the offer container, the node task or the service command is null
+   */
+  @Override
+  public Protos.TaskInfo createTask(ResourceOfferContainer resourceOfferContainer, Protos.FrameworkID frameworkId, Protos.TaskID taskId, NodeTask nodeTask) {
+    Objects.requireNonNull(resourceOfferContainer, "ResourceOfferContainer should be non-null");
+    Objects.requireNonNull(nodeTask, "NodeTask should be non-null");
+
+    // NOTE(review): if getServiceConfiguration returns an Optional, get() already fails on an
+    // absent value, which would make the null check below purely defensive -- confirm.
+    ServiceConfiguration serviceConfig = cfg.getServiceConfiguration(nodeTask.getTaskPrefix()).get();
+
+    Objects.requireNonNull(serviceConfig, "ServiceConfig should be non-null");
+    Objects.requireNonNull(serviceConfig.getCommand().orNull(), "command for ServiceConfig should be non-null");
+    // Ports are consumed first because the generated command line needs the concrete port values.
+    List<Protos.Resource> portResources = resourceOfferContainer.consumePorts(nodeTask.getProfile().getPorts().values());
+    Protos.CommandInfo commandInfo = clGenerator.generateCommandLine(nodeTask.getProfile(), serviceConfig, rangesConverter(portResources));
+
+    // Two placeholders need two arguments: the service name, then the command line.
+    LOGGER.info("Command line for service: {} is: {}", nodeTask.getTaskPrefix(), commandInfo.getValue());
+
+    Protos.TaskInfo.Builder taskBuilder = Protos.TaskInfo.newBuilder();
+
+    taskBuilder.setName(nodeTask.getTaskPrefix()).setTaskId(taskId).setSlaveId(resourceOfferContainer.getSlaveId())
+        .addAllResources(resourceOfferContainer.consumeCpus(nodeTask.getProfile().getCpus()))
+        .addAllResources(resourceOfferContainer.consumeMem(nodeTask.getProfile().getMemory()))
+        .addAllResources(portResources);
+
+    taskBuilder.setCommand(commandInfo);
+    if (cfg.getContainerInfo().isPresent()) {
+      taskBuilder.setContainer(getContainerInfo());
+    }
+    return taskBuilder.build();
+  }
+
+  /**
+   * Services do not define an executor of their own.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public Protos.ExecutorInfo getExecutorInfoForSlave(ResourceOfferContainer resourceOfferContainer, Protos.FrameworkID frameworkId, Protos.CommandInfo commandInfo) {
+    // TODO (yufeldman) if executor specified use it, otherwise return null
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
deleted file mode 100644
index 42f698a..0000000
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.myriad.scheduler;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import javax.inject.Inject;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.CommandInfo;
-import org.apache.mesos.Protos.CommandInfo.URI;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.Value;
-import org.apache.myriad.configuration.MyriadConfiguration;
-import org.apache.myriad.configuration.MyriadExecutorConfiguration;
-import org.apache.myriad.configuration.ServiceConfiguration;
-import org.apache.myriad.state.NodeTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Generic Service Class that allows to create a service solely base don the configuration
- * Main properties of configuration are:
- * 1. command to run
- * 2. Additional env. variables to set (serviceOpts)
- * 3. ports to use with names of the properties
- * 4. TODO (yufeldman) executor info
- */
-public class ServiceTaskFactoryImpl implements TaskFactory {
- private static final Logger LOGGER = LoggerFactory.getLogger(ServiceTaskFactoryImpl.class);
-
- public static final long DEFAULT_PORT_NUMBER = 0;
-
- private MyriadConfiguration cfg;
- @SuppressWarnings("unused")
- private TaskUtils taskUtils;
- private ServiceCommandLineGenerator clGenerator;
-
- @Inject
- public ServiceTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils) {
- this.cfg = cfg;
- this.taskUtils = taskUtils;
- this.clGenerator = new ServiceCommandLineGenerator(cfg, cfg.getMyriadExecutorConfiguration().getNodeManagerUri().orNull());
- }
-
- @Override
- public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, NodeTask nodeTask) {
- Objects.requireNonNull(offer, "Offer should be non-null");
- Objects.requireNonNull(nodeTask, "NodeTask should be non-null");
-
- ServiceConfiguration serviceConfig = cfg.getServiceConfiguration(nodeTask.getTaskPrefix()).get();
-
- Objects.requireNonNull(serviceConfig, "ServiceConfig should be non-null");
- Objects.requireNonNull(serviceConfig.getCommand().orNull(), "command for ServiceConfig should be non-null");
-
- final String serviceHostName = "0.0.0.0";
- final String serviceEnv = serviceConfig.getEnvSettings();
- final String rmHostName = System.getProperty(YARN_RESOURCEMANAGER_HOSTNAME);
- List<Long> additionalPortsNumbers = null;
-
- final StringBuilder strB = new StringBuilder("env ");
- if (serviceConfig.getServiceOpts().isPresent()) {
- strB.append(serviceConfig.getServiceOpts().get()).append("=");
-
- strB.append("\"");
- if (StringUtils.isNotEmpty(rmHostName)) {
- strB.append("-D" + YARN_RESOURCEMANAGER_HOSTNAME + "=" + rmHostName + " ");
- }
-
- Map<String, Long> ports = serviceConfig.getPorts().orNull();
-
- if (MapUtils.isNotEmpty(ports)) {
- int neededPortsCount = 0;
- for (Map.Entry<String, Long> portEntry : ports.entrySet()) {
- Long port = portEntry.getValue();
- if (port == DEFAULT_PORT_NUMBER) {
- neededPortsCount++;
- }
- }
- // use provided ports
- additionalPortsNumbers = getAvailablePorts(offer, neededPortsCount);
- LOGGER.info("No specified ports found or number of specified ports is not enough. Using ports from Mesos Offers: {}",
- additionalPortsNumbers);
- int index = 0;
- for (Map.Entry<String, Long> portEntry : ports.entrySet()) {
- String portProperty = portEntry.getKey();
- Long port = portEntry.getValue();
- if (port == DEFAULT_PORT_NUMBER) {
- port = additionalPortsNumbers.get(index++);
- }
- strB.append("-D" + portProperty + "=" + serviceHostName + ":" + port + " ");
- }
- }
- strB.append(serviceEnv);
- strB.append("\"");
- }
-
- strB.append(" ");
- strB.append(serviceConfig.getCommand().get());
-
- CommandInfo commandInfo = createCommandInfo(nodeTask.getProfile(), strB.toString());
-
- LOGGER.info("Command line for service: {} is: {}", nodeTask.getTaskPrefix(), strB.toString());
-
- TaskInfo.Builder taskBuilder = TaskInfo.newBuilder();
-
- taskBuilder.setName(nodeTask.getTaskPrefix()).setTaskId(taskId).setSlaveId(offer.getSlaveId())
- .addAllResources(taskUtils.getScalarResource(offer, "cpus", nodeTask.getProfile().getCpus(), 0.0))
- .addAllResources(taskUtils.getScalarResource(offer, "mem", nodeTask.getProfile().getMemory(), 0.0));
-
- if (CollectionUtils.isNotEmpty(additionalPortsNumbers)) {
- // set ports
- Value.Ranges.Builder valueRanger = Value.Ranges.newBuilder();
- for (Long port : additionalPortsNumbers) {
- valueRanger.addRange(Value.Range.newBuilder().setBegin(port).setEnd(port));
- }
-
- taskBuilder.addResources(Resource.newBuilder().setName("ports").setType(Value.Type.RANGES).setRanges(valueRanger.build()));
- }
- taskBuilder.setCommand(commandInfo);
- if (cfg.getContainerInfo().isPresent()) {
- taskBuilder.setContainer(taskUtils.getContainerInfo());
- }
- return taskBuilder.build();
- }
-
- @VisibleForTesting
- CommandInfo createCommandInfo(ServiceResourceProfile profile, String executorCmd) {
- MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration();
- CommandInfo.Builder commandInfo = CommandInfo.newBuilder();
- Map<String, String> envVars = cfg.getYarnEnvironment();
- if (MapUtils.isNotEmpty(envVars)) {
- Protos.Environment.Builder yarnHomeB = Protos.Environment.newBuilder();
- for (Map.Entry<String, String> envEntry : envVars.entrySet()) {
- Protos.Environment.Variable.Builder yarnEnvB = Protos.Environment.Variable.newBuilder();
- yarnEnvB.setName(envEntry.getKey()).setValue(envEntry.getValue());
- yarnHomeB.addVariables(yarnEnvB.build());
- }
- commandInfo.mergeEnvironment(yarnHomeB.build());
- }
-
- if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
- //Both FrameworkUser and FrameworkSuperuser to get all of the directory permissions correct.
- if (!minimumUserSet(cfg)) {
- throw new RuntimeException("Trying to use remote distribution, but frameworkUser" + "and/or frameworkSuperUser not set!");
- }
-
- LOGGER.info("Using remote distribution");
- String clGeneratedCommand = clGenerator.generateCommandLine(profile, null);
-
- String nmURIString = myriadExecutorConfiguration.getNodeManagerUri().get();
-
- //Concatenate all the subcommands
- String cmd = clGeneratedCommand + " " + executorCmd;
-
- //get the nodemanagerURI
- //We're going to extract ourselves, so setExtract is false
- LOGGER.info("Getting Hadoop distribution from:" + nmURIString);
- URI nmUri = URI.newBuilder().setValue(nmURIString).setExtract(false).build();
-
- //get configs directly from resource manager
- String configUrlString = clGenerator.getConfigurationUrl();
- LOGGER.info("Getting config from:" + configUrlString);
- URI configUri = URI.newBuilder().setValue(configUrlString).build();
-
- LOGGER.info("Slave will execute command:" + cmd);
- commandInfo.addUris(nmUri).addUris(configUri).setValue("echo \"" + cmd + "\";" + cmd);
- commandInfo.setUser(cfg.getFrameworkSuperUser().get());
-
- } else {
- commandInfo.setValue(executorCmd);
- }
- return commandInfo.build();
- }
-
- private Boolean minimumUserSet(MyriadConfiguration conf) {
- return (cfg.getFrameworkUser().isPresent() || cfg.getFrameworkSuperUser().isPresent());
- }
-
- @Override
- public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, Offer offer, CommandInfo commandInfo) {
- // TODO (yufeldman) if executor specified use it , otherwise return null
- // nothing to implement here, since we are using default slave executor
- return null;
- }
-
- /**
- * Helper method to reserve ports
- *
- * @param offer
- * @param requestedPorts
- * @return
- */
- private List<Long> getAvailablePorts(Offer offer, int requestedPorts) {
- if (requestedPorts == 0) {
- return null;
- }
-
- final List<Long> returnedPorts = new ArrayList<>();
- for (Resource resource : offer.getResourcesList()) {
- if (resource.getName().equals("ports") && (isDefaultRole(resource))) {
- Iterator<Value.Range> itr = resource.getRanges().getRangeList().iterator();
- while (itr.hasNext()) {
- Value.Range range = itr.next();
- if (range.getBegin() <= range.getEnd()) {
- long i = range.getBegin();
- while (i <= range.getEnd() && returnedPorts.size() < requestedPorts) {
- returnedPorts.add(i);
- i++;
- }
- if (returnedPorts.size() >= requestedPorts) {
- return returnedPorts;
- }
- }
- }
- }
- }
- //this is actually an error condition - we did not have enough ports to use
- //TODO (hokiegeek2) does this need error handling then?
- return returnedPorts;
- }
-
- private boolean isDefaultRole(Resource resource) {
- return !resource.hasRole() || resource.getRole().equals("*");
- }
-}
\ No newline at end of file