You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@myriad.apache.org by sm...@apache.org on 2015/10/16 22:39:56 UTC

[4/4] incubator-myriad git commit: Myriad-Issue-60 Added ability to launch JHS and other services that fit under Myriad umbrella from MyriadScheduler using default Mesos slave executor

Myriad-Issue-60 Added ability to launch JHS and other services that fit under Myriad umbrella from MyriadScheduler using default Mesos slave executor

Myriad-Issue-60 review changes

Disable automatic service startup at MyriadStartup, as upon Myriad restart it may try to start multiple times. Recovery of those tasks will be done as part of Myriad HA.

Added binary distribution for JHS. It is using same properties from yml file as NM, since source code for both should be same

Updating licensing info to Apache 2.0 License for newly created files

SchedulerState refactoring, naming changes, introduce limit on number of instances
of a particular service instances that can run at the same time.

Changed behavior with ports. New behavior: if ports are specified in the configuration they will be used irrespective of anything. If ports are not specified in configuration they will be selected form list of ports from Mesos offer.
In both cases following properties will be set:
        "myriad.mapreduce.jobhistory.admin.address",
        "myriad.mapreduce.jobhistory.address",
        "myriad.mapreduce.jobhistory.webapp.address"

So they can be used in configuration files. E.g:
<property>
  <name>mapreduce.jobhistory.admin.address</name>
  <value>${myriad.mapreduce.jobhistory.admin.address}</value>
</property>
<property>
  <name>mapreduce.jobhistory.address</name>
  <value>${myriad.mapreduce.jobhistory.address}</value>
</property>
<property>
  <name>mapreduce.jobhistory.webapp.address</name>
  <value>${myriad.mapreduce.jobhistory.webapp.address}</value>
</property>

In case of random ports JHS web app address  has to be known by clients in order to successfully start/finish job.

Fixing issue with resources for NM reporting as 0 and allowing submit a task that requires more resources then mess offer

Refactoring to abstract out ServiceResourceProfile and ExtendedProfile that will have NMProfile included in addition to service resources information. It helps to still keep all the tasks in one place.

Renamed to more meaningful names - such as Service(s), Refactoring command line generation for reuse between NM and others, Created interfacePorts to abstract out ports, rather then tagging along NMPorts class to other services, added tests for CommandLineGeneration

Fixes to unit test

MYRIAD-121  Reworked Service configuration, commands, etc. to rely just on the configuration settings to start the service

Example of configuration for JHS would be:
services:
   jobhistory:
     jvmMaxMemoryMB: 64
     cpus: 0.5
     ports:
       myriad.mapreduce.jobhistory.admin.address: 10033
       myriad.mapreduce.jobhistory.address: 10020
       myriad.mapreduce.jobhistory.webapp.address: 19888
     envSettings: -Dcluster.name.prefix=/mycluster
     taskName: jobhistory
     serviceOptsName: HADOOP_JOB_HISTORYSERVER_OPTS
     command: $YARN_HOME/bin/mapred historyserver
     maxInstances: 1

Addressed review comments

Addressing review comments, Adding handling exception in ResourceOffersEventHandler while TaskInfo is created to make sure thread is not stopped because of some rogue tasks

Adding DEFAULT_PORT_NUMBER and setting it to 0


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/46573816
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/46573816
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/46573816

Branch: refs/heads/master
Commit: 465738167df60f9a8ae1a258d5e87e5a74ca05d5
Parents: 766f721
Author: Yuliya Feldman <yf...@maprtech.com>
Authored: Thu Jul 30 22:54:37 2015 -0700
Committer: Santosh Marella <ma...@gmail.com>
Committed: Fri Oct 16 12:14:59 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 myriad-scheduler/build.gradle                   |   1 +
 .../src/main/java/com/ebay/myriad/Main.java     |  85 +++-
 .../main/java/com/ebay/myriad/MyriadModule.java |  36 +-
 .../com/ebay/myriad/api/ClustersResource.java   | 119 ++++-
 .../api/model/FlexDownServiceRequest.java       |  63 +++
 .../myriad/api/model/FlexUpClusterRequest.java  |   2 +-
 .../myriad/api/model/FlexUpServiceRequest.java  |  61 +++
 .../MyriadBadConfigurationException.java        |  32 ++
 .../configuration/MyriadConfiguration.java      |  14 +
 .../configuration/NodeManagerConfiguration.java |   2 +
 .../configuration/ServiceConfiguration.java     | 132 ++++++
 .../scheduler/DownloadNMExecutorCLGenImpl.java  |  10 +-
 .../scheduler/ExecutorCommandLineGenerator.java |   4 +-
 .../scheduler/ExtendedResourceProfile.java      |  65 +++
 .../ebay/myriad/scheduler/MyriadOperations.java | 138 +++++-
 .../myriad/scheduler/NMExecutorCLGenImpl.java   |  25 +-
 .../java/com/ebay/myriad/scheduler/NMPorts.java |   2 +-
 .../scheduler/NMTaskFactoryAnnotation.java      |  34 ++
 .../java/com/ebay/myriad/scheduler/Ports.java   |   9 +
 .../com/ebay/myriad/scheduler/Rebalancer.java   |  15 +-
 .../ebay/myriad/scheduler/SchedulerUtils.java   |  12 +-
 .../scheduler/ServiceCommandLineGenerator.java  |  42 ++
 .../myriad/scheduler/ServiceProfileManager.java |  39 ++
 .../scheduler/ServiceResourceProfile.java       | 117 +++++
 .../scheduler/ServiceTaskConstraints.java       |  36 ++
 .../scheduler/ServiceTaskFactoryImpl.java       | 266 ++++++++++++
 .../ebay/myriad/scheduler/TaskConstraints.java  |  34 ++
 .../scheduler/TaskConstraintsManager.java       |  31 ++
 .../com/ebay/myriad/scheduler/TaskFactory.java  |  73 ++--
 .../com/ebay/myriad/scheduler/TaskUtils.java    |  32 ++
 .../handlers/ResourceOffersEventHandler.java    | 111 ++---
 .../scheduler/fgs/YarnNodeCapacityManager.java  |   8 +-
 .../java/com/ebay/myriad/state/NodeTask.java    |  41 +-
 .../com/ebay/myriad/state/SchedulerState.java   | 432 ++++++++++++++-----
 .../myriad/state/utils/ByteBufferSupport.java   |  26 +-
 .../ebay/myriad/state/utils/StoreContext.java   |  12 +-
 .../java/com/ebay/myriad/MultiBindingsTest.java |  73 ++++
 .../com/ebay/myriad/MultiBindingsUsage.java     |  39 ++
 .../java/com/ebay/myriad/MyriadTestModule.java  | 109 +++++
 .../MyriadBadConfigurationExceptionTest.java    |  49 +++
 .../configuration/MyriadConfigurationTest.java  |  69 +++
 .../myriad/scheduler/SchedulerUtilsSpec.groovy  |  19 +-
 .../myriad/scheduler/TMSTaskFactoryImpl.java    |  75 ++++
 .../scheduler/TestServiceCommandLine.java       |  68 +++
 .../ebay/myriad/scheduler/TestTaskUtils.java    | 103 +++++
 .../resources/myriad-config-test-default.yml    |  55 +++
 47 files changed, 2546 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4136792..1059852 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,5 +1,6 @@
 allprojects {
     apply plugin: 'idea'
+    apply plugin: 'eclipse'
 }
 
 idea {

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/build.gradle
----------------------------------------------------------------------
diff --git a/myriad-scheduler/build.gradle b/myriad-scheduler/build.gradle
index aad71a1..bf4b68b 100644
--- a/myriad-scheduler/build.gradle
+++ b/myriad-scheduler/build.gradle
@@ -15,6 +15,7 @@ dependencies {
     compile "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.5.1"
     compile "org.apache.curator:curator-framework:2.7.1"
     compile "org.apache.commons:commons-lang3:3.4"
+    compile 'com.google.inject.extensions:guice-multibindings:3.0'
     testCompile "org.apache.hadoop:hadoop-yarn-server-resourcemanager:${hadoopVer}:tests"
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
index 2045f24..f73e1f0 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
@@ -21,16 +21,25 @@ package com.ebay.myriad;
 import com.codahale.metrics.JmxReporter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.health.HealthCheckRegistry;
+import com.ebay.myriad.configuration.NodeManagerConfiguration;
+import com.ebay.myriad.configuration.ServiceConfiguration;
+import com.ebay.myriad.configuration.MyriadBadConfigurationException;
 import com.ebay.myriad.configuration.MyriadConfiguration;
 import com.ebay.myriad.health.MesosDriverHealthCheck;
 import com.ebay.myriad.health.MesosMasterHealthCheck;
 import com.ebay.myriad.health.ZookeeperHealthCheck;
+import com.ebay.myriad.scheduler.ExtendedResourceProfile;
 import com.ebay.myriad.scheduler.MyriadDriverManager;
 import com.ebay.myriad.scheduler.MyriadOperations;
 import com.ebay.myriad.scheduler.NMProfile;
-import com.ebay.myriad.scheduler.NMProfileManager;
 import com.ebay.myriad.scheduler.Rebalancer;
+import com.ebay.myriad.scheduler.ServiceProfileManager;
+import com.ebay.myriad.scheduler.ServiceResourceProfile;
+import com.ebay.myriad.scheduler.ServiceTaskConstraints;
+import com.ebay.myriad.scheduler.TaskConstraintsManager;
+import com.ebay.myriad.scheduler.TaskFactory;
 import com.ebay.myriad.scheduler.TaskTerminator;
+import com.ebay.myriad.scheduler.TaskUtils;
 import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
 import com.ebay.myriad.webapp.MyriadWebServer;
 import com.ebay.myriad.webapp.WebAppGuiceModule;
@@ -38,6 +47,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+
 import org.apache.commons.collections.MapUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -99,14 +109,17 @@ public class Main {
         initHealthChecks(injector);
         initProfiles(injector);
         validateNMInstances(injector);
+        initServiceConfigurations(cfg, injector);
         initDisruptors(injector);
 
         initRebalancerService(cfg, injector);
         initTerminatorService(injector);
         startMesosDriver(injector);
         startNMInstances(injector);
+        startJavaBasedTaskInstance(injector);
     }
 
+
     private void startMesosDriver(Injector injector) {
         LOGGER.info("starting mesosDriver..");
         injector.getInstance(MyriadDriverManager.class).startDriver();
@@ -141,8 +154,11 @@ public class Main {
 
     private void initProfiles(Injector injector) {
         LOGGER.info("Initializing Profiles");
-        NMProfileManager profileManager = injector.getInstance(NMProfileManager.class);
+        ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class);
+        TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class);
+        taskConstraintsManager.addTaskConstraints(NodeManagerConfiguration.NM_TASK_PREFIX, new TaskFactory.NMTaskConstraints());
         Map<String, Map<String, String>> profiles = injector.getInstance(MyriadConfiguration.class).getProfiles();
+        TaskUtils taskUtils = injector.getInstance(TaskUtils.class);
         if (MapUtils.isNotEmpty(profiles)) {
             for (Map.Entry<String, Map<String, String>> profile : profiles.entrySet()) {
                 Map<String, String> profileResourceMap = profile.getValue();
@@ -152,7 +168,12 @@ public class Main {
                     Long cpu = Long.parseLong(profileResourceMap.get("cpu"));
                     Long mem = Long.parseLong(profileResourceMap.get("mem"));
 
-                    profileManager.add(new NMProfile(profile.getKey(), cpu, mem));
+                    ServiceResourceProfile serviceProfile = new ExtendedResourceProfile(new NMProfile(profile.getKey(), cpu, mem), 
+                        taskUtils.getNodeManagerCpus(), taskUtils.getNodeManagerMemory());
+                    serviceProfile.setExecutorCpu(taskUtils.getExecutorCpus());
+                    serviceProfile.setExecutorMemory(taskUtils.getExecutorMemory());
+                    
+                    profileManager.add(serviceProfile);
                 } else {
                     LOGGER.error("Invalid definition for profile: " + profile.getKey());
                 }
@@ -163,19 +184,20 @@ public class Main {
     private void validateNMInstances(Injector injector) {
         LOGGER.info("Validating nmInstances..");
         Map<String, Integer> nmInstances = injector.getInstance(MyriadConfiguration.class).getNmInstances();
-        NMProfileManager profileManager = injector.getInstance(NMProfileManager.class);
+        ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class);
+
         long maxCpu = Long.MIN_VALUE;
         long maxMem = Long.MIN_VALUE;
         for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) {
           String profile = entry.getKey();
-          NMProfile nmProfile = profileManager.get(profile);
-          if (nmProfile == null) {
+          ServiceResourceProfile nodeManager = profileManager.get(profile);
+          if (nodeManager == null) {
             throw new RuntimeException("Invalid profile name '" + profile + "' specified in 'nmInstances'");
           }
           if (entry.getValue() > 0) {
-            if (nmProfile.getCpus() > maxCpu) { // find the profile with largest number of cpus
-              maxCpu = nmProfile.getCpus();
-              maxMem = nmProfile.getMemory(); // use the memory from the same profile
+            if (nodeManager.getCpus() > maxCpu) { // find the profile with largest number of cpus
+              maxCpu = nodeManager.getCpus().longValue();
+              maxMem = nodeManager.getMemory().longValue(); // use the memory from the same profile
             }
           }
         }
@@ -188,12 +210,37 @@ public class Main {
     private void startNMInstances(Injector injector) {
       Map<String, Integer> nmInstances = injector.getInstance(MyriadConfiguration.class).getNmInstances();
       MyriadOperations myriadOperations = injector.getInstance(MyriadOperations.class);
-      NMProfileManager profileManager = injector.getInstance(NMProfileManager.class);
+      ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class);
       for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) {
         myriadOperations.flexUpCluster(profileManager.get(entry.getKey()), entry.getValue(), null);
       }
     }
 
+    /**
+     * Create ServiceProfile for any configured service
+     * @param cfg 
+     * @param injector
+     */
+    private void initServiceConfigurations(MyriadConfiguration cfg, Injector injector) {
+      LOGGER.info("Initializing initServiceConfigurations");
+      ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class);
+      TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class);
+
+      Map<String, ServiceConfiguration> servicesConfigs = 
+          injector.getInstance(MyriadConfiguration.class).getServiceConfigurations();
+      if (servicesConfigs != null) {
+        for (Map.Entry<String, ServiceConfiguration> entry : servicesConfigs.entrySet()) {
+          final String taskPrefix = entry.getKey();
+          ServiceConfiguration config = entry.getValue();
+          final Double cpu = config.getCpus().or(ServiceConfiguration.DEFAULT_CPU);
+          final Double mem = config.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY);
+          
+          profileManager.add(new ServiceResourceProfile(taskPrefix, cpu, mem));
+          taskConstraintsManager.addTaskConstraints(taskPrefix, new ServiceTaskConstraints(cfg, taskPrefix));
+        }
+      }
+    }
+
     private void initTerminatorService(Injector injector) {
         LOGGER.info("Initializing Terminator");
         terminatorService = Executors.newScheduledThreadPool(1);
@@ -224,4 +271,22 @@ public class Main {
         disruptorManager.init(injector);
     }
 
+    /**
+     * Start tasks for configured services 
+     * @param injector
+     */
+    private void startJavaBasedTaskInstance(Injector injector) {
+      Map<String, ServiceConfiguration> auxServicesConfigs = 
+          injector.getInstance(MyriadConfiguration.class).getServiceConfigurations();
+      if (auxServicesConfigs != null) {
+        MyriadOperations myriadOperations = injector.getInstance(MyriadOperations.class);
+        for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) {
+          try {
+            myriadOperations.flexUpAService(entry.getValue().getMaxInstances().or(1), entry.getKey());
+          } catch (MyriadBadConfigurationException e) {
+            LOGGER.warn("Exception while trying to flexup service: {}", entry.getKey(), e);
+          }
+        } 
+      }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java b/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java
index 306f68a..3632334 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java
@@ -18,20 +18,25 @@
  */
 package com.ebay.myriad;
 
+import com.ebay.myriad.configuration.ServiceConfiguration;
 import com.ebay.myriad.configuration.MyriadConfiguration;
 import com.ebay.myriad.configuration.MyriadExecutorConfiguration;
+import com.ebay.myriad.configuration.NodeManagerConfiguration;
 import com.ebay.myriad.policy.LeastAMNodesFirstPolicy;
 import com.ebay.myriad.policy.NodeScaleDownPolicy;
 import com.ebay.myriad.scheduler.MyriadDriverManager;
 import com.ebay.myriad.scheduler.MyriadScheduler;
 import com.ebay.myriad.scheduler.fgs.NMHeartBeatHandler;
-import com.ebay.myriad.scheduler.NMProfileManager;
 import com.ebay.myriad.scheduler.fgs.NodeStore;
 import com.ebay.myriad.scheduler.fgs.OfferLifecycleManager;
 import com.ebay.myriad.scheduler.DownloadNMExecutorCLGenImpl;
 import com.ebay.myriad.scheduler.ExecutorCommandLineGenerator;
 import com.ebay.myriad.scheduler.NMExecutorCLGenImpl;
+import com.ebay.myriad.scheduler.NMTaskFactoryAnnotation;
 import com.ebay.myriad.scheduler.ReconcileService;
+import com.ebay.myriad.scheduler.ServiceProfileManager;
+import com.ebay.myriad.scheduler.ServiceTaskFactoryImpl;
+import com.ebay.myriad.scheduler.TaskConstraintsManager;
 import com.ebay.myriad.scheduler.TaskFactory;
 import com.ebay.myriad.scheduler.TaskFactory.NMTaskFactoryImpl;
 import com.ebay.myriad.scheduler.fgs.YarnNodeCapacityManager;
@@ -43,6 +48,7 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
+import com.google.inject.multibindings.MapBinder;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -50,6 +56,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 /**
  * Guice Module for Myriad
  */
@@ -84,16 +92,38 @@ public class MyriadModule extends AbstractModule {
         bind(InterceptorRegistry.class).toInstance(interceptorRegistry);
         bind(MyriadDriverManager.class).in(Scopes.SINGLETON);
         bind(MyriadScheduler.class).in(Scopes.SINGLETON);
-        bind(NMProfileManager.class).in(Scopes.SINGLETON);
+        bind(ServiceProfileManager.class).in(Scopes.SINGLETON);
         bind(DisruptorManager.class).in(Scopes.SINGLETON);
-        bind(TaskFactory.class).to(NMTaskFactoryImpl.class);
         bind(ReconcileService.class).in(Scopes.SINGLETON);
         bind(HttpConnectorProvider.class).in(Scopes.SINGLETON);
+        bind(TaskConstraintsManager.class).in(Scopes.SINGLETON);
+        // add special binding between TaskFactory and NMTaskFactoryImpl to ease up 
+        // usage of TaskFactory
+        bind(TaskFactory.class).annotatedWith(NMTaskFactoryAnnotation.class).to(NMTaskFactoryImpl.class);
         bind(YarnNodeCapacityManager.class).in(Scopes.SINGLETON);
         bind(NodeStore.class).in(Scopes.SINGLETON);
         bind(OfferLifecycleManager.class).in(Scopes.SINGLETON);
         bind(NMHeartBeatHandler.class).asEagerSingleton();
 
+        MapBinder<String, TaskFactory> mapBinder
+        = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class);
+        mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactoryImpl.class).in(Scopes.SINGLETON);
+        Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations();
+        if (auxServicesConfigs != null) {
+          for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) {
+            String taskFactoryClass = entry.getValue().getTaskFactoryImplName().orNull();
+            if (taskFactoryClass != null) {
+              try {
+                Class<? extends TaskFactory> implClass = (Class<? extends TaskFactory>) Class.forName(taskFactoryClass);
+                mapBinder.addBinding(entry.getKey()).to(implClass).in(Scopes.SINGLETON);
+              } catch (ClassNotFoundException e) {
+                LOGGER.error("ClassNotFoundException", e);
+              }
+            } else {
+              mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON);
+            }
+          }
+        }
         //TODO(Santosh): Should be configurable as well
         bind(NodeScaleDownPolicy.class).to(LeastAMNodesFirstPolicy.class).in(Scopes.SINGLETON);
     }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
index 000b3f3..9820848 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
@@ -20,16 +20,26 @@ package com.ebay.myriad.api;
 
 import com.codahale.metrics.annotation.Timed;
 import com.ebay.myriad.api.model.FlexDownClusterRequest;
+import com.ebay.myriad.api.model.FlexDownServiceRequest;
 import com.ebay.myriad.api.model.FlexUpClusterRequest;
 import com.ebay.myriad.scheduler.MyriadOperations;
-import com.ebay.myriad.scheduler.NMProfile;
-import com.ebay.myriad.scheduler.NMProfileManager;
+import com.ebay.myriad.scheduler.ServiceResourceProfile;
 import com.ebay.myriad.scheduler.constraints.ConstraintFactory;
 import com.ebay.myriad.state.SchedulerState;
 import com.google.common.base.Preconditions;
+
 import java.util.List;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
+
+import com.ebay.myriad.api.model.FlexUpServiceRequest;
+import com.ebay.myriad.configuration.MyriadBadConfigurationException;
+import com.ebay.myriad.configuration.MyriadConfiguration;
+import com.ebay.myriad.scheduler.ServiceProfileManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.inject.Inject;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.PUT;
@@ -39,8 +49,6 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.Response.Status;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * RESTful API to resource manager
@@ -51,17 +59,20 @@ public class ClustersResource {
     private static final String LIKE_CONSTRAINT_FORMAT =
         "'<mesos_slave_attribute|hostname> LIKE <value_regex>'";
 
-    private final SchedulerState schedulerState;
-    private final NMProfileManager profileManager;
-    private final MyriadOperations myriadOperations;
+    private MyriadConfiguration cfg;
+    private SchedulerState schedulerState;
+    private ServiceProfileManager profileManager;
+    private MyriadOperations myriadOperations;
 
-  @Inject
-    public ClustersResource(SchedulerState state,
-                            NMProfileManager profileManager,
+    @Inject
+    public ClustersResource(MyriadConfiguration cfg,
+                            SchedulerState state,
+                            ServiceProfileManager profileManager,
                             MyriadOperations myriadOperations) {
-        this.schedulerState = state;
-        this.profileManager = profileManager;
-        this.myriadOperations = myriadOperations;
+      this.cfg = cfg;
+      this.schedulerState = state;
+      this.profileManager = profileManager;
+      this.myriadOperations = myriadOperations;
     }
 
     @Timed
@@ -95,6 +106,46 @@ public class ClustersResource {
 
     @Timed
     @PUT
+    @Path("/flexupservice")
+    @Produces(MediaType.TEXT_PLAIN)
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response flexUpservice(FlexUpServiceRequest request) {
+      Preconditions.checkNotNull(request,
+              "request object cannot be null or empty");
+
+      LOGGER.info("Received Flexup a Service Request");
+
+      Integer instances = request.getInstances();
+      String serviceName = request.getServiceName();
+
+      LOGGER.info("Instances: {}", instances);
+      LOGGER.info("Service: {}", serviceName);
+      
+      // Validate profile request
+      Response.ResponseBuilder response = Response.status(Response.Status.ACCEPTED);
+      
+      if (cfg.getServiceConfiguration(serviceName) == null) {
+        response.status(Response.Status.BAD_REQUEST)
+                .entity("Service does not exist: " + serviceName);
+        LOGGER.error("Provided service does not exist " + serviceName);
+        return response.build();
+      }
+      
+      if (!validateInstances(instances, response)) {
+        return response.build();
+      }
+
+      try {
+        this.myriadOperations.flexUpAService(instances, serviceName);
+      } catch (MyriadBadConfigurationException e) {
+        return response.status(Response.Status.BAD_REQUEST).entity(e.getMessage()).build();
+      }
+
+      return response.build();
+    }
+
+    @Timed
+    @PUT
     @Path("/flexdown")
     @Produces(MediaType.TEXT_PLAIN)
     @Consumes(MediaType.APPLICATION_JSON)
@@ -208,10 +259,44 @@ public class ClustersResource {
 
 
     private Integer getNumFlexedupNMs(String profile) {
-      NMProfile nmProfile = profileManager.get(profile);
-      return this.schedulerState.getActiveTaskIDsForProfile(nmProfile).size()
-                + this.schedulerState.getStagingTaskIDsForProfile(nmProfile).size()
-                + this.schedulerState.getPendingTaskIDsForProfile(nmProfile).size();
+      ServiceResourceProfile serviceProfile = profileManager.get(profile);
+      return this.schedulerState.getActiveTaskIDsForProfile(serviceProfile).size()
+                + this.schedulerState.getStagingTaskIDsForProfile(serviceProfile).size()
+                + this.schedulerState.getPendingTaskIDsForProfile(serviceProfile).size();
     }
 
+    @Timed
+    @PUT
+    @Path("/flexdownservice")
+    @Produces(MediaType.TEXT_PLAIN)
+    @Consumes(MediaType.APPLICATION_JSON)
+    public Response flexDownservice(FlexDownServiceRequest request) {
+      Preconditions.checkNotNull(request,
+              "request object cannot be null or empty");
+      
+      Integer instances = request.getInstances();
+      String serviceName = request.getServiceName();
+      
+      LOGGER.info("Received flexdown request for service {}", serviceName);
+      LOGGER.info("Instances: " + instances);
+
+      Response.ResponseBuilder response = Response.status(Response.Status.ACCEPTED);
+
+      if (!validateInstances(instances, response)) {
+          return response.build();
+      }
+
+      // warn that number of requested instances isn't available
+      // but instances will still be flexed down
+      Integer flexibleInstances = this.myriadOperations.getFlexibleInstances(serviceName);
+      if (flexibleInstances < instances)  {
+          response.entity("Number of requested instances is greater than available.");
+          // just doing a simple check here. pass the requested number of instances
+          // to myriadOperations and let it sort out how many actually get flexxed down.
+          LOGGER.warn("Requested number of instances greater than available: {} < {}", flexibleInstances, instances);
+      }
+
+      this.myriadOperations.flexDownAService(instances, serviceName);
+      return response.build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexDownServiceRequest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexDownServiceRequest.java b/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexDownServiceRequest.java
new file mode 100644
index 0000000..d565a0a
--- /dev/null
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexDownServiceRequest.java
@@ -0,0 +1,63 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.ebay.myriad.api.model;
+
+import com.google.gson.Gson;
+import org.hibernate.validator.constraints.NotEmpty;
+
+/**
+ * Flex down an auxtask/service
+ */
+public class FlexDownServiceRequest {
+
+    @NotEmpty
+    public Integer instances;
+    
+    @NotEmpty
+    public String serviceName;
+
+    public FlexDownServiceRequest() {
+    }
+
+    public FlexDownServiceRequest(Integer instances, String serviceName) {
+        this.instances = instances;
+        this.serviceName = serviceName;
+    }
+
+    public Integer getInstances() {
+        return instances;
+    }
+
+    public void setInstances(Integer instances) {
+        this.instances = instances;
+    }
+
+    public String getServiceName() {
+      return serviceName;
+    }
+
+    public void setServiceName(String serviceName) {
+      this.serviceName = serviceName;
+    }
+
+    public String toString() {
+        Gson gson = new Gson();
+        return gson.toJson(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexUpClusterRequest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexUpClusterRequest.java b/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexUpClusterRequest.java
index 3692b1d..5953f8b 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexUpClusterRequest.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexUpClusterRequest.java
@@ -23,7 +23,7 @@ import java.util.List;
 import org.hibernate.validator.constraints.NotEmpty;
 
 /**
- * Flex up request parameters
+ * Flex up an auxtask/service
  */
 public class FlexUpClusterRequest {
     @NotEmpty

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexUpServiceRequest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexUpServiceRequest.java b/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexUpServiceRequest.java
new file mode 100644
index 0000000..1bc8db7
--- /dev/null
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexUpServiceRequest.java
@@ -0,0 +1,61 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package com.ebay.myriad.api.model;
+
+import com.google.gson.Gson;
+import org.hibernate.validator.constraints.NotEmpty;
+
+/**
+ * Flex up request parameters
+ */
+public class FlexUpServiceRequest {
+    @NotEmpty
+    public Integer instances;
+
+    @NotEmpty
+    public String serviceName;
+
+    public FlexUpServiceRequest() {
+    }
+    
+    public FlexUpServiceRequest(Integer instances, String profile) {
+        this.instances = instances;
+        this.serviceName = profile;
+    }
+
+    public Integer getInstances() {
+        return instances;
+    }
+
+    public void setInstances(Integer instances) {
+        this.instances = instances;
+    }
+
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    public void setServiceName(String profile) {
+        this.serviceName = profile;
+    }
+
+    public String toString() {
+        Gson gson = new Gson();
+        return gson.toJson(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/MyriadBadConfigurationException.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/MyriadBadConfigurationException.java b/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/MyriadBadConfigurationException.java
new file mode 100644
index 0000000..0ac7d5b
--- /dev/null
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/MyriadBadConfigurationException.java
@@ -0,0 +1,32 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.ebay.myriad.configuration;
+
+/**
+ * Myriad specific exception
+ *
+ */
+@SuppressWarnings("serial")
+public class MyriadBadConfigurationException extends Exception {
+
+  
+  public MyriadBadConfigurationException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/MyriadConfiguration.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/MyriadConfiguration.java b/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/MyriadConfiguration.java
index e4c81fe..a723cf0 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/MyriadConfiguration.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/MyriadConfiguration.java
@@ -123,6 +123,9 @@ public class MyriadConfiguration {
 
   @JsonProperty
   private NodeManagerConfiguration nodemanager;
+  
+  @JsonProperty
+  private Map<String, ServiceConfiguration> services;
 
   @JsonProperty
   @NotEmpty
@@ -205,7 +208,18 @@ public class MyriadConfiguration {
   public NodeManagerConfiguration getNodeManagerConfiguration() {
     return this.nodemanager;
   }
+  
+  public Map<String, ServiceConfiguration> getServiceConfigurations() {
+    return this.services;
+  }
 
+  public ServiceConfiguration getServiceConfiguration(String taskName) {
+    if (services == null) {
+      return null;
+    }
+    return this.services.get(taskName);
+  }
+  
   public MyriadExecutorConfiguration getMyriadExecutorConfiguration() {
     return this.executor;
   }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/NodeManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/NodeManagerConfiguration.java b/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/NodeManagerConfiguration.java
index db30429..84eb219 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/NodeManagerConfiguration.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/NodeManagerConfiguration.java
@@ -39,6 +39,8 @@ public class NodeManagerConfiguration {
      * Default cpu for NodeManager JVM.
      */
     public static final double DEFAULT_NM_CPUS = 1;
+    
+    public static final String NM_TASK_PREFIX = "nm";
 
     /**
      * Translates to -Xmx for the NodeManager JVM.

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/ServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/ServiceConfiguration.java b/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/ServiceConfiguration.java
new file mode 100644
index 0000000..29eef6d
--- /dev/null
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/configuration/ServiceConfiguration.java
@@ -0,0 +1,132 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.ebay.myriad.configuration;
+
+import java.util.Map;
+
+import org.hibernate.validator.constraints.NotEmpty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+
+/**
+ * Configuration for any service/task to be started from Myriad Scheduler
+ *
+ */
+public class ServiceConfiguration {
+  
+  private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConfiguration.class);
+
+  public static final Double DEFAULT_CPU = 0.1;
+  
+  public static final Double DEFAULT_MEMORY = 256.0;
+  
+  /**
+   * Translates to -Xmx for the JVM.
+   */
+  @JsonProperty
+  protected Double jvmMaxMemoryMB;
+
+  /**
+   * Amount of CPU share given to  JVM. 
+   */
+  @JsonProperty
+  protected Double cpus;
+
+  /**
+   * Translates to jvm opts for the JVM.
+   */
+  @JsonProperty
+  protected String jvmOpts;
+
+  @JsonProperty
+  protected Map<String, Long> ports;
+  
+  /**
+   * If we will have some services
+   * that are not easy to express just by properties
+   * we can use this one to have a specific implementation
+   */
+  @JsonProperty
+  protected String taskFactoryImplName;
+  
+  @JsonProperty
+  protected String envSettings;
+  
+  @JsonProperty
+  @NotEmpty
+  protected String taskName;
+  
+  @JsonProperty
+  protected Integer maxInstances;
+  
+  @JsonProperty
+  protected String command;
+  
+  @JsonProperty
+  protected String serviceOptsName;
+  
+  
+  public Optional<Double> getJvmMaxMemoryMB() {
+      return Optional.fromNullable(jvmMaxMemoryMB);
+  }
+
+  public Optional<String> getJvmOpts() {
+      return Optional.fromNullable(jvmOpts);
+  }
+
+  public Optional<Double> getCpus() {
+      return Optional.fromNullable(cpus);
+  }
+
+  public String getTaskName() {
+    return taskName;
+  }
+
+  public void setTaskName(String taskName) {
+    this.taskName = taskName;
+  }
+
+  public Optional<String> getTaskFactoryImplName() {
+    return Optional.fromNullable(taskFactoryImplName);
+  }
+
+  public String getEnvSettings() {
+    return envSettings;
+  }
+  
+  public Optional<Map<String, Long>> getPorts() {
+    return Optional.fromNullable(ports);
+  }
+  
+  public Optional<Integer> getMaxInstances() {
+    return Optional.fromNullable(maxInstances);
+  }
+
+  public Optional<String> getCommand() {
+    return Optional.fromNullable(command);
+  }
+
+  public String getServiceOpts() {
+    return serviceOptsName;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
index 827445e..f156bd0 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
@@ -43,12 +43,12 @@ public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl {
   }
 
 @Override
-  public String generateCommandLine(NMProfile profile, NMPorts ports) {
+  public String generateCommandLine(ServiceResourceProfile profile, Ports ports) {
     StringBuilder cmdLine = new StringBuilder();
     LOGGER.info("Using remote distribution");
 
-    generateEnvironment(profile, ports);
-    appendNMExtractionCommands(cmdLine);
+    generateEnvironment(profile, (NMPorts) ports);
+    appendDistroExtractionCommands(cmdLine);
     appendCgroupsCmds(cmdLine);
     appendYarnHomeExport(cmdLine);
     appendUser(cmdLine);
@@ -57,7 +57,7 @@ public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl {
     return cmdLine.toString();
   }
 
-  private void appendNMExtractionCommands(StringBuilder cmdLine) {
+  protected void appendDistroExtractionCommands(StringBuilder cmdLine) {
     /*
     TODO(darinj): Overall this is messier than I'd like. We can't let mesos untar the distribution, since
     it will change the permissions.  Instead we simply download the tarball and execute tar -xvpf. We also
@@ -82,7 +82,7 @@ public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl {
       .append("/etc/hadoop/yarn-site.xml;");
   }
 
-  private void appendUser(StringBuilder cmdLine) {
+  protected void appendUser(StringBuilder cmdLine) {
     cmdLine.append(" sudo -E -u ").append(cfg.getFrameworkUser().get()).append(" -H");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java
index b4e4b0d..9dcb2bf 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java
@@ -23,5 +23,7 @@ package com.ebay.myriad.scheduler;
  * Interface to plugin multiple implementations for executor command generation  
  */
 public interface ExecutorCommandLineGenerator {
-    String generateCommandLine(NMProfile profile, NMPorts ports);
+    String generateCommandLine(ServiceResourceProfile profile, Ports ports);
+    
+    String getConfigurationUrl();
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExtendedResourceProfile.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExtendedResourceProfile.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExtendedResourceProfile.java
new file mode 100644
index 0000000..e30bab4
--- /dev/null
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExtendedResourceProfile.java
@@ -0,0 +1,65 @@
+package com.ebay.myriad.scheduler;
+
+import com.google.gson.Gson;
+
+/**
+ * Extended ServiceResourceProfile for services that need to pass set of resources downstream
+ * currently the only such service is NodeManager
+ *
+ */
+public class ExtendedResourceProfile extends ServiceResourceProfile {
+
+  private NMProfile childProfile;
+
+  /**
+   * 
+   * @param childProfile - should be null
+   * @param cpu
+   * @param mem
+   * will throw NullPoiterException if childProfile is null
+   */
+  public ExtendedResourceProfile(NMProfile childProfile, Double cpu, Double mem) {
+    super(childProfile.getName(), cpu, mem);
+    this.childProfile = childProfile;
+    this.className = ExtendedResourceProfile.class.getName();
+  }
+
+  public NMProfile getChildProfile() {
+    return childProfile;
+  }
+
+  public void setChildProfile(NMProfile nmProfile) {
+    this.childProfile = nmProfile;
+  }
+  
+  @Override
+  public String getName() {
+    return childProfile.getName();
+  }
+
+  @Override
+  public Double getCpus() {
+    return childProfile.getCpus().doubleValue();
+  }
+
+  @Override
+  public Double getMemory() {
+    return childProfile.getMemory().doubleValue();
+  }
+
+  @Override
+  public Double getAggregateMemory() {
+    return memory + childProfile.getMemory();
+  }
+  
+  @Override
+  public Double getAggregateCpu() {
+    return cpus + childProfile.getCpus();
+  }
+
+  @Override
+  public String toString() {
+      Gson gson = new Gson();
+      return gson.toJson(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
index fb7b059..aff604c 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
@@ -18,13 +18,19 @@
  */
 package com.ebay.myriad.scheduler;
 
+import com.ebay.myriad.configuration.MyriadBadConfigurationException;
+import com.ebay.myriad.configuration.ServiceConfiguration;
+import com.ebay.myriad.configuration.MyriadConfiguration;
+import com.ebay.myriad.configuration.NodeManagerConfiguration;
 import com.ebay.myriad.policy.NodeScaleDownPolicy;
 import com.ebay.myriad.scheduler.constraints.Constraint;
 import com.ebay.myriad.scheduler.constraints.LikeConstraint;
 import com.ebay.myriad.state.NodeTask;
 import com.ebay.myriad.state.SchedulerState;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.inject.Inject;
+
 import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Myriad scheduler operations
@@ -39,58 +46,157 @@ import java.util.List;
 public class MyriadOperations {
     private static final Logger LOGGER = LoggerFactory.getLogger(MyriadOperations.class);
     private final SchedulerState schedulerState;
+
+    private MyriadConfiguration cfg;
     private NodeScaleDownPolicy nodeScaleDownPolicy;
 
     @Inject
-    public MyriadOperations(SchedulerState schedulerState,
+    public MyriadOperations(MyriadConfiguration cfg,
+                            SchedulerState schedulerState,
                             NodeScaleDownPolicy nodeScaleDownPolicy) {
+      this.cfg = cfg;
       this.schedulerState = schedulerState;
       this.nodeScaleDownPolicy = nodeScaleDownPolicy;
     }
 
-    public void flexUpCluster(NMProfile profile, int instances, Constraint constraint) {
+    public void flexUpCluster(ServiceResourceProfile serviceResourceProfile, int instances, Constraint constraint) {
         Collection<NodeTask> nodes = new HashSet<>();
         for (int i = 0; i < instances; i++) {
-            nodes.add(new NodeTask(profile, constraint));
+          NodeTask nodeTask = new NodeTask(serviceResourceProfile, constraint);
+          nodeTask.setTaskPrefix(NodeManagerConfiguration.NM_TASK_PREFIX);
+          nodes.add(nodeTask);
         }
 
+        LOGGER.info("Adding {} NM instances to cluster", nodes.size());
         this.schedulerState.addNodes(nodes);
     }
 
-    public void flexDownCluster(NMProfile profile, Constraint constraint, int numInstancesToScaleDown) {
+    public void flexDownCluster(ServiceResourceProfile serviceResourceProfile, Constraint constraint, int numInstancesToScaleDown) {
         // Flex down Pending tasks, if any
         int numPendingTasksScaledDown = flexDownPendingTasks(
-            profile, constraint, numInstancesToScaleDown);
+            serviceResourceProfile, constraint, numInstancesToScaleDown);
 
         // Flex down Staging tasks, if any
         int numStagingTasksScaledDown = flexDownStagingTasks(
-            profile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown);
+            serviceResourceProfile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown);
 
         // Flex down Active tasks, if any
         int numActiveTasksScaledDown = flexDownActiveTasks(
-            profile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown - numStagingTasksScaledDown);
+            serviceResourceProfile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown - numStagingTasksScaledDown);
 
         if (numActiveTasksScaledDown + numStagingTasksScaledDown + numPendingTasksScaledDown == 0) {
           LOGGER.info("No Node Managers with profile '{}' and constraint '{}' found for scaling down.",
-              profile.getName(), constraint == null ? "null" : constraint.toString());
+              serviceResourceProfile.getName(), constraint == null ? "null" : constraint.toString());
         } else {
           LOGGER.info("Flexed down {} active, {} staging  and {} pending Node Managers with " +
               "'{}' profile and constraint '{}'.", numActiveTasksScaledDown, numStagingTasksScaledDown,
-              numPendingTasksScaledDown, profile.getName(), constraint == null ? "null" : constraint.toString());
+              numPendingTasksScaledDown, serviceResourceProfile.getName(), constraint == null ? "null" : constraint.toString());
         }
     }
 
-    private int flexDownPendingTasks(NMProfile profile, Constraint constraint, int numInstancesToScaleDown) {
+    /**
+     * Flexup a service
+     * @param instances
+     * @param serviceName
+     */
+    public void flexUpAService(int instances, String serviceName) throws MyriadBadConfigurationException {
+      final ServiceConfiguration auxTaskConf = cfg.getServiceConfiguration(serviceName);
+      
+      int totalflexInstances = instances + getFlexibleInstances(serviceName);
+      Integer maxInstances = auxTaskConf.getMaxInstances().orNull();
+      if (maxInstances != null && maxInstances > 0) {
+        // check number of instances
+        // sum of active, staging, pending should be < maxInstances
+        if (totalflexInstances > maxInstances) {
+          LOGGER.error("Current number of active, staging, pending and requested instances: {}"
+              + ", while it is greater then max instances allowed: {}", totalflexInstances, maxInstances);
+            throw new MyriadBadConfigurationException("Current number of active, staging, pending instances and requested: "
+            + totalflexInstances + ", while it is greater then max instances allowed: " + maxInstances);          
+        }
+      }
+
+      final Double cpu = auxTaskConf.getCpus().or(ServiceConfiguration.DEFAULT_CPU);
+      final Double mem = auxTaskConf.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY);
+      
+      Collection<NodeTask> nodes = new HashSet<>();
+      for (int i = 0; i < instances; i++) {
+        NodeTask nodeTask = new NodeTask(new ServiceResourceProfile(serviceName, cpu, mem), null);
+        nodeTask.setTaskPrefix(serviceName);
+        nodes.add(nodeTask);
+      }
+
+      LOGGER.info("Adding {} {} instances to cluster", nodes.size(), serviceName);
+      this.schedulerState.addNodes(nodes);
+    }
+    
+    /**
+     * Flexing down any service defined in the configuration
+     * @param numInstancesToScaleDown
+     * @param serviceName - name of the service
+     */
+    public void flexDownAService(int numInstancesToScaleDown, String serviceName) {
+      LOGGER.info("About to flex down {} instances of {}", numInstancesToScaleDown, serviceName);
+
+      int numScaledDown = 0;
+      
+      // Flex down Pending tasks, if any
+      if (numScaledDown < numInstancesToScaleDown) {
+        Set<Protos.TaskID> pendingTasks = Sets.newHashSet(this.schedulerState.getPendingTaskIds(serviceName));
+
+        for (Protos.TaskID taskId : pendingTasks) {
+            this.schedulerState.makeTaskKillable(taskId);
+            numScaledDown++;
+            if (numScaledDown >= numInstancesToScaleDown) {
+                break;
+            }
+        }
+      }
+      int numPendingTasksScaledDown = numScaledDown;
+      
+      // Flex down Staging tasks, if any
+      if (numScaledDown < numInstancesToScaleDown) {
+          Set<Protos.TaskID> stagingTasks = Sets.newHashSet(this.schedulerState.getStagingTaskIds(serviceName));
+
+          for (Protos.TaskID taskId : stagingTasks) {
+              this.schedulerState.makeTaskKillable(taskId);
+              numScaledDown++;
+              if (numScaledDown >= numInstancesToScaleDown) {
+                  break;
+              }
+          }
+      }
+      int numStagingTasksScaledDown = numScaledDown - numPendingTasksScaledDown;
+
+      Set<NodeTask> activeTasks = Sets.newHashSet(this.schedulerState.getActiveTasksByType(serviceName));
+      if (numScaledDown < numInstancesToScaleDown) {
+        for (NodeTask nodeTask : activeTasks) {
+          this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId());
+          numScaledDown++;
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Marked NodeTask {} on host {} for kill.",
+                nodeTask.getTaskStatus().getTaskId(), nodeTask.getHostname());
+          }
+          if (numScaledDown >= numInstancesToScaleDown) {
+            break;
+          }
+        }
+      }
+      
+      LOGGER.info("Flexed down {} of {} instances including {} staging instances, and {} pending instances of {}",
+              numScaledDown, numInstancesToScaleDown, numStagingTasksScaledDown, numPendingTasksScaledDown, serviceName);
+    }
+    
+    private int flexDownPendingTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
       return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getPendingTaskIDsForProfile(profile),
           profile, constraint, numInstancesToScaleDown) : 0;
     }
 
-  private int flexDownStagingTasks(NMProfile profile, Constraint constraint, int numInstancesToScaleDown) {
+  private int flexDownStagingTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
       return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getStagingTaskIDsForProfile(profile),
           profile, constraint, numInstancesToScaleDown) : 0;
     }
 
-    private int flexDownActiveTasks(NMProfile profile, Constraint constraint, int numInstancesToScaleDown) {
+    private int flexDownActiveTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
       if (numInstancesToScaleDown > 0) {
         List<Protos.TaskID> activeTasksForProfile = Lists.newArrayList(schedulerState.getActiveTaskIDsForProfile(profile));
         nodeScaleDownPolicy.apply(activeTasksForProfile);
@@ -99,7 +205,7 @@ public class MyriadOperations {
       return 0;
     }
 
-  private int flexDownTasks(Collection<Protos.TaskID> taskIDs, NMProfile profile,
+  private int flexDownTasks(Collection<Protos.TaskID> taskIDs, ServiceResourceProfile profile,
                               Constraint constraint, int numInstancesToScaleDown) {
       int numInstancesScaledDown = 0;
       for (Protos.TaskID taskID : taskIDs) {
@@ -135,6 +241,12 @@ public class MyriadOperations {
       }
     }
     return true;
+  } 
+ 
+  public Integer getFlexibleInstances(String taskPrefix) {
+      return this.schedulerState.getActiveTaskIds(taskPrefix).size()
+              + this.schedulerState.getStagingTaskIds(taskPrefix).size()
+              + this.schedulerState.getPendingTaskIds(taskPrefix).size();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java
index 1c815aa..aae2098 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java
@@ -22,6 +22,7 @@ package com.ebay.myriad.scheduler;
 import java.util.Map;
 import java.util.HashMap;
 
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,10 +101,10 @@ public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator {
   }
 
   @Override
-  public String generateCommandLine(NMProfile profile, NMPorts ports) {
+  public String generateCommandLine(ServiceResourceProfile profile, Ports ports) {
     StringBuilder cmdLine = new StringBuilder();
 
-    generateEnvironment(profile, ports);
+    generateEnvironment(profile, (NMPorts) ports);
     appendCgroupsCmds(cmdLine);
     appendYarnHomeExport(cmdLine);
     appendEnvForNM(cmdLine);
@@ -111,7 +112,7 @@ public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator {
     return cmdLine.toString();
   }
 
-  protected void generateEnvironment(NMProfile profile, NMPorts ports) {
+  protected void generateEnvironment(ServiceResourceProfile profile, NMPorts ports) {
     //yarnEnvironemnt configuration from yaml file
     Map<String, String> yarnEnvironmentMap = cfg.getYarnEnvironment();
     if (yarnEnvironmentMap != null) {
@@ -189,4 +190,22 @@ public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator {
       }
   }
 
+  @Override
+  public String getConfigurationUrl() {
+    YarnConfiguration conf = new YarnConfiguration();
+    String httpPolicy = conf.get(TaskFactory.YARN_HTTP_POLICY);
+    if (httpPolicy != null && httpPolicy.equals(TaskFactory.YARN_HTTP_POLICY_HTTPS_ONLY)) {
+      String address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS);
+      if (address == null || address.isEmpty()) {
+        address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8090";
+      }
+      return "https://" + address + "/conf";
+    } else {
+      String address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_ADDRESS);
+      if (address == null || address.isEmpty()) {
+        address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8088";
+      }
+      return "http://" + address + "/conf";
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMPorts.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMPorts.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMPorts.java
index a2c9f46..beb0920 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMPorts.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMPorts.java
@@ -26,7 +26,7 @@ import java.util.Map;
 /**
  * Helper class for dynamically assigning ports to nodemanager
  */
-public class NMPorts {
+public class NMPorts implements Ports {
     private static final String NM_RPC_PORT_KEY = "nm.rpc.port";
     private static final String NM_LOCALIZER_PORT_KEY = "nm.localizer.port";
     private static final String NM_WEBAPP_HTTP_PORT_KEY = "nm.webapp.http.port";

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMTaskFactoryAnnotation.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMTaskFactoryAnnotation.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMTaskFactoryAnnotation.java
new file mode 100644
index 0000000..659fd7c
--- /dev/null
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMTaskFactoryAnnotation.java
@@ -0,0 +1,34 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.ebay.myriad.scheduler;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Target;
+import java.lang.annotation.Retention;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+
+/**
+ * NMTaskFactory annotation that allows to bind TaskFactory to NM specific implementation
+ *
+ */
+@BindingAnnotation @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface NMTaskFactoryAnnotation {}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Ports.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Ports.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Ports.java
new file mode 100644
index 0000000..4d5064c
--- /dev/null
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Ports.java
@@ -0,0 +1,9 @@
+package com.ebay.myriad.scheduler;
+
+/**
+ * Generic interface to represent ports
+ *
+ */
+public interface Ports {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
index 518720f..0c9a035 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
@@ -20,6 +20,11 @@ package com.ebay.myriad.scheduler;
 
 import com.ebay.myriad.state.SchedulerState;
 import javax.inject.Inject;
+import java.util.Set;
+
+import com.ebay.myriad.configuration.NodeManagerConfiguration;
+
+import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,12 +37,12 @@ public class Rebalancer implements Runnable {
 
     private final SchedulerState schedulerState;
     private final MyriadOperations myriadOperations;
-    private final NMProfileManager profileManager;
+    private final ServiceProfileManager profileManager;
 
     @Inject
     public Rebalancer(SchedulerState schedulerState,
                       MyriadOperations myriadOperations,
-                      NMProfileManager profileManager) {
+                      ServiceProfileManager profileManager) {
         this.schedulerState = schedulerState;
         this.myriadOperations = myriadOperations;
         this.profileManager = profileManager;
@@ -45,8 +50,10 @@ public class Rebalancer implements Runnable {
 
     @Override
     public void run() {
-        LOGGER.info("Active {}, Pending {}", schedulerState.getActiveTaskIds().size(), schedulerState.getPendingTaskIds().size());
-        if (schedulerState.getActiveTaskIds().size() < 1 && schedulerState.getPendingTaskIds().size() < 1) {
+      final Set<Protos.TaskID> activeIds = schedulerState.getActiveTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX);
+      final Set<Protos.TaskID> pendingIds = schedulerState.getPendingTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX);
+        LOGGER.info("Active {}, Pending {}", activeIds.size(), pendingIds.size());
+        if (activeIds.size() < 1 && pendingIds.size() < 1) {
             myriadOperations.flexUpCluster(profileManager.get("small"), 1, null);
         }
 //            RestAdapter restAdapter = new RestAdapter.Builder()

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
index f69460d..392b69b 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
@@ -18,9 +18,11 @@
  */
 package com.ebay.myriad.scheduler;
 
+import com.ebay.myriad.configuration.NodeManagerConfiguration;
 import com.ebay.myriad.state.NodeTask;
 import com.ebay.myriad.state.SchedulerState;
 import com.google.common.base.Preconditions;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.mesos.Protos;
 import org.slf4j.Logger;
@@ -34,8 +36,8 @@ import java.util.Collection;
 public class SchedulerUtils {
     private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerUtils.class);
 
-    public static boolean isUniqueHostname(Protos.OfferOrBuilder offer,
-                                           Collection<NodeTask> tasks) {
+    public static boolean isUniqueHostname(Protos.OfferOrBuilder offer, NodeTask taskToLaunch, 
+        Collection<NodeTask> tasks) {
         Preconditions.checkArgument(offer != null);
         String offerHostname = offer.getHostname();
 
@@ -44,8 +46,10 @@ public class SchedulerUtils {
         }
         boolean uniqueHostname = true;
         for (NodeTask task : tasks) {
-            if (offerHostname.equalsIgnoreCase(task.getHostname())) {
+            if (offerHostname.equalsIgnoreCase(task.getHostname()) &&
+                task.getTaskPrefix().equalsIgnoreCase(taskToLaunch.getTaskPrefix())) {
                 uniqueHostname = false;
+                break;
             }
         }
         LOGGER.debug("Offer's hostname {} is unique: {}", offerHostname, uniqueHostname);
@@ -61,7 +65,7 @@ public class SchedulerUtils {
      * @return
      */
     public static boolean isEligibleForFineGrainedScaling(String hostName, SchedulerState state) {
-      for (NodeTask activeNMTask : state.getActiveTasks()) {
+      for (NodeTask activeNMTask : state.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)) {
         if (activeNMTask.getProfile().getCpus() == 0 &&
             activeNMTask.getProfile().getMemory() == 0 &&
             activeNMTask.getHostname().equals(hostName)) {

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceCommandLineGenerator.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceCommandLineGenerator.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceCommandLineGenerator.java
new file mode 100644
index 0000000..7d6c578
--- /dev/null
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceCommandLineGenerator.java
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package com.ebay.myriad.scheduler;
+
+import com.ebay.myriad.configuration.MyriadConfiguration;
+
+/**
+ * CommandLineGenerator for any aux service launched by Myriad as binary distro
+ *
+ */
+public class ServiceCommandLineGenerator extends DownloadNMExecutorCLGenImpl {
+
+  
+  public ServiceCommandLineGenerator(MyriadConfiguration cfg,
+      String nodeManagerUri) {
+    super(cfg, nodeManagerUri);
+  }
+
+  @Override
+  public String generateCommandLine(ServiceResourceProfile profile, Ports ports) {
+    StringBuilder strB = new StringBuilder();
+    appendDistroExtractionCommands(strB);
+    appendUser(strB);
+    return strB.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceProfileManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceProfileManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceProfileManager.java
new file mode 100644
index 0000000..f877bbb
--- /dev/null
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceProfileManager.java
@@ -0,0 +1,39 @@
+package com.ebay.myriad.scheduler;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+
+/**
+ * Class to keep all the ServiceResourceProfiles together
+ *
+ */
+public class ServiceProfileManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProfileManager.class);
+
+  private Map<String, ServiceResourceProfile> profiles = new ConcurrentHashMap<>();
+
+  public ServiceResourceProfile get(String name) {
+      return profiles.get(name);
+  }
+
+  public void add(ServiceResourceProfile profile) {
+      LOGGER.info("Adding profile {} with CPU: {} and Memory: {}",
+              profile.getName(), profile.getCpus(), profile.getMemory());
+      profiles.put(profile.getName(), profile);
+  }
+
+  public boolean exists(String name) {
+      return this.profiles.containsKey(name);
+  }
+
+  public String toString() {
+      Gson gson = new Gson();
+      return gson.toJson(this);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceResourceProfile.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceResourceProfile.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceResourceProfile.java
new file mode 100644
index 0000000..41358ed
--- /dev/null
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceResourceProfile.java
@@ -0,0 +1,117 @@
+package com.ebay.myriad.scheduler;
+
+import java.lang.reflect.Type;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+
+/**
+ * Resource Profile for any service 
+ *
+ */
+public class ServiceResourceProfile {
+
+  protected final String name;
+
+  /**
+   * Number of CPU needed to run a service
+   */
+  protected final Double cpus;
+
+  /**
+   * Memory in MB needed to run a service
+   */
+  protected final Double memory;
+
+  protected Double executorCpu = 0.0;
+  
+  protected Double executorMemory = 0.0;
+  
+  protected String className;
+    
+  public ServiceResourceProfile(String name, Double cpu, Double mem) {
+    this.name = name;
+    this.cpus = cpu;
+    this.memory = mem;
+    this.className = ServiceResourceProfile.class.getName();
+  }
+
+
+  public String getName() {
+    return name;
+  }
+
+  public Double getCpus() {
+    return cpus;
+  }
+
+  public Double getMemory() {
+    return memory;
+  }
+  
+  public Double getAggregateMemory() {
+    return memory;
+  }
+  
+  public Double getAggregateCpu() {
+    return cpus;
+  }
+  
+  public Double getExecutorCpu() {
+    return executorCpu;
+  }
+
+  public void setExecutorCpu(Double executorCpu) {
+    this.executorCpu = executorCpu;
+  }
+
+  public Double getExecutorMemory() {
+    return executorMemory;
+  }
+
+  public void setExecutorMemory(Double executorMemory) {
+    this.executorMemory = executorMemory;
+  }
+
+
+  @Override
+  public String toString() {
+      Gson gson = new Gson();
+      return gson.toJson(this);
+  }
+  
+  /**
+   * Custom serializer to help with deserialization of class hierarchy
+   * since at the point of deserialization we don't know class of the data 
+   * that is being deserialized
+   *
+   */
+  public static class CustomDeserializer implements JsonDeserializer<ServiceResourceProfile> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CustomDeserializer.class);
+    
+    @Override
+    public ServiceResourceProfile deserialize(JsonElement json, Type typeOfT,
+        JsonDeserializationContext context) throws JsonParseException {
+      String type = json.getAsJsonObject().get("className").getAsString();
+      try {
+        @SuppressWarnings("rawtypes")
+        Class c = Class.forName(type);
+        if (ServiceResourceProfile.class.equals(c)) {
+          return new Gson().fromJson(json, typeOfT);
+        }
+        ServiceResourceProfile profile = context.deserialize(json, c);
+        return profile;
+      } catch (ClassNotFoundException e) {
+        LOGGER.error("Classname is not found", e);
+      }
+      return null;
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/46573816/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskConstraints.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskConstraints.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskConstraints.java
new file mode 100644
index 0000000..514dada
--- /dev/null
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ServiceTaskConstraints.java
@@ -0,0 +1,36 @@
+package com.ebay.myriad.scheduler;
+
+import java.util.Map;
+
+import com.ebay.myriad.configuration.MyriadConfiguration;
+import com.ebay.myriad.configuration.ServiceConfiguration;
+
+/**
+ * ServiceTaskConstraints is an implementation of TaskConstraints for a service
+ * at this point constraints are on ports
+ * Later on there may be other types of constraints added
+ *
+ */
+public class ServiceTaskConstraints implements TaskConstraints {
+
+  private int portsCount;
+  
+  public ServiceTaskConstraints(MyriadConfiguration cfg, String taskPrefix) {
+    this.portsCount = 0;
+    Map<String, ServiceConfiguration> auxConfigs = cfg.getServiceConfigurations();
+    if (auxConfigs == null) {
+      return;
+    }
+    ServiceConfiguration serviceConfig = auxConfigs.get(taskPrefix);
+    if (serviceConfig != null) {
+      if (serviceConfig.getPorts().isPresent()) {
+        this.portsCount = serviceConfig.getPorts().get().size();
+      }
+    }
+  }
+  
+  @Override
+  public int portsCount() {
+    return portsCount;
+  }
+}
\ No newline at end of file