You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@myriad.apache.org by sm...@apache.org on 2015/10/28 17:07:46 UTC

[14/20] incubator-myriad git commit: com.ebay => org.apache

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java b/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java
new file mode 100644
index 0000000..2131178
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.mesos.Protos.Credential;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Protos.FrameworkInfo.Builder;
+import org.apache.mesos.SchedulerDriver;
+import org.apache.mesos.state.State;
+import org.apache.mesos.state.ZooKeeperState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.scheduler.MyriadDriver;
+import org.apache.myriad.scheduler.MyriadScheduler;
+import org.apache.myriad.state.SchedulerState;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.google.protobuf.ByteString;
+
+/**
+ * Guice Module for Mesos objects.
+ */
+public class MesosModule extends AbstractModule {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MesosModule.class);
+
+  public MesosModule() {
+  }
+
+  @Override
+  protected void configure() {
+    bind(MyriadDriver.class).in(Scopes.SINGLETON);
+  }
+
+  @Provides
+  @Singleton
+  SchedulerDriver providesSchedulerDriver(MyriadScheduler scheduler, MyriadConfiguration cfg, SchedulerState schedulerState) {
+
+    Builder frameworkInfoBuilder = FrameworkInfo.newBuilder().setUser("").setName(cfg.getFrameworkName()).setCheckpoint(cfg.isCheckpoint()).setFailoverTimeout(cfg.getFrameworkFailoverTimeout());
+
+    if (StringUtils.isNotEmpty(cfg.getFrameworkRole())) {
+      frameworkInfoBuilder.setRole(cfg.getFrameworkRole());
+    }
+
+    FrameworkID frameworkId = schedulerState.getFrameworkID();
+    if (frameworkId != null) {
+      LOGGER.info("Attempting to re-register with frameworkId: {}", frameworkId.getValue());
+      frameworkInfoBuilder.setId(frameworkId);
+    }
+
+    String mesosAuthenticationPrincipal = cfg.getMesosAuthenticationPrincipal();
+    String mesosAuthenticationSecretFilename = cfg.getMesosAuthenticationSecretFilename();
+    if (StringUtils.isNotEmpty(mesosAuthenticationPrincipal)) {
+      frameworkInfoBuilder.setPrincipal(mesosAuthenticationPrincipal);
+
+      Credential.Builder credentialBuilder = Credential.newBuilder();
+      credentialBuilder.setPrincipal(mesosAuthenticationPrincipal);
+      if (StringUtils.isNotEmpty(mesosAuthenticationSecretFilename)) {
+        try {
+          credentialBuilder.setSecret(ByteString.readFrom(new FileInputStream(mesosAuthenticationSecretFilename)));
+        } catch (FileNotFoundException ex) {
+          LOGGER.error("Mesos authentication secret file was not found", ex);
+          throw new RuntimeException(ex);
+        } catch (IOException ex) {
+          LOGGER.error("Error reading Mesos authentication secret file", ex);
+          throw new RuntimeException(ex);
+        }
+      }
+      return new MesosSchedulerDriver(scheduler, frameworkInfoBuilder.build(), cfg.getMesosMaster(), credentialBuilder.build());
+    } else {
+      return new MesosSchedulerDriver(scheduler, frameworkInfoBuilder.build(), cfg.getMesosMaster());
+    }
+  }
+
+  @Provides
+  @Singleton
+  State providesStateStore(MyriadConfiguration cfg) {
+    return new ZooKeeperState(cfg.getZkServers(), cfg.getZkTimeout(), TimeUnit.MILLISECONDS, "/myriad/" + cfg.getFrameworkName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
new file mode 100644
index 0000000..659160e
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad;
+
+import org.apache.myriad.configuration.ServiceConfiguration;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.configuration.MyriadExecutorConfiguration;
+import org.apache.myriad.configuration.NodeManagerConfiguration;
+import org.apache.myriad.policy.LeastAMNodesFirstPolicy;
+import org.apache.myriad.policy.NodeScaleDownPolicy;
+import org.apache.myriad.scheduler.MyriadDriverManager;
+import org.apache.myriad.scheduler.fgs.NMHeartBeatHandler;
+import org.apache.myriad.scheduler.fgs.NodeStore;
+import org.apache.myriad.scheduler.fgs.OfferLifecycleManager;
+import org.apache.myriad.scheduler.DownloadNMExecutorCLGenImpl;
+import org.apache.myriad.scheduler.ExecutorCommandLineGenerator;
+import org.apache.myriad.scheduler.NMExecutorCLGenImpl;
+import org.apache.myriad.scheduler.NMTaskFactoryAnnotation;
+import org.apache.myriad.scheduler.ReconcileService;
+import org.apache.myriad.scheduler.ServiceProfileManager;
+import org.apache.myriad.scheduler.ServiceTaskFactoryImpl;
+import org.apache.myriad.scheduler.TaskConstraintsManager;
+import org.apache.myriad.scheduler.TaskFactory;
+import org.apache.myriad.scheduler.TaskFactory.NMTaskFactoryImpl;
+import org.apache.myriad.scheduler.fgs.YarnNodeCapacityManager;
+import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
+import org.apache.myriad.state.MyriadStateStore;
+import org.apache.myriad.state.SchedulerState;
+import org.apache.myriad.webapp.HttpConnectorProvider;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.MapBinder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Guice Module for Myriad
+ */
+public class MyriadModule extends AbstractModule {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MyriadModule.class);
+
+  private MyriadConfiguration cfg;
+  private Configuration hadoopConf;
+  private AbstractYarnScheduler yarnScheduler;
+  private final RMContext rmContext;
+  private InterceptorRegistry interceptorRegistry;
+
+  public MyriadModule(MyriadConfiguration cfg, Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext, InterceptorRegistry interceptorRegistry) {
+    this.cfg = cfg;
+    this.hadoopConf = hadoopConf;
+    this.yarnScheduler = yarnScheduler;
+    this.rmContext = rmContext;
+    this.interceptorRegistry = interceptorRegistry;
+  }
+
+  @Override
+  protected void configure() {
+    LOGGER.debug("Configuring guice");
+    bind(MyriadConfiguration.class).toInstance(cfg);
+    bind(Configuration.class).toInstance(hadoopConf);
+    bind(RMContext.class).toInstance(rmContext);
+    bind(AbstractYarnScheduler.class).toInstance(yarnScheduler);
+    bind(InterceptorRegistry.class).toInstance(interceptorRegistry);
+    bind(MyriadDriverManager.class).in(Scopes.SINGLETON);
+    bind(org.apache.myriad.scheduler.MyriadScheduler.class).in(Scopes.SINGLETON);
+    bind(ServiceProfileManager.class).in(Scopes.SINGLETON);
+    bind(DisruptorManager.class).in(Scopes.SINGLETON);
+    bind(ReconcileService.class).in(Scopes.SINGLETON);
+    bind(HttpConnectorProvider.class).in(Scopes.SINGLETON);
+    bind(TaskConstraintsManager.class).in(Scopes.SINGLETON);
+    // add special binding between TaskFactory and NMTaskFactoryImpl to ease up
+    // usage of TaskFactory
+    bind(TaskFactory.class).annotatedWith(NMTaskFactoryAnnotation.class).to(NMTaskFactoryImpl.class);
+    bind(YarnNodeCapacityManager.class).in(Scopes.SINGLETON);
+    bind(NodeStore.class).in(Scopes.SINGLETON);
+    bind(OfferLifecycleManager.class).in(Scopes.SINGLETON);
+    bind(NMHeartBeatHandler.class).asEagerSingleton();
+
+    MapBinder<String, TaskFactory> mapBinder = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class);
+    mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactoryImpl.class).in(Scopes.SINGLETON);
+    Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations();
+    if (auxServicesConfigs != null) {
+      for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) {
+        String taskFactoryClass = entry.getValue().getTaskFactoryImplName().orNull();
+        if (taskFactoryClass != null) {
+          try {
+            Class<? extends TaskFactory> implClass = (Class<? extends TaskFactory>) Class.forName(taskFactoryClass);
+            mapBinder.addBinding(entry.getKey()).to(implClass).in(Scopes.SINGLETON);
+          } catch (ClassNotFoundException e) {
+            LOGGER.error("ClassNotFoundException", e);
+          }
+        } else {
+          mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON);
+        }
+      }
+    }
+    //TODO(Santosh): Should be configurable as well
+    bind(NodeScaleDownPolicy.class).to(LeastAMNodesFirstPolicy.class).in(Scopes.SINGLETON);
+  }
+
+  @Provides
+  @Singleton
+  SchedulerState providesSchedulerState(MyriadConfiguration cfg) {
+    LOGGER.debug("Configuring SchedulerState provider");
+    MyriadStateStore myriadStateStore = null;
+    if (cfg.isHAEnabled()) {
+      myriadStateStore = providesMyriadStateStore();
+      if (myriadStateStore == null) {
+        throw new RuntimeException("Could not find a state store" +
+            " implementation for Myriad. The 'yarn.resourcemanager.store.class'" +
+            " property should be set to a class implementing the" +
+            " MyriadStateStore interface. For e.g." +
+            " org.apache.hadoop.yarn.server.resourcemanager.recovery.MyriadFileSystemRMStateStore");
+      }
+    }
+    return new SchedulerState(myriadStateStore);
+  }
+
+  private MyriadStateStore providesMyriadStateStore() {
+    // TODO (sdaingade) Read the implementation class from yml
+    // once multiple implementations are available.
+    if (rmContext.getStateStore() instanceof MyriadStateStore) {
+      return (MyriadStateStore) rmContext.getStateStore();
+    }
+    return null;
+  }
+
+  @Provides
+  @Singleton
+  ExecutorCommandLineGenerator providesCLIGenerator(MyriadConfiguration cfg) {
+    ExecutorCommandLineGenerator cliGenerator = null;
+    MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration();
+    if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
+      cliGenerator = new DownloadNMExecutorCLGenImpl(cfg, myriadExecutorConfiguration.getNodeManagerUri().get());
+    } else {
+      cliGenerator = new NMExecutorCLGenImpl(cfg);
+    }
+    return cliGenerator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/ClustersResource.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/ClustersResource.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/ClustersResource.java
new file mode 100644
index 0000000..e1bee58
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/ClustersResource.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.api;
+
+import com.codahale.metrics.annotation.Timed;
+import org.apache.myriad.api.model.FlexDownClusterRequest;
+import org.apache.myriad.api.model.FlexDownServiceRequest;
+import org.apache.myriad.api.model.FlexUpClusterRequest;
+import org.apache.myriad.scheduler.ServiceResourceProfile;
+import org.apache.myriad.scheduler.constraints.ConstraintFactory;
+import org.apache.myriad.state.SchedulerState;
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.myriad.api.model.FlexUpServiceRequest;
+import org.apache.myriad.configuration.MyriadBadConfigurationException;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.scheduler.ServiceProfileManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.Response.Status;
+
+/**
+ * RESTful API to resource manager
+ */
+@Path("/cluster")
+public class ClustersResource {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ClustersResource.class);
+  private static final String LIKE_CONSTRAINT_FORMAT = "'<mesos_slave_attribute|hostname> LIKE <value_regex>'";
+
+  private MyriadConfiguration cfg;
+  private SchedulerState schedulerState;
+  private ServiceProfileManager profileManager;
+  private org.apache.myriad.scheduler.MyriadOperations myriadOperations;
+
+  @Inject
+  public ClustersResource(MyriadConfiguration cfg, SchedulerState state, ServiceProfileManager profileManager, org.apache.myriad.scheduler.MyriadOperations myriadOperations) {
+    this.cfg = cfg;
+    this.schedulerState = state;
+    this.profileManager = profileManager;
+    this.myriadOperations = myriadOperations;
+  }
+
+  @Timed
+  @PUT
+  @Path("/flexup")
+  @Produces(MediaType.TEXT_PLAIN)
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response flexUp(FlexUpClusterRequest request) {
+    Preconditions.checkNotNull(request, "request object cannot be null or empty");
+
+    Integer instances = request.getInstances();
+    String profile = request.getProfile();
+    List<String> constraints = request.getConstraints();
+    LOGGER.info("Received flexup request. Profile: {}, Instances: {}, Constraints: {}", profile, instances, constraints);
+
+    Response.ResponseBuilder response = Response.status(Response.Status.ACCEPTED);
+    boolean isValidRequest = validateProfile(profile, response);
+    isValidRequest = isValidRequest && validateInstances(instances, response);
+    isValidRequest = isValidRequest && validateConstraints(constraints, response);
+
+    Response returnResponse = response.build();
+    if (returnResponse.getStatus() == Response.Status.ACCEPTED.getStatusCode()) {
+      String constraint = constraints != null && !constraints.isEmpty() ? constraints.get(0) : null;
+      this.myriadOperations.flexUpCluster(this.profileManager.get(profile), instances, ConstraintFactory.createConstraint(constraint));
+    }
+
+    return returnResponse;
+  }
+
+  @Timed
+  @PUT
+  @Path("/flexupservice")
+  @Produces(MediaType.TEXT_PLAIN)
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response flexUpservice(FlexUpServiceRequest request) {
+    Preconditions.checkNotNull(request, "request object cannot be null or empty");
+
+    LOGGER.info("Received Flexup a Service Request");
+
+    Integer instances = request.getInstances();
+    String serviceName = request.getServiceName();
+
+    LOGGER.info("Instances: {}", instances);
+    LOGGER.info("Service: {}", serviceName);
+
+    // Validate profile request
+    Response.ResponseBuilder response = Response.status(Response.Status.ACCEPTED);
+
+    if (cfg.getServiceConfiguration(serviceName) == null) {
+      response.status(Response.Status.BAD_REQUEST).entity("Service does not exist: " + serviceName);
+      LOGGER.error("Provided service does not exist " + serviceName);
+      return response.build();
+    }
+
+    if (!validateInstances(instances, response)) {
+      return response.build();
+    }
+
+    try {
+      this.myriadOperations.flexUpAService(instances, serviceName);
+    } catch (MyriadBadConfigurationException e) {
+      return response.status(Response.Status.BAD_REQUEST).entity(e.getMessage()).build();
+    }
+
+    return response.build();
+  }
+
+  @Timed
+  @PUT
+  @Path("/flexdown")
+  @Produces(MediaType.TEXT_PLAIN)
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response flexDown(FlexDownClusterRequest request) {
+    Preconditions.checkNotNull(request, "request object cannot be null or empty");
+
+    Integer instances = request.getInstances();
+    String profile = request.getProfile();
+    List<String> constraints = request.getConstraints();
+    LOGGER.info("Received flex down request. Profile: {}, Instances: {}, Constraints: {}", profile, instances, constraints);
+
+    Response.ResponseBuilder response = Response.status(Response.Status.ACCEPTED);
+    boolean isValidRequest = validateProfile(profile, response);
+    isValidRequest = isValidRequest && validateInstances(instances, response);
+    isValidRequest = isValidRequest && validateConstraints(constraints, response);
+
+    if (isValidRequest) {
+      Integer numFlexedUp = this.getNumFlexedupNMs(profile);
+      if (numFlexedUp < instances) {
+        String message = String.format("Number of requested instances for flexdown is greater than the number of " +
+            "Node Managers previously flexed up for profile '%s'. Requested: %d, Previously flexed Up: %d. " +
+            "Only %d Node Managers will be flexed down.", profile, instances, numFlexedUp, numFlexedUp);
+        response.entity(message);
+        LOGGER.warn(message);
+      }
+    }
+
+    Response returnResponse = response.build();
+    if (returnResponse.getStatus() == Response.Status.ACCEPTED.getStatusCode()) {
+      String constraint = constraints != null && !constraints.isEmpty() ? constraints.get(0) : null;
+      this.myriadOperations.flexDownCluster(profileManager.get(profile), ConstraintFactory.createConstraint(constraint), instances);
+    }
+    return returnResponse;
+  }
+
+  private boolean validateProfile(String profile, ResponseBuilder response) {
+    if (profile == null || profile.isEmpty()) {
+      response.status(Response.Status.BAD_REQUEST).entity("'profile' is null or empty");
+      LOGGER.error("'profile' is null or empty");
+      return false;
+    }
+    if (!this.profileManager.exists(profile)) {
+      response.status(Response.Status.BAD_REQUEST).entity("Profile does not exist: '" + profile + "'");
+      LOGGER.error("Provided profile does not exist: '" + profile + "'");
+      return false;
+    }
+    return true;
+  }
+
+  private boolean validateInstances(Integer instances, ResponseBuilder response) {
+    if (instances == null) {
+      response.status(Response.Status.BAD_REQUEST).entity("'instances' is null");
+      LOGGER.error("'instances' is null");
+      return false;
+    } else if (!(instances > 0)) {
+      response.status(Response.Status.BAD_REQUEST).entity("Invalid instance size: " + instances);
+      LOGGER.error("Invalid instance size request " + instances);
+      return false;
+    }
+    return true;
+  }
+
+  private boolean validateConstraints(List<String> constraints, ResponseBuilder response) {
+    if (constraints != null && !constraints.isEmpty()) {
+      boolean valid = validateConstraintsSize(constraints, response);
+      valid = valid && validateLIKEConstraint(constraints.get(0), response);
+      return valid;
+    }
+    return true;
+  }
+
+  private boolean validateConstraintsSize(List<String> constraints, ResponseBuilder response) {
+    if (constraints.size() > 1) {
+      String message = String.format("Only 1 constraint is currently supported. Received: %s", constraints.toString());
+      response.status(Status.BAD_REQUEST).entity(message);
+      LOGGER.error(message);
+      return false;
+    }
+    return true;
+  }
+
+  private boolean validateLIKEConstraint(String constraint, ResponseBuilder response) {
+    if (constraint.isEmpty()) {
+      String message = String.format("The value provided for 'constraints' is empty. Format: %s", LIKE_CONSTRAINT_FORMAT);
+      response.status(Status.BAD_REQUEST).entity(message);
+      LOGGER.error(message);
+      return false;
+    }
+
+    String[] splits = constraint.split(" LIKE "); // "<key> LIKE <val_regex>"
+    if (splits.length != 2) {
+      String message = String.format("Invalid format for LIKE operator in constraint: %s. Format: %s", constraint, LIKE_CONSTRAINT_FORMAT);
+      response.status(Status.BAD_REQUEST).entity(message);
+      LOGGER.error(message);
+      return false;
+    }
+    try {
+      Pattern.compile(splits[1]);
+    } catch (PatternSyntaxException e) {
+      String message = String.format("Invalid regex for LIKE operator in constraint: %s", constraint);
+      response.status(Status.BAD_REQUEST).entity(message);
+      LOGGER.error(message, e);
+      return false;
+    }
+    return true;
+  }
+
+
+  private Integer getNumFlexedupNMs(String profile) {
+    ServiceResourceProfile serviceProfile = profileManager.get(profile);
+    return this.schedulerState.getActiveTaskIDsForProfile(serviceProfile).size() + this.schedulerState.getStagingTaskIDsForProfile(serviceProfile).size() + this.schedulerState.getPendingTaskIDsForProfile(serviceProfile).size();
+  }
+
+  @Timed
+  @PUT
+  @Path("/flexdownservice")
+  @Produces(MediaType.TEXT_PLAIN)
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response flexDownservice(FlexDownServiceRequest request) {
+    Preconditions.checkNotNull(request, "request object cannot be null or empty");
+
+    Integer instances = request.getInstances();
+    String serviceName = request.getServiceName();
+
+    LOGGER.info("Received flexdown request for service {}", serviceName);
+    LOGGER.info("Instances: " + instances);
+
+    Response.ResponseBuilder response = Response.status(Response.Status.ACCEPTED);
+
+    if (!validateInstances(instances, response)) {
+      return response.build();
+    }
+
+    // warn that number of requested instances isn't available
+    // but instances will still be flexed down
+    Integer flexibleInstances = this.myriadOperations.getFlexibleInstances(serviceName);
+    if (flexibleInstances < instances) {
+      response.entity("Number of requested instances is greater than available.");
+      // just doing a simple check here. pass the requested number of instances
+      // to myriadOperations and let it sort out how many actually get flexxed down.
+      LOGGER.warn("Requested number of instances greater than available: {} < {}", flexibleInstances, instances);
+    }
+
+    this.myriadOperations.flexDownAService(instances, serviceName);
+    return response.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/ConfigurationResource.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/ConfigurationResource.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/ConfigurationResource.java
new file mode 100644
index 0000000..336b22f
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/ConfigurationResource.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.api;
+
+import com.codahale.metrics.annotation.Timed;
+import org.apache.myriad.configuration.MyriadConfiguration;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+/**
+ * Defines the REST API to the Myriad configuration.
+ */
+@Path("/config")
+@Produces(MediaType.APPLICATION_JSON)
+public class ConfigurationResource {
+  private MyriadConfiguration cfg;
+
+  @Inject
+  public ConfigurationResource(MyriadConfiguration cfg) {
+    this.cfg = cfg;
+  }
+
+  @Timed
+  @GET
+  public MyriadConfiguration getConfig() throws InterruptedException {
+    return this.cfg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
new file mode 100644
index 0000000..a3bee85
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.api;
+
+import com.codahale.metrics.annotation.Timed;
+import org.apache.myriad.api.model.GetSchedulerStateResponse;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.mesos.Protos;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Defines the REST API for the current state of Myriad
+ */
+@Path("/state")
+@Produces(MediaType.APPLICATION_JSON)
+public class SchedulerStateResource {
+  private MyriadConfiguration cfg;
+  private org.apache.myriad.state.SchedulerState state;
+
+  @Inject
+  public SchedulerStateResource(final MyriadConfiguration cfg, final org.apache.myriad.state.SchedulerState state) {
+    this.cfg = cfg;
+    this.state = state;
+  }
+
+  @Timed
+  @GET
+  public GetSchedulerStateResponse getState() {
+    return new GetSchedulerStateResponse(toStringCollection(state.getPendingTaskIds()), toStringCollection(state.getStagingTaskIds()), toStringCollection(state.getActiveTaskIds()), toStringCollection(state.getKillableTasks()));
+  }
+
+  private Collection<String> toStringCollection(Collection<Protos.TaskID> collection) {
+    if (CollectionUtils.isEmpty(collection)) {
+      return Collections.emptyList();
+    }
+    Collection<String> returnCollection = new ArrayList<>();
+    for (Protos.TaskID task : collection) {
+      returnCollection.add(task.getValue());
+    }
+
+    return returnCollection;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownClusterRequest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownClusterRequest.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownClusterRequest.java
new file mode 100644
index 0000000..4dd84a3
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownClusterRequest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.api.model;
+
+import com.google.gson.Gson;
+import java.util.List;
+import org.hibernate.validator.constraints.NotEmpty;
+
+/**
+ * Flex down request parameters
+ */
+public class FlexDownClusterRequest {
+
+  @NotEmpty
+  public String profile;
+
+  @NotEmpty
+  public Integer instances;
+
+  public List<String> constraints;
+
+  public FlexDownClusterRequest() {
+  }
+
+  public FlexDownClusterRequest(String profile, Integer instances, List<String> constraints) {
+    this.instances = instances;
+    this.profile = profile;
+    this.constraints = constraints;
+  }
+
+  public Integer getInstances() {
+    return instances;
+  }
+
+  public void setInstances(Integer instances) {
+    this.instances = instances;
+  }
+
+  public String getProfile() {
+    return profile;
+  }
+
+  public void setProfile(String profile) {
+    this.profile = profile;
+  }
+
+  public List<String> getConstraints() {
+    return constraints;
+  }
+
+  public void setConstraints(List<String> constraints) {
+    this.constraints = constraints;
+  }
+
+  public String toString() {
+    Gson gson = new Gson();
+    return gson.toJson(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownServiceRequest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownServiceRequest.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownServiceRequest.java
new file mode 100644
index 0000000..bbfcb58
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownServiceRequest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.myriad.api.model;
+
+import com.google.gson.Gson;
+import org.hibernate.validator.constraints.NotEmpty;
+
+/**
+ * Flex down an auxtask/service
+ */
+public class FlexDownServiceRequest {
+
+  @NotEmpty
+  public Integer instances;
+
+  @NotEmpty
+  public String serviceName;
+
+  public FlexDownServiceRequest() {
+  }
+
+  public FlexDownServiceRequest(Integer instances, String serviceName) {
+    this.instances = instances;
+    this.serviceName = serviceName;
+  }
+
+  public Integer getInstances() {
+    return instances;
+  }
+
+  public void setInstances(Integer instances) {
+    this.instances = instances;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public String toString() {
+    Gson gson = new Gson();
+    return gson.toJson(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpClusterRequest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpClusterRequest.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpClusterRequest.java
new file mode 100644
index 0000000..0ab9402
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpClusterRequest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.api.model;
+
+import com.google.gson.Gson;
+import java.util.List;
+import org.hibernate.validator.constraints.NotEmpty;
+
+/**
+ * Flex up an auxtask/service
+ */
+public class FlexUpClusterRequest {
+  @NotEmpty
+  public Integer instances;
+
+  @NotEmpty
+  public String profile;
+
+  public List<String> constraints;
+
+  public FlexUpClusterRequest() {
+  }
+
+  public FlexUpClusterRequest(Integer instances, String profile, List<String> constraints) {
+    this.instances = instances;
+    this.profile = profile;
+    this.constraints = constraints;
+  }
+
+  public Integer getInstances() {
+    return instances;
+  }
+
+  public void setInstances(Integer instances) {
+    this.instances = instances;
+  }
+
+  public String getProfile() {
+    return profile;
+  }
+
+  public void setProfile(String profile) {
+    this.profile = profile;
+  }
+
+  public List<String> getConstraints() {
+    return constraints;
+  }
+
+  public void setConstraints(List<String> constraints) {
+    this.constraints = constraints;
+  }
+
+  public String toString() {
+    Gson gson = new Gson();
+    return gson.toJson(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpServiceRequest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpServiceRequest.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpServiceRequest.java
new file mode 100644
index 0000000..b047894
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpServiceRequest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.myriad.api.model;
+
+import com.google.gson.Gson;
+import org.hibernate.validator.constraints.NotEmpty;
+
+/**
+ * Flex up request parameters
+ */
+public class FlexUpServiceRequest {
+  @NotEmpty
+  public Integer instances;
+
+  @NotEmpty
+  public String serviceName;
+
+  public FlexUpServiceRequest() {
+  }
+
+  public FlexUpServiceRequest(Integer instances, String profile) {
+    this.instances = instances;
+    this.serviceName = profile;
+  }
+
+  public Integer getInstances() {
+    return instances;
+  }
+
+  public void setInstances(Integer instances) {
+    this.instances = instances;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String profile) {
+    this.serviceName = profile;
+  }
+
+  public String toString() {
+    Gson gson = new Gson();
+    return gson.toJson(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/model/GetSchedulerStateResponse.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/model/GetSchedulerStateResponse.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/GetSchedulerStateResponse.java
new file mode 100644
index 0000000..9ae5cc7
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/GetSchedulerStateResponse.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.api.model;
+
+import java.util.Collection;
+
+/**
+ * Response for the current state of Myriad
+ */
+public class GetSchedulerStateResponse {
+  private Collection<String> pendingTasks;
+  private Collection<String> stagingTasks;
+  private Collection<String> activeTasks;
+  private Collection<String> killableTasks;
+
+  public GetSchedulerStateResponse() {
+
+  }
+
+  public GetSchedulerStateResponse(Collection<String> pendingTasks, Collection<String> stagingTasks, Collection<String> activeTasks, Collection<String> killableTasks) {
+    this.pendingTasks = pendingTasks;
+    this.stagingTasks = stagingTasks;
+    this.activeTasks = activeTasks;
+    this.killableTasks = killableTasks;
+  }
+
+  public Collection<String> getPendingTasks() {
+    return pendingTasks;
+  }
+
+  public Collection<String> getStagingTasks() {
+    return stagingTasks;
+  }
+
+  public Collection<String> getActiveTasks() {
+    return activeTasks;
+  }
+
+  public Collection<String> getKillableTasks() {
+    return killableTasks;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadBadConfigurationException.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadBadConfigurationException.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadBadConfigurationException.java
new file mode 100644
index 0000000..6a23323
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadBadConfigurationException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.myriad.configuration;
+
+/**
+ * Myriad specific exception
+ */
+@SuppressWarnings("serial")
+public class MyriadBadConfigurationException extends Exception {
+
+
+  public MyriadBadConfigurationException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java
new file mode 100644
index 0000000..efb185f
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.configuration;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+
+import org.hibernate.validator.constraints.NotEmpty;
+
+import java.util.Map;
+
+/**
+ * Myriad Configuration commonly defined in the YML file
+ * mesosMaster: 10.0.2.15:5050
+ * checkpoint: false
+ * frameworkFailoverTimeout: 43200000
+ * frameworkName: MyriadAlpha
+ * nativeLibrary: /usr/local/lib/libmesos.so
+ * zkServers: localhost:2181
+ * zkTimeout: 20000
+ * profiles:
+ * small:
+ * cpu: 1
+ * mem: 1100
+ * medium:
+ * cpu: 2
+ * mem: 2048
+ * large:
+ * cpu: 4
+ * mem: 4096
+ * rebalancer: false
+ * nodemanager:
+ * jvmMaxMemoryMB: 1024
+ * user: hduser
+ * cpus: 0.2
+ * cgroups: false
+ * executor:
+ * jvmMaxMemoryMB: 256
+ * path: file://localhost/usr/local/libexec/mesos/myriad-executor-runnable-0.0.1.jar
+ * yarnEnvironment:
+ * YARN_HOME: /usr/local/hadoop
+ */
+public class MyriadConfiguration {
+  /**
+   * By default framework checkpointing is turned off.
+   */
+  public static final Boolean DEFAULT_CHECKPOINT = false;
+
+  /**
+   * By default rebalancer is turned off.
+   */
+  public static final Boolean DEFAULT_REBALANCER = true;
+
+  /**
+   * By default ha is turned off.
+   */
+  public static final Boolean DEFAULT_HA_ENABLED = false;
+
+  /**
+   * By default framework failover timeout is 1 day.
+   */
+  public static final Double DEFAULT_FRAMEWORK_FAILOVER_TIMEOUT_MS = 86400000.0;
+
+  public static final String DEFAULT_FRAMEWORK_NAME = "myriad-scheduler";
+
+  public static final String DEFAULT_NATIVE_LIBRARY = "/usr/local/lib/libmesos.so";
+
+  public static final Integer DEFAULT_ZK_TIMEOUT = 20000;
+
+  public static final Integer DEFAULT_REST_API_PORT = 8192;
+
+  @JsonProperty
+  @NotEmpty
+  private String mesosMaster;
+
+  @JsonProperty
+  private Boolean checkpoint;
+
+  @JsonProperty
+  private Double frameworkFailoverTimeout;
+
+  @JsonProperty
+  private String frameworkName;
+
+  @JsonProperty
+  private String frameworkRole;
+
+  @JsonProperty
+  @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString.class)
+  private String frameworkUser;
+
+  @JsonProperty
+  @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString.class)
+  private String frameworkSuperUser;
+
+  @JsonProperty
+  @NotEmpty
+  private Map<String, Map<String, String>> profiles;
+
+  @JsonProperty
+  @NotEmpty
+  private Map<String, Integer> nmInstances;
+
+  @JsonProperty
+  private Boolean rebalancer;
+
+  @JsonProperty
+  private Boolean haEnabled;
+
+  @JsonProperty
+  private NodeManagerConfiguration nodemanager;
+
+  @JsonProperty
+  private Map<String, ServiceConfiguration> services;
+
+  @JsonProperty
+  @NotEmpty
+  private MyriadExecutorConfiguration executor;
+
+  @JsonProperty
+  private String nativeLibrary;
+
+  @JsonProperty
+  @NotEmpty
+  private String zkServers;
+
+  @JsonProperty
+  private Integer zkTimeout;
+
+  @JsonProperty
+  private Integer restApiPort;
+
+  @JsonProperty
+  @NotEmpty
+  private Map<String, String> yarnEnvironment;
+
+  @JsonProperty
+  private String mesosAuthenticationPrincipal;
+
+  @JsonProperty
+  private String mesosAuthenticationSecretFilename;
+
+
+  public MyriadConfiguration() {
+  }
+
+
+  public String getMesosMaster() {
+    return mesosMaster;
+  }
+
+  public Boolean isCheckpoint() {
+    return this.checkpoint != null ? checkpoint : DEFAULT_CHECKPOINT;
+  }
+
+  public Double getFrameworkFailoverTimeout() {
+    return this.frameworkFailoverTimeout != null ? this.frameworkFailoverTimeout : DEFAULT_FRAMEWORK_FAILOVER_TIMEOUT_MS;
+  }
+
+  public String getFrameworkName() {
+    return Strings.isNullOrEmpty(this.frameworkName) ? DEFAULT_FRAMEWORK_NAME : this.frameworkName;
+  }
+
+  public String getFrameworkRole() {
+    return frameworkRole;
+  }
+
+  public Optional<String> getFrameworkUser() {
+    return Optional.fromNullable(frameworkUser);
+  }
+
+  public Optional<String> getFrameworkSuperUser() {
+    return Optional.fromNullable(frameworkSuperUser);
+  }
+
+  public Map<String, Map<String, String>> getProfiles() {
+    return profiles;
+  }
+
+  public Map<String, Integer> getNmInstances() {
+    return nmInstances;
+  }
+
+  public Boolean isRebalancer() {
+    return rebalancer != null ? rebalancer : DEFAULT_REBALANCER;
+  }
+
+  public Boolean isHAEnabled() {
+    return haEnabled != null ? haEnabled : DEFAULT_HA_ENABLED;
+  }
+
+  public NodeManagerConfiguration getNodeManagerConfiguration() {
+    return this.nodemanager;
+  }
+
+  public Map<String, ServiceConfiguration> getServiceConfigurations() {
+    return this.services;
+  }
+
+  public ServiceConfiguration getServiceConfiguration(String taskName) {
+    if (services == null) {
+      return null;
+    }
+    return this.services.get(taskName);
+  }
+
+  public MyriadExecutorConfiguration getMyriadExecutorConfiguration() {
+    return this.executor;
+  }
+
+  public String getNativeLibrary() {
+    return Strings.isNullOrEmpty(this.nativeLibrary) ? DEFAULT_NATIVE_LIBRARY : this.nativeLibrary;
+  }
+
+  public String getZkServers() {
+    return this.zkServers;
+  }
+
+  public Integer getZkTimeout() {
+    return this.zkTimeout != null ? this.zkTimeout : DEFAULT_ZK_TIMEOUT;
+  }
+
+  public Integer getRestApiPort() {
+    return this.restApiPort != null ? this.restApiPort : DEFAULT_REST_API_PORT;
+  }
+
+  public Map<String, String> getYarnEnvironment() {
+    return yarnEnvironment;
+  }
+
+  public String getMesosAuthenticationSecretFilename() {
+    return mesosAuthenticationSecretFilename;
+  }
+
+  public String getMesosAuthenticationPrincipal() {
+    return mesosAuthenticationPrincipal;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java
new file mode 100644
index 0000000..d68b6fb
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.configuration;
+
+import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerDouble;
+import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.hibernate.validator.constraints.NotEmpty;
+
+/**
+ * Configuration for the Executor
+ */
+public class MyriadExecutorConfiguration {
+  /**
+   * Translates to -Xmx for the NodeManager JVM.
+   */
+  @JsonProperty
+  @JsonSerialize(using = OptionalSerializerDouble.class)
+  private Double jvmMaxMemoryMB;
+
+  @JsonProperty
+  @NotEmpty
+  private String path;
+
+  @JsonProperty
+  @JsonSerialize(using = OptionalSerializerString.class)
+  private String nodeManagerUri;
+
+  public Optional<Double> getJvmMaxMemoryMB() {
+    return Optional.fromNullable(jvmMaxMemoryMB);
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public Optional<String> getNodeManagerUri() {
+    return Optional.fromNullable(nodeManagerUri);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
new file mode 100644
index 0000000..1c6c096
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.configuration;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerBoolean;
+import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerDouble;
+import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+
+/**
+ * Node Manager Configuration
+ */
+public class NodeManagerConfiguration {
+  /**
+   * Allot 10% more memory to account for JVM overhead.
+   */
+  public static final double JVM_OVERHEAD = 0.1;
+
+  /**
+   * Default -Xmx for NodeManager JVM.
+   */
+  public static final double DEFAULT_JVM_MAX_MEMORY_MB = 2048;
+
+  /**
+   * Default cpu for NodeManager JVM.
+   */
+  public static final double DEFAULT_NM_CPUS = 1;
+
+  public static final String NM_TASK_PREFIX = "nm";
+
+  /**
+   * Translates to -Xmx for the NodeManager JVM.
+   */
+  @JsonProperty
+  @JsonSerialize(using = OptionalSerializerDouble.class)
+  private Double jvmMaxMemoryMB;
+
+  /**
+   * Amount of CPU share given to NodeManger JVM. This is critical specially
+   * for NodeManager auxiliary services.
+   */
+  @JsonProperty
+  @JsonSerialize(using = OptionalSerializerDouble.class)
+  private Double cpus;
+
+  /**
+   * Translates to JAVA_OPTS for the NodeManager JVM.
+   */
+  @JsonProperty
+  @JsonSerialize(using = OptionalSerializerString.class)
+  private String jvmOpts;
+
+  /**
+   * Determines if cgroups are enabled for NM or not.
+   */
+  @JsonProperty
+  @JsonSerialize(using = OptionalSerializerBoolean.class)
+  private Boolean cgroups;
+
+  public Optional<Double> getJvmMaxMemoryMB() {
+    return Optional.fromNullable(jvmMaxMemoryMB);
+  }
+
+  public Optional<String> getJvmOpts() {
+    return Optional.fromNullable(jvmOpts);
+  }
+
+  public Optional<Double> getCpus() {
+    return Optional.fromNullable(cpus);
+  }
+
+  public Optional<Boolean> getCgroups() {
+    return Optional.fromNullable(cgroups);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/configuration/OptionalSerializer.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/OptionalSerializer.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/OptionalSerializer.java
new file mode 100644
index 0000000..e0cba43
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/OptionalSerializer.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.configuration;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializerProvider;
+
+import com.google.common.base.Optional;
+
+/**
+ * Custom Serializer that allows to serialize Optional
+ * today Optional does not serialize value, but just state: "present: true/false"
+ * This class will serialize <T> value instead of state
+ * This is needed for REST APIs and Myriad UI
+ *
+ * @param <T>
+ */
+public class OptionalSerializer<T> extends JsonSerializer<Optional<T>> {
+
+  private static final JsonFactory jsonFactory = new ObjectMapper().getJsonFactory();
+
+  protected ObjectMapper objMapper;
+
+  public OptionalSerializer() {
+    objMapper = new ObjectMapper(jsonFactory);
+  }
+
+  @Override
+  public void serialize(Optional<T> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException {
+    if (value.isPresent()) {
+      objMapper.writeValue(jgen, value.get());
+    } else {
+      objMapper.writeValue(jgen, "value is absent");
+    }
+  }
+
+  /**
+   * Custom String serializer
+   */
+  public static class OptionalSerializerString extends OptionalSerializer<String> {
+    @Override
+    public void serialize(Optional<String> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException {
+      super.serialize(value, jgen, provider);
+    }
+  }
+
+  /**
+   * Custom Double serializer
+   */
+  public static class OptionalSerializerDouble extends OptionalSerializer<Double> {
+    @Override
+    public void serialize(Optional<Double> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException {
+      super.serialize(value, jgen, provider);
+    }
+  }
+
+  /**
+   * Custom Integer serializer
+   */
+  public static class OptionalSerializerInt extends OptionalSerializer<Integer> {
+    @Override
+    public void serialize(Optional<Integer> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException {
+      super.serialize(value, jgen, provider);
+    }
+  }
+
+  /**
+   * Custom Boolean serializer
+   */
+  public static class OptionalSerializerBoolean extends OptionalSerializer<Boolean> {
+    @Override
+    public void serialize(Optional<Boolean> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException {
+      super.serialize(value, jgen, provider);
+    }
+  }
+
+  /**
+   * Custom Map serializer
+   */
+  public static class OptionalSerializerMap extends OptionalSerializer<Map<?, ?>> {
+    @Override
+    public void serialize(Optional<Map<?, ?>> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException {
+      super.serialize(value, jgen, provider);
+    }
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java
new file mode 100644
index 0000000..f0b913c
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.myriad.configuration;
+
+import java.util.Map;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.hibernate.validator.constraints.NotEmpty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+
+/**
+ * Configuration for any service/task to be started from Myriad Scheduler
+ */
+public class ServiceConfiguration {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConfiguration.class);
+
+  public static final Double DEFAULT_CPU = 0.1;
+
+  public static final Double DEFAULT_MEMORY = 256.0;
+
+  /**
+   * Translates to -Xmx for the JVM.
+   */
+  @JsonProperty
+  @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerDouble.class)
+  protected Double jvmMaxMemoryMB;
+
+  /**
+   * Amount of CPU share given to  JVM.
+   */
+  @JsonProperty
+  @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerDouble.class)
+  protected Double cpus;
+
+  /**
+   * Translates to jvm opts for the JVM.
+   */
+  @JsonProperty
+  @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString.class)
+  protected String jvmOpts;
+
+  @JsonProperty
+  @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerMap.class)
+  protected Map<String, Long> ports;
+
+  /**
+   * If we will have some services
+   * that are not easy to express just by properties
+   * we can use this one to have a specific implementation
+   */
+  @JsonProperty
+  @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString.class)
+  protected String taskFactoryImplName;
+
+  @JsonProperty
+  protected String envSettings;
+
+  @JsonProperty
+  @NotEmpty
+  protected String taskName;
+
+  @JsonProperty
+  @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerInt.class)
+  protected Integer maxInstances;
+
+  @JsonProperty
+  @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString.class)
+  protected String command;
+
+  @JsonProperty
+  protected String serviceOptsName;
+
+
+  public Optional<Double> getJvmMaxMemoryMB() {
+    return Optional.fromNullable(jvmMaxMemoryMB);
+  }
+
+  public Optional<String> getJvmOpts() {
+    return Optional.fromNullable(jvmOpts);
+  }
+
+  public Optional<Double> getCpus() {
+    return Optional.fromNullable(cpus);
+  }
+
+  public String getTaskName() {
+    return taskName;
+  }
+
+  public void setTaskName(String taskName) {
+    this.taskName = taskName;
+  }
+
+  public Optional<String> getTaskFactoryImplName() {
+    return Optional.fromNullable(taskFactoryImplName);
+  }
+
+  public String getEnvSettings() {
+    return envSettings;
+  }
+
+  public Optional<Map<String, Long>> getPorts() {
+    return Optional.fromNullable(ports);
+  }
+
+  public Optional<Integer> getMaxInstances() {
+    return Optional.fromNullable(maxInstances);
+  }
+
+  public Optional<String> getCommand() {
+    return Optional.fromNullable(command);
+  }
+
+  public String getServiceOpts() {
+    return serviceOptsName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/health/HealthCheckUtils.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/health/HealthCheckUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/health/HealthCheckUtils.java
new file mode 100644
index 0000000..e1b6a17
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/health/HealthCheckUtils.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.health;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.Socket;
+
+/**
+ * Health Check Utilities
+ */
+public class HealthCheckUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HealthCheckUtils.class);
+
+  public static boolean checkHostPort(String connectionString) {
+    String[] split = connectionString.split(":");
+    String serverAddress = split[0];
+    Integer serverPort = Integer.valueOf(split[1]);
+    try (Socket s = new Socket(serverAddress, serverPort)) {
+      return true;
+    } catch (IOException ex) {
+      LOGGER.error("parsing host port", ex);
+    }
+
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosDriverHealthCheck.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosDriverHealthCheck.java b/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosDriverHealthCheck.java
new file mode 100644
index 0000000..96dcfe2
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosDriverHealthCheck.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.health;
+
+import com.codahale.metrics.health.HealthCheck;
+import org.apache.mesos.Protos.Status;
+
+import javax.inject.Inject;
+
+/**
+ * Health Check that Mesos Master is running
+ */
+public class MesosDriverHealthCheck extends HealthCheck {
+
+  public static final String NAME = "mesos-driver";
+  private org.apache.myriad.scheduler.MyriadDriverManager driverManager;
+
+  @Inject
+  public MesosDriverHealthCheck(org.apache.myriad.scheduler.MyriadDriverManager driverManager) {
+    this.driverManager = driverManager;
+  }
+
+  @Override
+  protected Result check() throws Exception {
+    Status driverStatus = driverManager.getDriverStatus();
+    if (Status.DRIVER_RUNNING == driverStatus) {
+      return Result.healthy();
+    } else {
+      return Result.unhealthy("Driver status: " + driverStatus);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosMasterHealthCheck.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosMasterHealthCheck.java b/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosMasterHealthCheck.java
new file mode 100644
index 0000000..a394c30
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosMasterHealthCheck.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.health;
+
+import com.codahale.metrics.health.HealthCheck;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Health check for Mesos master
+ */
+public class MesosMasterHealthCheck extends HealthCheck {
+  public static final String NAME = "mesos-master";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MesosMasterHealthCheck.class);
+
+  private MyriadConfiguration cfg;
+
+  @Inject
+  public MesosMasterHealthCheck(MyriadConfiguration cfg) {
+    this.cfg = cfg;
+  }
+
+  @Override
+  protected Result check() throws Exception {
+    String mesosMaster = cfg.getMesosMaster();
+    int zkIndex = mesosMaster.indexOf("zk://", 0);
+    Result result = Result.unhealthy("Unable to connect to: " + mesosMaster);
+    if (zkIndex >= 0) {
+      final int fromIndex = 5;
+      String zkHostPorts = mesosMaster.substring(fromIndex, mesosMaster.indexOf("/", fromIndex));
+
+      String[] hostPorts = zkHostPorts.split(",");
+
+      for (String hostPort : hostPorts) {
+        final int maxRetries = 3;
+        final int baseSleepTimeMs = 1000;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(hostPort, new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries));
+        client.start();
+        final int blockTime = 5;
+        client.blockUntilConnected(blockTime, TimeUnit.SECONDS);
+
+        switch (client.getState()) {
+          case STARTED:
+            result = Result.healthy();
+            break;
+          case STOPPED:
+            LOGGER.info("Unable to reach: ", hostPort);
+            break;
+          case LATENT:
+            LOGGER.info("Unable to reach: ", hostPort);
+            break;
+          default:
+            LOGGER.info("Unable to reach: ", hostPort);
+        }
+      }
+    } else {
+      if (HealthCheckUtils.checkHostPort(mesosMaster)) {
+        result = Result.healthy();
+      }
+    }
+
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/health/ZookeeperHealthCheck.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/health/ZookeeperHealthCheck.java b/myriad-scheduler/src/main/java/org/apache/myriad/health/ZookeeperHealthCheck.java
new file mode 100644
index 0000000..46b86a1
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/health/ZookeeperHealthCheck.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.health;
+
+import com.codahale.metrics.health.HealthCheck;
+import org.apache.myriad.configuration.MyriadConfiguration;
+
+import javax.inject.Inject;
+
+/**
+ * Health Check on ZK
+ */
+public class ZookeeperHealthCheck extends HealthCheck {
+  public static final String NAME = "zookeeper";
+  private MyriadConfiguration cfg;
+
+  @Inject
+  public ZookeeperHealthCheck(MyriadConfiguration cfg) {
+    this.cfg = cfg;
+  }
+
+  @Override
+  protected Result check() throws Exception {
+    // todo:  (kensipe) this needs to be implemented
+    return Result.healthy();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/policy/LeastAMNodesFirstPolicy.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/policy/LeastAMNodesFirstPolicy.java b/myriad-scheduler/src/main/java/org/apache/myriad/policy/LeastAMNodesFirstPolicy.java
new file mode 100644
index 0000000..916bdf0
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/policy/LeastAMNodesFirstPolicy.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.policy;
+
+import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A scale down policy that maintains returns a list of nodes running least number of AMs.
+ */
+public class LeastAMNodesFirstPolicy extends BaseInterceptor implements NodeScaleDownPolicy {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeastAMNodesFirstPolicy.class);
+
+  private final AbstractYarnScheduler yarnScheduler;
+  private final org.apache.myriad.state.SchedulerState schedulerState;
+
+  //TODO(Santosh): Should figure out the right values for the hashmap properties.
+  // currently it's tuned for 200 nodes and 50 RM RPC threads (Yarn's default).
+  private static final int INITIAL_NODE_SIZE = 200;
+  private static final int EXPECTED_CONCURRENT_ACCCESS_COUNT = 50;
+  private static final float LOAD_FACTOR_DEFAULT = 0.75f;
+
+  private Map<String, SchedulerNode> schedulerNodes = new ConcurrentHashMap<>(INITIAL_NODE_SIZE, LOAD_FACTOR_DEFAULT, EXPECTED_CONCURRENT_ACCCESS_COUNT);
+
+  @Inject
+  public LeastAMNodesFirstPolicy(org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, org.apache.myriad.state.SchedulerState schedulerState) {
+    registry.register(this);
+    this.yarnScheduler = yarnScheduler;
+    this.schedulerState = schedulerState;
+  }
+
+  /**
+   * Sort the given list of tasks by the number of App Master containers running on the corresponding NM node.
+   *
+   * @param taskIDs
+   */
+  @Override
+  public void apply(List<Protos.TaskID> taskIDs) {
+    if (LOGGER.isDebugEnabled()) {
+      for (SchedulerNode node : schedulerNodes.values()) {
+        LOGGER.debug("Host {} is running {} containers including {} App Masters", node.getNodeID().getHost(), node.getRunningContainers().size(), getNumAMContainers(node.getRunningContainers()));
+      }
+    }
+    // We need to lock the YARN scheduler here. If we don't do that, then the YARN scheduler can
+    // process HBs from NodeManagers and the state of SchedulerNode objects might change while we
+    // are in the middle of sorting them based on the least number of AM containers.
+    synchronized (yarnScheduler) {
+      Collections.sort(taskIDs, new Comparator<Protos.TaskID>() {
+        @Override
+        public int compare(Protos.TaskID t1, Protos.TaskID t2) {
+          SchedulerNode o1 = schedulerNodes.get(schedulerState.getTask(t1).getHostname());
+          SchedulerNode o2 = schedulerNodes.get(schedulerState.getTask(t2).getHostname());
+
+          if (o1 == null) { // a NM was launched by Myriad, but it hasn't yet registered with RM
+            if (o2 == null) {
+              return 0;
+            } else {
+              return -1;
+            }
+          } else if (o2 == null) {
+            return 1;
+          } // else, both the NMs have registered with RM
+
+          List<RMContainer> runningContainers1 = o1.getRunningContainers();
+          List<RMContainer> runningContainers2 = o2.getRunningContainers();
+
+          Integer numRunningAMs1 = getNumAMContainers(runningContainers1);
+          Integer numRunningAMs2 = getNumAMContainers(runningContainers2);
+
+          Integer numRunningContainers1 = runningContainers1.size();
+          Integer numRunningContainers2 = runningContainers2.size();
+
+          // If two NMs are running equal number of AMs, sort them based on total num of running containers
+          if (numRunningAMs1.compareTo(numRunningAMs2) == 0) {
+            return numRunningContainers1.compareTo(numRunningContainers2);
+          }
+          return numRunningAMs1.compareTo(numRunningAMs2);
+        }
+      });
+    }
+  }
+
+  @Override
+  public void afterSchedulerEventHandled(SchedulerEvent event) {
+
+    try {
+      switch (event.getType()) {
+        case NODE_UPDATE:
+          onNodeUpdated((NodeUpdateSchedulerEvent) event);
+          break;
+
+        case NODE_REMOVED:
+          onNodeRemoved((NodeRemovedSchedulerEvent) event);
+          break;
+
+        default:
+          break;
+      }
+    } catch (ClassCastException e) {
+      LOGGER.error("incorrect event object", e);
+    }
+  }
+
+  /**
+   * Called whenever a NM HBs to RM. The NM's updates will already be recorded in the
+   * SchedulerNode before this method is called.
+   *
+   * @param event
+   */
+  private void onNodeUpdated(NodeUpdateSchedulerEvent event) {
+    NodeId nodeID = event.getRMNode().getNodeID();
+    SchedulerNode schedulerNode = yarnScheduler.getSchedulerNode(nodeID);
+    schedulerNodes.put(nodeID.getHost(), schedulerNode); // keep track of only one node per host
+  }
+
+  private void onNodeRemoved(NodeRemovedSchedulerEvent event) {
+    SchedulerNode schedulerNode = schedulerNodes.get(event.getRemovedRMNode().getNodeID().getHost());
+    if (schedulerNode != null && schedulerNode.getNodeID().equals(event.getRemovedRMNode().getNodeID())) {
+      schedulerNodes.remove(schedulerNode.getNodeID().getHost());
+    }
+  }
+
+  private Integer getNumAMContainers(List<RMContainer> containers) {
+    int count = 0;
+    for (RMContainer container : containers) {
+      if (container.isAMContainer()) {
+        count++;
+      }
+    }
+    return count;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/policy/NodeScaleDownPolicy.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/policy/NodeScaleDownPolicy.java b/myriad-scheduler/src/main/java/org/apache/myriad/policy/NodeScaleDownPolicy.java
new file mode 100644
index 0000000..3d70294
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/policy/NodeScaleDownPolicy.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.policy;
+
+import org.apache.mesos.Protos;
+
+import java.util.List;
+
+/**
+ * Policy for scaling down the node managers.
+ */
+public interface NodeScaleDownPolicy {
+
+  /**
+   * Apply a scale down policy to the given list of taskIDs.
+   *
+   * @param taskIDs
+   */
+  public void apply(List<Protos.TaskID> taskIDs);
+
+}