You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@myriad.apache.org by sm...@apache.org on 2015/10/28 17:07:46 UTC
[14/20] incubator-myriad git commit: com.ebay => org.apache
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java b/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java
new file mode 100644
index 0000000..2131178
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.mesos.Protos.Credential;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Protos.FrameworkInfo.Builder;
+import org.apache.mesos.SchedulerDriver;
+import org.apache.mesos.state.State;
+import org.apache.mesos.state.ZooKeeperState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.scheduler.MyriadDriver;
+import org.apache.myriad.scheduler.MyriadScheduler;
+import org.apache.myriad.state.SchedulerState;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.google.protobuf.ByteString;
+
+/**
+ * Guice Module for Mesos objects.
+ */
+public class MesosModule extends AbstractModule {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MesosModule.class);
+
+  public MesosModule() { }
+
+  /** Binds {@link MyriadDriver} in singleton scope. */
+  @Override
+  protected void configure() {
+    bind(MyriadDriver.class).in(Scopes.SINGLETON);
+  }
+
+  /**
+   * Creates the Mesos scheduler driver from the configuration, reusing the framework id held by the scheduler state and adding a credential when a principal is set.
+   * @throws RuntimeException if the configured authentication secret file cannot be found or read
+   */
+  @Provides
+  @Singleton
+  SchedulerDriver providesSchedulerDriver(MyriadScheduler scheduler, MyriadConfiguration cfg, SchedulerState schedulerState) {
+    Builder frameworkInfoBuilder = FrameworkInfo.newBuilder().setUser("").setName(cfg.getFrameworkName()).setCheckpoint(cfg.isCheckpoint()).setFailoverTimeout(cfg.getFrameworkFailoverTimeout());
+    if (StringUtils.isNotEmpty(cfg.getFrameworkRole())) {
+      frameworkInfoBuilder.setRole(cfg.getFrameworkRole());
+    }
+    FrameworkID frameworkId = schedulerState.getFrameworkID();
+    if (frameworkId != null) {
+      LOGGER.info("Attempting to re-register with frameworkId: {}", frameworkId.getValue());
+      frameworkInfoBuilder.setId(frameworkId);
+    }
+
+    String mesosAuthenticationPrincipal = cfg.getMesosAuthenticationPrincipal();
+    String mesosAuthenticationSecretFilename = cfg.getMesosAuthenticationSecretFilename();
+    if (StringUtils.isEmpty(mesosAuthenticationPrincipal)) {
+      return new MesosSchedulerDriver(scheduler, frameworkInfoBuilder.build(), cfg.getMesosMaster());
+    }
+    frameworkInfoBuilder.setPrincipal(mesosAuthenticationPrincipal);
+    Credential.Builder credentialBuilder = Credential.newBuilder().setPrincipal(mesosAuthenticationPrincipal);
+    if (StringUtils.isNotEmpty(mesosAuthenticationSecretFilename)) {
+      // try-with-resources closes the secret file even when reading fails (the stream used to be leaked).
+      try (FileInputStream secretStream = new FileInputStream(mesosAuthenticationSecretFilename)) {
+        credentialBuilder.setSecret(ByteString.readFrom(secretStream));
+      } catch (FileNotFoundException ex) {
+        LOGGER.error("Mesos authentication secret file was not found", ex);
+        throw new RuntimeException(ex);
+      } catch (IOException ex) {
+        LOGGER.error("Error reading Mesos authentication secret file", ex);
+        throw new RuntimeException(ex);
+      }
+    }
+    return new MesosSchedulerDriver(scheduler, frameworkInfoBuilder.build(), cfg.getMesosMaster(), credentialBuilder.build());
+  }
+
+  /** Creates the ZooKeeper-backed {@link State}, passing "/myriad/" + framework name as its last constructor argument. */
+  @Provides
+  @Singleton
+  State providesStateStore(MyriadConfiguration cfg) {
+    return new ZooKeeperState(cfg.getZkServers(), cfg.getZkTimeout(), TimeUnit.MILLISECONDS, "/myriad/" + cfg.getFrameworkName());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
new file mode 100644
index 0000000..659160e
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad;
+
+import org.apache.myriad.configuration.ServiceConfiguration;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.configuration.MyriadExecutorConfiguration;
+import org.apache.myriad.configuration.NodeManagerConfiguration;
+import org.apache.myriad.policy.LeastAMNodesFirstPolicy;
+import org.apache.myriad.policy.NodeScaleDownPolicy;
+import org.apache.myriad.scheduler.MyriadDriverManager;
+import org.apache.myriad.scheduler.fgs.NMHeartBeatHandler;
+import org.apache.myriad.scheduler.fgs.NodeStore;
+import org.apache.myriad.scheduler.fgs.OfferLifecycleManager;
+import org.apache.myriad.scheduler.DownloadNMExecutorCLGenImpl;
+import org.apache.myriad.scheduler.ExecutorCommandLineGenerator;
+import org.apache.myriad.scheduler.NMExecutorCLGenImpl;
+import org.apache.myriad.scheduler.NMTaskFactoryAnnotation;
+import org.apache.myriad.scheduler.ReconcileService;
+import org.apache.myriad.scheduler.ServiceProfileManager;
+import org.apache.myriad.scheduler.ServiceTaskFactoryImpl;
+import org.apache.myriad.scheduler.TaskConstraintsManager;
+import org.apache.myriad.scheduler.TaskFactory;
+import org.apache.myriad.scheduler.TaskFactory.NMTaskFactoryImpl;
+import org.apache.myriad.scheduler.fgs.YarnNodeCapacityManager;
+import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
+import org.apache.myriad.state.MyriadStateStore;
+import org.apache.myriad.state.SchedulerState;
+import org.apache.myriad.webapp.HttpConnectorProvider;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.MapBinder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Guice Module for Myriad
+ */
+public class MyriadModule extends AbstractModule {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MyriadModule.class);
+
+  private final MyriadConfiguration cfg;
+  private final Configuration hadoopConf;
+  private final AbstractYarnScheduler yarnScheduler;
+  private final RMContext rmContext;
+  private final InterceptorRegistry interceptorRegistry;
+
+  public MyriadModule(MyriadConfiguration cfg, Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext, InterceptorRegistry interceptorRegistry) {
+    this.cfg = cfg;
+    this.hadoopConf = hadoopConf;
+    this.yarnScheduler = yarnScheduler;
+    this.rmContext = rmContext;
+    this.interceptorRegistry = interceptorRegistry;
+  }
+
+  /** Binds the constructor-supplied instances, the singleton services and a String-keyed map of TaskFactory bindings. */
+  @Override
+  protected void configure() {
+    LOGGER.debug("Configuring guice");
+    bind(MyriadConfiguration.class).toInstance(cfg);
+    bind(Configuration.class).toInstance(hadoopConf);
+    bind(RMContext.class).toInstance(rmContext);
+    bind(AbstractYarnScheduler.class).toInstance(yarnScheduler);
+    bind(InterceptorRegistry.class).toInstance(interceptorRegistry);
+    bind(MyriadDriverManager.class).in(Scopes.SINGLETON);
+    bind(org.apache.myriad.scheduler.MyriadScheduler.class).in(Scopes.SINGLETON);
+    bind(ServiceProfileManager.class).in(Scopes.SINGLETON);
+    bind(DisruptorManager.class).in(Scopes.SINGLETON);
+    bind(ReconcileService.class).in(Scopes.SINGLETON);
+    bind(HttpConnectorProvider.class).in(Scopes.SINGLETON);
+    bind(TaskConstraintsManager.class).in(Scopes.SINGLETON);
+    // add special binding between TaskFactory and NMTaskFactoryImpl to ease up
+    // usage of TaskFactory
+    bind(TaskFactory.class).annotatedWith(NMTaskFactoryAnnotation.class).to(NMTaskFactoryImpl.class);
+    bind(YarnNodeCapacityManager.class).in(Scopes.SINGLETON);
+    bind(NodeStore.class).in(Scopes.SINGLETON);
+    bind(OfferLifecycleManager.class).in(Scopes.SINGLETON);
+    bind(NMHeartBeatHandler.class).asEagerSingleton();
+
+    MapBinder<String, TaskFactory> mapBinder = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class);
+    mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactoryImpl.class).in(Scopes.SINGLETON);
+    Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations();
+    if (auxServicesConfigs != null) {
+      for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) {
+        String taskFactoryClass = entry.getValue().getTaskFactoryImplName().orNull();
+        if (taskFactoryClass != null) {
+          try { // asSubclass checks that the configured class really is a TaskFactory (replaces an unchecked cast)
+            Class<? extends TaskFactory> implClass = Class.forName(taskFactoryClass).asSubclass(TaskFactory.class);
+            mapBinder.addBinding(entry.getKey()).to(implClass).in(Scopes.SINGLETON);
+          } catch (ClassNotFoundException | ClassCastException e) {
+            LOGGER.error("Cannot use task factory class '" + taskFactoryClass + "' configured for service '" + entry.getKey() + "'; no task factory is bound for it", e);
+          }
+        } else {
+          mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON);
+        }
+      }
+    }
+    //TODO(Santosh): Should be configurable as well
+    bind(NodeScaleDownPolicy.class).to(LeastAMNodesFirstPolicy.class).in(Scopes.SINGLETON);
+  }
+
+  /** Provides the scheduler state; with HA enabled it fails unless the RM state store is a MyriadStateStore. */
+  @Provides
+  @Singleton
+  SchedulerState providesSchedulerState(MyriadConfiguration cfg) {
+    LOGGER.debug("Configuring SchedulerState provider");
+    MyriadStateStore myriadStateStore = null;
+    if (cfg.isHAEnabled()) {
+      myriadStateStore = providesMyriadStateStore();
+      if (myriadStateStore == null) {
+        throw new RuntimeException("Could not find a state store implementation for Myriad." +
+            " The 'yarn.resourcemanager.store.class' property should be set to a class implementing the" +
+            " MyriadStateStore interface. For e.g." +
+            " org.apache.hadoop.yarn.server.resourcemanager.recovery.MyriadFileSystemRMStateStore");
+      }
+    }
+    return new SchedulerState(myriadStateStore);
+  }
+
+  /** Returns the RM context's state store when it implements MyriadStateStore, otherwise null. */
+  private MyriadStateStore providesMyriadStateStore() {
+    // TODO (sdaingade) Read the implementation class from yml
+    // once multiple implementations are available.
+    if (rmContext.getStateStore() instanceof MyriadStateStore) {
+      return (MyriadStateStore) rmContext.getStateStore();
+    }
+    return null;
+  }
+
+  /** Returns a DownloadNMExecutorCLGenImpl when the executor configuration has a NodeManager URI, else an NMExecutorCLGenImpl. */
+  @Provides
+  @Singleton
+  ExecutorCommandLineGenerator providesCLIGenerator(MyriadConfiguration cfg) {
+    MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration();
+    if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
+      return new DownloadNMExecutorCLGenImpl(cfg, myriadExecutorConfiguration.getNodeManagerUri().get());
+    }
+    return new NMExecutorCLGenImpl(cfg);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/ClustersResource.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/ClustersResource.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/ClustersResource.java
new file mode 100644
index 0000000..e1bee58
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/ClustersResource.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.api;
+
+import com.codahale.metrics.annotation.Timed;
+import org.apache.myriad.api.model.FlexDownClusterRequest;
+import org.apache.myriad.api.model.FlexDownServiceRequest;
+import org.apache.myriad.api.model.FlexUpClusterRequest;
+import org.apache.myriad.scheduler.ServiceResourceProfile;
+import org.apache.myriad.scheduler.constraints.ConstraintFactory;
+import org.apache.myriad.state.SchedulerState;
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.myriad.api.model.FlexUpServiceRequest;
+import org.apache.myriad.configuration.MyriadBadConfigurationException;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.scheduler.ServiceProfileManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.Response.Status;
+
+/**
+ * RESTful API to resource manager
+ */
+@Path("/cluster")
+public class ClustersResource {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClustersResource.class);
+ private static final String LIKE_CONSTRAINT_FORMAT = "'<mesos_slave_attribute|hostname> LIKE <value_regex>'";
+
+ private MyriadConfiguration cfg;
+ private SchedulerState schedulerState;
+ private ServiceProfileManager profileManager;
+ private org.apache.myriad.scheduler.MyriadOperations myriadOperations;
+
+ @Inject // constructor injection of the collaborators this resource stores in its fields
+ public ClustersResource(MyriadConfiguration cfg, SchedulerState state, ServiceProfileManager profileManager, org.apache.myriad.scheduler.MyriadOperations myriadOperations) {
+   this.myriadOperations = myriadOperations;
+   this.profileManager = profileManager;
+   this.schedulerState = state;
+   this.cfg = cfg;
+ }
+
+ /**
+  * Flexes the cluster up by the requested number of instances of a profile. Answers ACCEPTED when the request
+  * passes validation; otherwise BAD_REQUEST with the reason as entity, and flexUpCluster is not invoked.
+  */
+ @Timed
+ @PUT
+ @Path("/flexup")
+ @Produces(MediaType.TEXT_PLAIN)
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response flexUp(FlexUpClusterRequest request) {
+   Preconditions.checkNotNull(request, "request object cannot be null or empty");
+   Integer instances = request.getInstances();
+   String profile = request.getProfile();
+   List<String> constraints = request.getConstraints();
+   LOGGER.info("Received flexup request. Profile: {}, Instances: {}, Constraints: {}", profile, instances, constraints);
+
+   ResponseBuilder builder = Response.status(Status.ACCEPTED);
+   // Validators short-circuit: the first failing one sets BAD_REQUEST and the remaining ones are skipped.
+   boolean accepted = validateProfile(profile, builder) && validateInstances(instances, builder) && validateConstraints(constraints, builder);
+   Response result = builder.build();
+   if (accepted) {
+     String constraint = (constraints == null || constraints.isEmpty()) ? null : constraints.get(0);
+     this.myriadOperations.flexUpCluster(this.profileManager.get(profile), instances, ConstraintFactory.createConstraint(constraint));
+   }
+   return result;
+ }
+
+ /**
+  * Flexes up instances of a configured service. Answers BAD_REQUEST for an unknown service, an invalid instance
+  * count or a MyriadBadConfigurationException raised by the operation; ACCEPTED otherwise.
+  */
+ @Timed
+ @PUT
+ @Path("/flexupservice")
+ @Produces(MediaType.TEXT_PLAIN)
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response flexUpservice(FlexUpServiceRequest request) {
+   Preconditions.checkNotNull(request, "request object cannot be null or empty");
+   LOGGER.info("Received Flexup a Service Request");
+
+   Integer instances = request.getInstances();
+   String serviceName = request.getServiceName();
+   LOGGER.info("Instances: {}", instances);
+   LOGGER.info("Service: {}", serviceName);
+
+   // Validate the requested service and the instance count before doing any work.
+   ResponseBuilder builder = Response.status(Status.ACCEPTED);
+   boolean unknownService = cfg.getServiceConfiguration(serviceName) == null;
+   if (unknownService) {
+     builder.status(Status.BAD_REQUEST).entity("Service does not exist: " + serviceName);
+     LOGGER.error("Provided service does not exist " + serviceName);
+     return builder.build();
+   }
+   if (!validateInstances(instances, builder)) {
+     return builder.build();
+   }
+
+   try {
+     this.myriadOperations.flexUpAService(instances, serviceName);
+   } catch (MyriadBadConfigurationException e) {
+     builder.status(Status.BAD_REQUEST).entity(e.getMessage());
+   }
+   return builder.build();
+ }
+
+ /**
+  * Flexes the cluster down for a profile. When more instances are requested than are currently flexed up for
+  * the profile, the request is still accepted and the warning text is returned as the entity.
+  */
+ @Timed
+ @PUT
+ @Path("/flexdown")
+ @Produces(MediaType.TEXT_PLAIN)
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response flexDown(FlexDownClusterRequest request) {
+   Preconditions.checkNotNull(request, "request object cannot be null or empty");
+   Integer instances = request.getInstances();
+   String profile = request.getProfile();
+   List<String> constraints = request.getConstraints();
+   LOGGER.info("Received flex down request. Profile: {}, Instances: {}, Constraints: {}", profile, instances, constraints);
+
+   ResponseBuilder builder = Response.status(Status.ACCEPTED);
+   boolean accepted = validateProfile(profile, builder) && validateInstances(instances, builder) && validateConstraints(constraints, builder);
+   if (accepted) {
+     int flexedUp = this.getNumFlexedupNMs(profile);
+     if (instances > flexedUp) {
+       String message = String.format("Number of requested instances for flexdown is greater than the number of " +
+           "Node Managers previously flexed up for profile '%s'. Requested: %d, Previously flexed Up: %d. " +
+           "Only %d Node Managers will be flexed down.", profile, instances, flexedUp, flexedUp);
+       builder.entity(message);
+       LOGGER.warn(message);
+     }
+   }
+
+   Response result = builder.build();
+   if (accepted) {
+     String constraint = (constraints == null || constraints.isEmpty()) ? null : constraints.get(0);
+     this.myriadOperations.flexDownCluster(profileManager.get(profile), ConstraintFactory.createConstraint(constraint), instances);
+   }
+   return result;
+ }
+
+ /** Checks that the profile name is non-empty and known to the profile manager; sets BAD_REQUEST otherwise. */
+ private boolean validateProfile(String profile, ResponseBuilder response) {
+   if (profile == null || profile.isEmpty()) {
+     response.status(Status.BAD_REQUEST).entity("'profile' is null or empty");
+     LOGGER.error("'profile' is null or empty");
+   } else if (!this.profileManager.exists(profile)) {
+     response.status(Status.BAD_REQUEST).entity("Profile does not exist: '" + profile + "'");
+     LOGGER.error("Provided profile does not exist: '" + profile + "'");
+   } else {
+     return true;
+   }
+   return false;
+ }
+
+ /**
+  * Requires a non-null, strictly positive instance count; otherwise sets BAD_REQUEST with the reason.
+  */
+ private boolean validateInstances(Integer instances, ResponseBuilder response) {
+   if (instances != null && instances > 0) {
+     return true;
+   }
+   boolean missing = instances == null;
+   response.status(Status.BAD_REQUEST).entity(missing ? "'instances' is null" : "Invalid instance size: " + instances);
+   LOGGER.error(missing ? "'instances' is null" : "Invalid instance size request " + instances);
+   return false;
+ }
+
+ /** Accepts a missing or empty constraint list; otherwise checks the list size and its first entry. */
+ private boolean validateConstraints(List<String> constraints, ResponseBuilder response) {
+   if (constraints == null || constraints.isEmpty()) {
+     return true;
+   }
+   // Short-circuit: the LIKE check only runs when the size check passed.
+   return validateConstraintsSize(constraints, response) && validateLIKEConstraint(constraints.get(0), response);
+ }
+
+ /** Rejects (BAD_REQUEST) a list that holds more than one constraint. */
+ private boolean validateConstraintsSize(List<String> constraints, ResponseBuilder response) {
+   if (constraints.size() > 1) {
+     String message = String.format("Only 1 constraint is currently supported. Received: %s", constraints.toString());
+     response.status(Status.BAD_REQUEST).entity(message);
+     LOGGER.error(message);
+   }
+   return constraints.size() <= 1;
+ }
+
+ /** Validates one "<key> LIKE <value_regex>" constraint; sets BAD_REQUEST with the reason when it is malformed. */
+ private boolean validateLIKEConstraint(String constraint, ResponseBuilder response) {
+   if (constraint == null || constraint.isEmpty()) { // a null list element is rejected like an empty value instead of raising an NPE
+     String message = String.format("The value provided for 'constraints' is empty. Format: %s", LIKE_CONSTRAINT_FORMAT);
+     response.status(Status.BAD_REQUEST).entity(message);
+     LOGGER.error(message);
+     return false;
+   }
+   String[] splits = constraint.split(" LIKE "); // "<key> LIKE <val_regex>"
+   if (splits.length != 2) {
+     String message = String.format("Invalid format for LIKE operator in constraint: %s. Format: %s", constraint, LIKE_CONSTRAINT_FORMAT);
+     response.status(Status.BAD_REQUEST).entity(message);
+     LOGGER.error(message);
+     return false;
+   }
+   try {
+     Pattern.compile(splits[1]); // compiled only to verify that the value part is a valid regex
+   } catch (PatternSyntaxException e) {
+     String message = String.format("Invalid regex for LIKE operator in constraint: %s", constraint);
+     response.status(Status.BAD_REQUEST).entity(message);
+     LOGGER.error(message, e);
+     return false;
+   }
+   return true;
+ }
+
+
+ private Integer getNumFlexedupNMs(String profile) { // active + staging + pending task ids of the profile
+   ServiceResourceProfile resourceProfile = profileManager.get(profile);
+   return this.schedulerState.getActiveTaskIDsForProfile(resourceProfile).size() + this.schedulerState.getStagingTaskIDsForProfile(resourceProfile).size() + this.schedulerState.getPendingTaskIDsForProfile(resourceProfile).size();
+ }
+
+ /**
+  * Flexes down instances of a service. Asking for more instances than are flexible only adds a warning entity;
+  * the requested count is still passed on to MyriadOperations.
+  */
+ @Timed
+ @PUT
+ @Path("/flexdownservice")
+ @Produces(MediaType.TEXT_PLAIN)
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response flexDownservice(FlexDownServiceRequest request) {
+   Preconditions.checkNotNull(request, "request object cannot be null or empty");
+   Integer instances = request.getInstances();
+   String serviceName = request.getServiceName();
+   LOGGER.info("Received flexdown request for service {}", serviceName);
+   LOGGER.info("Instances: " + instances);
+
+   ResponseBuilder builder = Response.status(Status.ACCEPTED);
+   if (!validateInstances(instances, builder)) {
+     return builder.build();
+   }
+
+   // warn that number of requested instances isn't available
+   // but instances will still be flexed down
+   Integer flexibleInstances = this.myriadOperations.getFlexibleInstances(serviceName);
+   if (instances > flexibleInstances) {
+     builder.entity("Number of requested instances is greater than available.");
+     // just doing a simple check here. pass the requested number of instances
+     // to myriadOperations and let it sort out how many actually get flexxed down.
+     LOGGER.warn("Requested number of instances greater than available: {} < {}", flexibleInstances, instances);
+   }
+   this.myriadOperations.flexDownAService(instances, serviceName);
+   return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/ConfigurationResource.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/ConfigurationResource.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/ConfigurationResource.java
new file mode 100644
index 0000000..336b22f
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/ConfigurationResource.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.api;
+
+import com.codahale.metrics.annotation.Timed;
+import org.apache.myriad.configuration.MyriadConfiguration;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+/**
+ * Defines the REST API to the Myriad configuration.
+ */
+@Path("/config")
+@Produces(MediaType.APPLICATION_JSON)
+public class ConfigurationResource {
+  private final MyriadConfiguration cfg;
+
+  @Inject
+  public ConfigurationResource(MyriadConfiguration cfg) {
+    this.cfg = cfg;
+  }
+  /** Returns the injected configuration object as the response entity. */
+  @Timed
+  @GET
+  public MyriadConfiguration getConfig() throws InterruptedException {
+    return cfg;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
new file mode 100644
index 0000000..a3bee85
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.api;
+
+import com.codahale.metrics.annotation.Timed;
+import org.apache.myriad.api.model.GetSchedulerStateResponse;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.mesos.Protos;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Defines the REST API for the current state of Myriad
+ */
+@Path("/state")
+@Produces(MediaType.APPLICATION_JSON)
+public class SchedulerStateResource {
+  private MyriadConfiguration cfg;
+  private org.apache.myriad.state.SchedulerState state;
+  @Inject
+  public SchedulerStateResource(final MyriadConfiguration cfg, final org.apache.myriad.state.SchedulerState state) {
+    this.state = state;
+    this.cfg = cfg;
+  }
+
+  /** Reports the ids of the pending, staging, active and killable tasks held by the scheduler state. */
+  @Timed
+  @GET
+  public GetSchedulerStateResponse getState() {
+    return new GetSchedulerStateResponse(toStringCollection(state.getPendingTaskIds()), toStringCollection(state.getStagingTaskIds()), toStringCollection(state.getActiveTaskIds()), toStringCollection(state.getKillableTasks()));
+  }
+
+  /** Maps task ids to their string values; a null or empty input gives the shared immutable empty list. */
+  private Collection<String> toStringCollection(Collection<Protos.TaskID> collection) {
+    if (collection == null || collection.isEmpty()) {
+      return Collections.emptyList();
+    }
+    Collection<String> ids = new ArrayList<>();
+    for (Protos.TaskID taskId : collection) {
+      ids.add(taskId.getValue());
+    }
+    return ids;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownClusterRequest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownClusterRequest.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownClusterRequest.java
new file mode 100644
index 0000000..4dd84a3
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownClusterRequest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.api.model;
+
+import com.google.gson.Gson;
+import java.util.List;
+import org.hibernate.validator.constraints.NotEmpty;
+
+/**
+ * Flex down request parameters
+ */
+public class FlexDownClusterRequest {
+  @NotEmpty
+  public String profile;
+
+  // NOTE(review): @NotEmpty is documented for strings/collections/maps/arrays; confirm it is valid on an Integer (@NotNull may be what is intended).
+  @NotEmpty
+  public Integer instances;
+
+  public List<String> constraints;
+
+  public FlexDownClusterRequest() { }
+
+  public FlexDownClusterRequest(String profile, Integer instances, List<String> constraints) {
+    this.instances = instances;
+    this.profile = profile;
+    this.constraints = constraints;
+  }
+
+  public Integer getInstances() {
+    return instances;
+  }
+
+  public void setInstances(Integer instances) {
+    this.instances = instances;
+  }
+
+  public String getProfile() {
+    return profile;
+  }
+
+  public void setProfile(String profile) {
+    this.profile = profile;
+  }
+
+  public List<String> getConstraints() {
+    return constraints;
+  }
+
+  public void setConstraints(List<String> constraints) {
+    this.constraints = constraints;
+  }
+
+  /** Renders this request as JSON using a fresh Gson instance. */
+  @Override
+  public String toString() {
+    return new Gson().toJson(this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownServiceRequest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownServiceRequest.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownServiceRequest.java
new file mode 100644
index 0000000..bbfcb58
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexDownServiceRequest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.myriad.api.model;
+
+import com.google.gson.Gson;
+import org.hibernate.validator.constraints.NotEmpty;
+
+/**
+ * Flex down an auxtask/service
+ */
+public class FlexDownServiceRequest {
+
+ @NotEmpty
+ public Integer instances;
+
+ @NotEmpty
+ public String serviceName;
+
+ public FlexDownServiceRequest() {
+ }
+
+ public FlexDownServiceRequest(Integer instances, String serviceName) {
+ this.instances = instances;
+ this.serviceName = serviceName;
+ }
+
+ public Integer getInstances() {
+ return instances;
+ }
+
+ public void setInstances(Integer instances) {
+ this.instances = instances;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public void setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ public String toString() {
+ Gson gson = new Gson();
+ return gson.toJson(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpClusterRequest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpClusterRequest.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpClusterRequest.java
new file mode 100644
index 0000000..0ab9402
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpClusterRequest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.api.model;
+
+import com.google.gson.Gson;
+import java.util.List;
+import org.hibernate.validator.constraints.NotEmpty;
+
+/**
+ * Flex up cluster request parameters
+ */
+public class FlexUpClusterRequest {
+ @NotEmpty
+ public Integer instances;
+
+ @NotEmpty
+ public String profile;
+
+ public List<String> constraints;
+
+ public FlexUpClusterRequest() {
+ }
+
+ public FlexUpClusterRequest(Integer instances, String profile, List<String> constraints) {
+ this.instances = instances;
+ this.profile = profile;
+ this.constraints = constraints;
+ }
+
+ public Integer getInstances() {
+ return instances;
+ }
+
+ public void setInstances(Integer instances) {
+ this.instances = instances;
+ }
+
+ public String getProfile() {
+ return profile;
+ }
+
+ public void setProfile(String profile) {
+ this.profile = profile;
+ }
+
+ public List<String> getConstraints() {
+ return constraints;
+ }
+
+ public void setConstraints(List<String> constraints) {
+ this.constraints = constraints;
+ }
+
+ public String toString() {
+ Gson gson = new Gson();
+ return gson.toJson(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpServiceRequest.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpServiceRequest.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpServiceRequest.java
new file mode 100644
index 0000000..b047894
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/FlexUpServiceRequest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.myriad.api.model;
+
+import com.google.gson.Gson;
+import org.hibernate.validator.constraints.NotEmpty;
+
+/**
+ * Flex up an auxtask/service
+ */
+public class FlexUpServiceRequest {
+ @NotEmpty
+ public Integer instances;
+
+ @NotEmpty
+ public String serviceName;
+
+ public FlexUpServiceRequest() {
+ }
+
+ public FlexUpServiceRequest(Integer instances, String profile) {
+ this.instances = instances;
+ this.serviceName = profile;
+ }
+
+ public Integer getInstances() {
+ return instances;
+ }
+
+ public void setInstances(Integer instances) {
+ this.instances = instances;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public void setServiceName(String profile) {
+ this.serviceName = profile;
+ }
+
+ public String toString() {
+ Gson gson = new Gson();
+ return gson.toJson(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/api/model/GetSchedulerStateResponse.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/model/GetSchedulerStateResponse.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/GetSchedulerStateResponse.java
new file mode 100644
index 0000000..9ae5cc7
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/GetSchedulerStateResponse.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.api.model;
+
+import java.util.Collection;
+
+/**
+ * Response for the current state of Myriad
+ */
+public class GetSchedulerStateResponse {
+ private Collection<String> pendingTasks;
+ private Collection<String> stagingTasks;
+ private Collection<String> activeTasks;
+ private Collection<String> killableTasks;
+
+ public GetSchedulerStateResponse() {
+
+ }
+
+ public GetSchedulerStateResponse(Collection<String> pendingTasks, Collection<String> stagingTasks, Collection<String> activeTasks, Collection<String> killableTasks) {
+ this.pendingTasks = pendingTasks;
+ this.stagingTasks = stagingTasks;
+ this.activeTasks = activeTasks;
+ this.killableTasks = killableTasks;
+ }
+
+ public Collection<String> getPendingTasks() {
+ return pendingTasks;
+ }
+
+ public Collection<String> getStagingTasks() {
+ return stagingTasks;
+ }
+
+ public Collection<String> getActiveTasks() {
+ return activeTasks;
+ }
+
+ public Collection<String> getKillableTasks() {
+ return killableTasks;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadBadConfigurationException.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadBadConfigurationException.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadBadConfigurationException.java
new file mode 100644
index 0000000..6a23323
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadBadConfigurationException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.myriad.configuration;
+
+/**
+ * Myriad specific exception
+ */
+@SuppressWarnings("serial")
+public class MyriadBadConfigurationException extends Exception {
+
+
+ public MyriadBadConfigurationException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java
new file mode 100644
index 0000000..efb185f
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.configuration;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+
+import org.hibernate.validator.constraints.NotEmpty;
+
+import java.util.Map;
+
+/**
+ * Myriad Configuration commonly defined in the YML file
+ * mesosMaster: 10.0.2.15:5050
+ * checkpoint: false
+ * frameworkFailoverTimeout: 43200000
+ * frameworkName: MyriadAlpha
+ * nativeLibrary: /usr/local/lib/libmesos.so
+ * zkServers: localhost:2181
+ * zkTimeout: 20000
+ * profiles:
+ * small:
+ * cpu: 1
+ * mem: 1100
+ * medium:
+ * cpu: 2
+ * mem: 2048
+ * large:
+ * cpu: 4
+ * mem: 4096
+ * rebalancer: false
+ * nodemanager:
+ * jvmMaxMemoryMB: 1024
+ * user: hduser
+ * cpus: 0.2
+ * cgroups: false
+ * executor:
+ * jvmMaxMemoryMB: 256
+ * path: file://localhost/usr/local/libexec/mesos/myriad-executor-runnable-0.0.1.jar
+ * yarnEnvironment:
+ * YARN_HOME: /usr/local/hadoop
+ */
+public class MyriadConfiguration {
+ /**
+ * By default framework checkpointing is turned off.
+ */
+ public static final Boolean DEFAULT_CHECKPOINT = false;
+
+ /**
+ * By default rebalancer is turned on.
+ */
+ public static final Boolean DEFAULT_REBALANCER = true;
+
+ /**
+ * By default ha is turned off.
+ */
+ public static final Boolean DEFAULT_HA_ENABLED = false;
+
+ /**
+ * By default framework failover timeout is 1 day.
+ */
+ public static final Double DEFAULT_FRAMEWORK_FAILOVER_TIMEOUT_MS = 86400000.0;
+
+ public static final String DEFAULT_FRAMEWORK_NAME = "myriad-scheduler";
+
+ public static final String DEFAULT_NATIVE_LIBRARY = "/usr/local/lib/libmesos.so";
+
+ public static final Integer DEFAULT_ZK_TIMEOUT = 20000;
+
+ public static final Integer DEFAULT_REST_API_PORT = 8192;
+
+ @JsonProperty
+ @NotEmpty
+ private String mesosMaster;
+
+ @JsonProperty
+ private Boolean checkpoint;
+
+ @JsonProperty
+ private Double frameworkFailoverTimeout;
+
+ @JsonProperty
+ private String frameworkName;
+
+ @JsonProperty
+ private String frameworkRole;
+
+ @JsonProperty
+ @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString.class)
+ private String frameworkUser;
+
+ @JsonProperty
+ @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString.class)
+ private String frameworkSuperUser;
+
+ @JsonProperty
+ @NotEmpty
+ private Map<String, Map<String, String>> profiles;
+
+ @JsonProperty
+ @NotEmpty
+ private Map<String, Integer> nmInstances;
+
+ @JsonProperty
+ private Boolean rebalancer;
+
+ @JsonProperty
+ private Boolean haEnabled;
+
+ @JsonProperty
+ private NodeManagerConfiguration nodemanager;
+
+ @JsonProperty
+ private Map<String, ServiceConfiguration> services;
+
+ @JsonProperty
+ @NotEmpty
+ private MyriadExecutorConfiguration executor;
+
+ @JsonProperty
+ private String nativeLibrary;
+
+ @JsonProperty
+ @NotEmpty
+ private String zkServers;
+
+ @JsonProperty
+ private Integer zkTimeout;
+
+ @JsonProperty
+ private Integer restApiPort;
+
+ @JsonProperty
+ @NotEmpty
+ private Map<String, String> yarnEnvironment;
+
+ @JsonProperty
+ private String mesosAuthenticationPrincipal;
+
+ @JsonProperty
+ private String mesosAuthenticationSecretFilename;
+
+
+ public MyriadConfiguration() {
+ }
+
+
+ public String getMesosMaster() {
+ return mesosMaster;
+ }
+
+ public Boolean isCheckpoint() {
+ return this.checkpoint != null ? checkpoint : DEFAULT_CHECKPOINT;
+ }
+
+ public Double getFrameworkFailoverTimeout() {
+ return this.frameworkFailoverTimeout != null ? this.frameworkFailoverTimeout : DEFAULT_FRAMEWORK_FAILOVER_TIMEOUT_MS;
+ }
+
+ public String getFrameworkName() {
+ return Strings.isNullOrEmpty(this.frameworkName) ? DEFAULT_FRAMEWORK_NAME : this.frameworkName;
+ }
+
+ public String getFrameworkRole() {
+ return frameworkRole;
+ }
+
+ public Optional<String> getFrameworkUser() {
+ return Optional.fromNullable(frameworkUser);
+ }
+
+ public Optional<String> getFrameworkSuperUser() {
+ return Optional.fromNullable(frameworkSuperUser);
+ }
+
+ public Map<String, Map<String, String>> getProfiles() {
+ return profiles;
+ }
+
+ public Map<String, Integer> getNmInstances() {
+ return nmInstances;
+ }
+
+ public Boolean isRebalancer() {
+ return rebalancer != null ? rebalancer : DEFAULT_REBALANCER;
+ }
+
+ public Boolean isHAEnabled() {
+ return haEnabled != null ? haEnabled : DEFAULT_HA_ENABLED;
+ }
+
+ public NodeManagerConfiguration getNodeManagerConfiguration() {
+ return this.nodemanager;
+ }
+
+ public Map<String, ServiceConfiguration> getServiceConfigurations() {
+ return this.services;
+ }
+
+ public ServiceConfiguration getServiceConfiguration(String taskName) {
+ if (services == null) {
+ return null;
+ }
+ return this.services.get(taskName);
+ }
+
+ public MyriadExecutorConfiguration getMyriadExecutorConfiguration() {
+ return this.executor;
+ }
+
+ public String getNativeLibrary() {
+ return Strings.isNullOrEmpty(this.nativeLibrary) ? DEFAULT_NATIVE_LIBRARY : this.nativeLibrary;
+ }
+
+ public String getZkServers() {
+ return this.zkServers;
+ }
+
+ public Integer getZkTimeout() {
+ return this.zkTimeout != null ? this.zkTimeout : DEFAULT_ZK_TIMEOUT;
+ }
+
+ public Integer getRestApiPort() {
+ return this.restApiPort != null ? this.restApiPort : DEFAULT_REST_API_PORT;
+ }
+
+ public Map<String, String> getYarnEnvironment() {
+ return yarnEnvironment;
+ }
+
+ public String getMesosAuthenticationSecretFilename() {
+ return mesosAuthenticationSecretFilename;
+ }
+
+ public String getMesosAuthenticationPrincipal() {
+ return mesosAuthenticationPrincipal;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java
new file mode 100644
index 0000000..d68b6fb
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.configuration;
+
+import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerDouble;
+import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.hibernate.validator.constraints.NotEmpty;
+
+/**
+ * Configuration for the Executor
+ */
+public class MyriadExecutorConfiguration {
+ /**
+ * Translates to -Xmx for the executor JVM.
+ */
+ @JsonProperty
+ @JsonSerialize(using = OptionalSerializerDouble.class)
+ private Double jvmMaxMemoryMB;
+
+ @JsonProperty
+ @NotEmpty
+ private String path;
+
+ @JsonProperty
+ @JsonSerialize(using = OptionalSerializerString.class)
+ private String nodeManagerUri;
+
+ public Optional<Double> getJvmMaxMemoryMB() {
+ return Optional.fromNullable(jvmMaxMemoryMB);
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public Optional<String> getNodeManagerUri() {
+ return Optional.fromNullable(nodeManagerUri);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
new file mode 100644
index 0000000..1c6c096
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.configuration;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerBoolean;
+import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerDouble;
+import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+
+/**
+ * Node Manager Configuration
+ */
+public class NodeManagerConfiguration {
+ /**
+ * Allot 10% more memory to account for JVM overhead.
+ */
+ public static final double JVM_OVERHEAD = 0.1;
+
+ /**
+ * Default -Xmx for NodeManager JVM.
+ */
+ public static final double DEFAULT_JVM_MAX_MEMORY_MB = 2048;
+
+ /**
+ * Default cpu for NodeManager JVM.
+ */
+ public static final double DEFAULT_NM_CPUS = 1;
+
+ public static final String NM_TASK_PREFIX = "nm";
+
+ /**
+ * Translates to -Xmx for the NodeManager JVM.
+ */
+ @JsonProperty
+ @JsonSerialize(using = OptionalSerializerDouble.class)
+ private Double jvmMaxMemoryMB;
+
+ /**
+ * Amount of CPU share given to the NodeManager JVM. This is critical, especially
+ * for NodeManager auxiliary services.
+ */
+ @JsonProperty
+ @JsonSerialize(using = OptionalSerializerDouble.class)
+ private Double cpus;
+
+ /**
+ * Translates to JAVA_OPTS for the NodeManager JVM.
+ */
+ @JsonProperty
+ @JsonSerialize(using = OptionalSerializerString.class)
+ private String jvmOpts;
+
+ /**
+ * Determines if cgroups are enabled for NM or not.
+ */
+ @JsonProperty
+ @JsonSerialize(using = OptionalSerializerBoolean.class)
+ private Boolean cgroups;
+
+ public Optional<Double> getJvmMaxMemoryMB() {
+ return Optional.fromNullable(jvmMaxMemoryMB);
+ }
+
+ public Optional<String> getJvmOpts() {
+ return Optional.fromNullable(jvmOpts);
+ }
+
+ public Optional<Double> getCpus() {
+ return Optional.fromNullable(cpus);
+ }
+
+ public Optional<Boolean> getCgroups() {
+ return Optional.fromNullable(cgroups);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/configuration/OptionalSerializer.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/OptionalSerializer.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/OptionalSerializer.java
new file mode 100644
index 0000000..e0cba43
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/OptionalSerializer.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.configuration;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializerProvider;
+
+import com.google.common.base.Optional;
+
+/**
+ * Custom serializer for Guava's Optional.
+ * By default an Optional is serialized as its state only ("present: true/false"), not its value.
+ * This class serializes the wrapped {@code T} value (or the string "value is absent") instead of that state.
+ * This is needed for the REST APIs and the Myriad UI.
+ *
+ * @param <T> type of the value wrapped by the Optional
+ */
+public class OptionalSerializer<T> extends JsonSerializer<Optional<T>> {
+
+ private static final JsonFactory jsonFactory = new ObjectMapper().getJsonFactory();
+
+ protected ObjectMapper objMapper;
+
+ public OptionalSerializer() {
+ objMapper = new ObjectMapper(jsonFactory);
+ }
+
+ @Override
+ public void serialize(Optional<T> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException {
+ if (value.isPresent()) {
+ objMapper.writeValue(jgen, value.get());
+ } else {
+ objMapper.writeValue(jgen, "value is absent");
+ }
+ }
+
+ /**
+ * Custom String serializer
+ */
+ public static class OptionalSerializerString extends OptionalSerializer<String> {
+ @Override
+ public void serialize(Optional<String> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException {
+ super.serialize(value, jgen, provider);
+ }
+ }
+
+ /**
+ * Custom Double serializer
+ */
+ public static class OptionalSerializerDouble extends OptionalSerializer<Double> {
+ @Override
+ public void serialize(Optional<Double> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException {
+ super.serialize(value, jgen, provider);
+ }
+ }
+
+ /**
+ * Custom Integer serializer
+ */
+ public static class OptionalSerializerInt extends OptionalSerializer<Integer> {
+ @Override
+ public void serialize(Optional<Integer> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException {
+ super.serialize(value, jgen, provider);
+ }
+ }
+
+ /**
+ * Custom Boolean serializer
+ */
+ public static class OptionalSerializerBoolean extends OptionalSerializer<Boolean> {
+ @Override
+ public void serialize(Optional<Boolean> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException {
+ super.serialize(value, jgen, provider);
+ }
+ }
+
+ /**
+ * Custom Map serializer
+ */
+ public static class OptionalSerializerMap extends OptionalSerializer<Map<?, ?>> {
+ @Override
+ public void serialize(Optional<Map<?, ?>> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException {
+ super.serialize(value, jgen, provider);
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java
new file mode 100644
index 0000000..f0b913c
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.myriad.configuration;
+
+import java.util.Map;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.hibernate.validator.constraints.NotEmpty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+
+/**
+ * Configuration for any service/task to be started from Myriad Scheduler
+ */
+public class ServiceConfiguration {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConfiguration.class);
+
+ public static final Double DEFAULT_CPU = 0.1;
+
+ public static final Double DEFAULT_MEMORY = 256.0;
+
+ /**
+ * Translates to -Xmx for the JVM.
+ */
+ @JsonProperty
+ @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerDouble.class)
+ protected Double jvmMaxMemoryMB;
+
+ /**
+ * Amount of CPU share given to JVM.
+ */
+ @JsonProperty
+ @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerDouble.class)
+ protected Double cpus;
+
+ /**
+ * Translates to jvm opts for the JVM.
+ */
+ @JsonProperty
+ @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString.class)
+ protected String jvmOpts;
+
+ @JsonProperty
+ @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerMap.class)
+ protected Map<String, Long> ports;
+
+ /**
+ * For services that are not easy to express
+ * through properties alone, this field names
+ * a specific task factory implementation to use
+ */
+ @JsonProperty
+ @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString.class)
+ protected String taskFactoryImplName;
+
+ @JsonProperty
+ protected String envSettings;
+
+ @JsonProperty
+ @NotEmpty
+ protected String taskName;
+
+ @JsonProperty
+ @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerInt.class)
+ protected Integer maxInstances;
+
+ @JsonProperty
+ @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString.class)
+ protected String command;
+
+ @JsonProperty
+ protected String serviceOptsName;
+
+
+ public Optional<Double> getJvmMaxMemoryMB() {
+ return Optional.fromNullable(jvmMaxMemoryMB);
+ }
+
+ public Optional<String> getJvmOpts() {
+ return Optional.fromNullable(jvmOpts);
+ }
+
+ public Optional<Double> getCpus() {
+ return Optional.fromNullable(cpus);
+ }
+
+ public String getTaskName() {
+ return taskName;
+ }
+
+ public void setTaskName(String taskName) {
+ this.taskName = taskName;
+ }
+
+ public Optional<String> getTaskFactoryImplName() {
+ return Optional.fromNullable(taskFactoryImplName);
+ }
+
+ public String getEnvSettings() {
+ return envSettings;
+ }
+
+ public Optional<Map<String, Long>> getPorts() {
+ return Optional.fromNullable(ports);
+ }
+
+ public Optional<Integer> getMaxInstances() {
+ return Optional.fromNullable(maxInstances);
+ }
+
+ public Optional<String> getCommand() {
+ return Optional.fromNullable(command);
+ }
+
+ public String getServiceOpts() {
+ return serviceOptsName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/health/HealthCheckUtils.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/health/HealthCheckUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/health/HealthCheckUtils.java
new file mode 100644
index 0000000..e1b6a17
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/health/HealthCheckUtils.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.health;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.Socket;
+
+/**
+ * Health Check Utilities
+ */
+public class HealthCheckUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HealthCheckUtils.class);
+
+ public static boolean checkHostPort(String connectionString) {
+ String[] split = connectionString.split(":");
+ String serverAddress = split[0];
+ Integer serverPort = Integer.valueOf(split[1]);
+ try (Socket s = new Socket(serverAddress, serverPort)) {
+ return true;
+ } catch (IOException ex) {
+ LOGGER.error("parsing host port", ex);
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosDriverHealthCheck.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosDriverHealthCheck.java b/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosDriverHealthCheck.java
new file mode 100644
index 0000000..96dcfe2
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosDriverHealthCheck.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.health;
+
+import com.codahale.metrics.health.HealthCheck;
+import org.apache.mesos.Protos.Status;
+
+import javax.inject.Inject;
+
+/**
+ * Health check that reports whether the Mesos scheduler driver is running.
+ */
+public class MesosDriverHealthCheck extends HealthCheck {
+
+  public static final String NAME = "mesos-driver";
+
+  private final org.apache.myriad.scheduler.MyriadDriverManager driverManager;
+
+  @Inject
+  public MesosDriverHealthCheck(org.apache.myriad.scheduler.MyriadDriverManager driverManager) {
+    this.driverManager = driverManager;
+  }
+
+  /**
+   * Reports healthy only while the driver status is {@code DRIVER_RUNNING}; any other status
+   * is reported as unhealthy with the status included in the message.
+   */
+  @Override
+  protected Result check() throws Exception {
+    final Status current = driverManager.getDriverStatus();
+    if (Status.DRIVER_RUNNING != current) {
+      return Result.unhealthy("Driver status: " + current);
+    }
+    return Result.healthy();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosMasterHealthCheck.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosMasterHealthCheck.java b/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosMasterHealthCheck.java
new file mode 100644
index 0000000..a394c30
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosMasterHealthCheck.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.health;
+
+import com.codahale.metrics.health.HealthCheck;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Health check for the Mesos master.
+ * <p/>
+ * When the configured master is a {@code zk://} URL, each ZooKeeper host in the URL is probed
+ * with a short-lived Curator client and the check is healthy if at least one host accepts a
+ * connection. Otherwise the master string is treated as {@code host:port} and probed with a
+ * plain TCP connect.
+ */
+public class MesosMasterHealthCheck extends HealthCheck {
+  public static final String NAME = "mesos-master";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MesosMasterHealthCheck.class);
+
+  private static final String ZK_PREFIX = "zk://";
+  private static final int ZK_MAX_RETRIES = 3;
+  private static final int ZK_BASE_SLEEP_TIME_MS = 1000;
+  private static final int ZK_BLOCK_TIME_SECONDS = 5;
+
+  private final MyriadConfiguration cfg;
+
+  @Inject
+  public MesosMasterHealthCheck(MyriadConfiguration cfg) {
+    this.cfg = cfg;
+  }
+
+  /**
+   * Probes the configured Mesos master.
+   *
+   * @return healthy if any ZooKeeper host (zk URL) or the master endpoint (host:port) is
+   *         reachable; unhealthy with the master string in the message otherwise
+   * @throws Exception if the probe is interrupted or the Curator client fails unexpectedly
+   */
+  @Override
+  protected Result check() throws Exception {
+    String mesosMaster = cfg.getMesosMaster();
+    int zkIndex = mesosMaster.indexOf(ZK_PREFIX);
+    Result result = Result.unhealthy("Unable to connect to: " + mesosMaster);
+    if (zkIndex >= 0) {
+      // The host list runs from just after the prefix to the first '/' (start of the znode
+      // path), or to the end of the string when no path is present.
+      int fromIndex = zkIndex + ZK_PREFIX.length();
+      int pathIndex = mesosMaster.indexOf('/', fromIndex);
+      String zkHostPorts = pathIndex >= 0
+          ? mesosMaster.substring(fromIndex, pathIndex)
+          : mesosMaster.substring(fromIndex);
+
+      for (String hostPort : zkHostPorts.split(",")) {
+        if (isZookeeperReachable(hostPort)) {
+          result = Result.healthy();
+        } else {
+          LOGGER.info("Unable to reach: {}", hostPort);
+        }
+      }
+    } else if (HealthCheckUtils.checkHostPort(mesosMaster)) {
+      result = Result.healthy();
+    }
+
+    return result;
+  }
+
+  /**
+   * Tries to connect to a single ZooKeeper host and always closes the client afterwards.
+   * <p/>
+   * The outcome is taken from {@code blockUntilConnected}; the client's lifecycle state is not
+   * used because it becomes STARTED as soon as {@code start()} is called, whether or not a
+   * connection was ever established.
+   */
+  private boolean isZookeeperReachable(String hostPort) throws InterruptedException {
+    CuratorFramework client = CuratorFrameworkFactory.newClient(hostPort,
+        new ExponentialBackoffRetry(ZK_BASE_SLEEP_TIME_MS, ZK_MAX_RETRIES));
+    try {
+      client.start();
+      return client.blockUntilConnected(ZK_BLOCK_TIME_SECONDS, TimeUnit.SECONDS);
+    } finally {
+      client.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/health/ZookeeperHealthCheck.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/health/ZookeeperHealthCheck.java b/myriad-scheduler/src/main/java/org/apache/myriad/health/ZookeeperHealthCheck.java
new file mode 100644
index 0000000..46b86a1
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/health/ZookeeperHealthCheck.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.health;
+
+import com.codahale.metrics.health.HealthCheck;
+import org.apache.myriad.configuration.MyriadConfiguration;
+
+import javax.inject.Inject;
+
+/**
+ * Health check for ZooKeeper.
+ * <p/>
+ * Currently a placeholder: check() performs no probing and always reports healthy.
+ */
+public class ZookeeperHealthCheck extends HealthCheck {
+ // Identifier for this health check.
+ public static final String NAME = "zookeeper";
+ // Injected configuration; stored by the constructor but not read anywhere in this class yet.
+ private MyriadConfiguration cfg;
+
+ @Inject
+ public ZookeeperHealthCheck(MyriadConfiguration cfg) {
+ this.cfg = cfg;
+ }
+
+ /**
+ * Not implemented yet; unconditionally returns a healthy result without contacting ZooKeeper.
+ */
+ @Override
+ protected Result check() throws Exception {
+ // todo: (kensipe) this needs to be implemented
+ return Result.healthy();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/policy/LeastAMNodesFirstPolicy.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/policy/LeastAMNodesFirstPolicy.java b/myriad-scheduler/src/main/java/org/apache/myriad/policy/LeastAMNodesFirstPolicy.java
new file mode 100644
index 0000000..916bdf0
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/policy/LeastAMNodesFirstPolicy.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.policy;
+
+import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A scale down policy that maintains returns a list of nodes running least number of AMs.
+ */
+public class LeastAMNodesFirstPolicy extends BaseInterceptor implements NodeScaleDownPolicy {
+ private static final Logger LOGGER = LoggerFactory.getLogger(LeastAMNodesFirstPolicy.class);
+
+ private final AbstractYarnScheduler yarnScheduler;
+ private final org.apache.myriad.state.SchedulerState schedulerState;
+
+ //TODO(Santosh): Should figure out the right values for the hashmap properties.
+ // currently it's tuned for 200 nodes and 50 RM RPC threads (Yarn's default).
+ private static final int INITIAL_NODE_SIZE = 200;
+ private static final int EXPECTED_CONCURRENT_ACCCESS_COUNT = 50;
+ private static final float LOAD_FACTOR_DEFAULT = 0.75f;
+
+ private Map<String, SchedulerNode> schedulerNodes = new ConcurrentHashMap<>(INITIAL_NODE_SIZE, LOAD_FACTOR_DEFAULT, EXPECTED_CONCURRENT_ACCCESS_COUNT);
+
+ @Inject
+ public LeastAMNodesFirstPolicy(org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, org.apache.myriad.state.SchedulerState schedulerState) {
+ registry.register(this);
+ this.yarnScheduler = yarnScheduler;
+ this.schedulerState = schedulerState;
+ }
+
+ /**
+ * Sort the given list of tasks by the number of App Master containers running on the corresponding NM node.
+ *
+ * @param taskIDs
+ */
+ @Override
+ public void apply(List<Protos.TaskID> taskIDs) {
+ if (LOGGER.isDebugEnabled()) {
+ for (SchedulerNode node : schedulerNodes.values()) {
+ LOGGER.debug("Host {} is running {} containers including {} App Masters", node.getNodeID().getHost(), node.getRunningContainers().size(), getNumAMContainers(node.getRunningContainers()));
+ }
+ }
+ // We need to lock the YARN scheduler here. If we don't do that, then the YARN scheduler can
+ // process HBs from NodeManagers and the state of SchedulerNode objects might change while we
+ // are in the middle of sorting them based on the least number of AM containers.
+ synchronized (yarnScheduler) {
+ Collections.sort(taskIDs, new Comparator<Protos.TaskID>() {
+ @Override
+ public int compare(Protos.TaskID t1, Protos.TaskID t2) {
+ SchedulerNode o1 = schedulerNodes.get(schedulerState.getTask(t1).getHostname());
+ SchedulerNode o2 = schedulerNodes.get(schedulerState.getTask(t2).getHostname());
+
+ if (o1 == null) { // a NM was launched by Myriad, but it hasn't yet registered with RM
+ if (o2 == null) {
+ return 0;
+ } else {
+ return -1;
+ }
+ } else if (o2 == null) {
+ return 1;
+ } // else, both the NMs have registered with RM
+
+ List<RMContainer> runningContainers1 = o1.getRunningContainers();
+ List<RMContainer> runningContainers2 = o2.getRunningContainers();
+
+ Integer numRunningAMs1 = getNumAMContainers(runningContainers1);
+ Integer numRunningAMs2 = getNumAMContainers(runningContainers2);
+
+ Integer numRunningContainers1 = runningContainers1.size();
+ Integer numRunningContainers2 = runningContainers2.size();
+
+ // If two NMs are running equal number of AMs, sort them based on total num of running containers
+ if (numRunningAMs1.compareTo(numRunningAMs2) == 0) {
+ return numRunningContainers1.compareTo(numRunningContainers2);
+ }
+ return numRunningAMs1.compareTo(numRunningAMs2);
+ }
+ });
+ }
+ }
+
+ @Override
+ public void afterSchedulerEventHandled(SchedulerEvent event) {
+
+ try {
+ switch (event.getType()) {
+ case NODE_UPDATE:
+ onNodeUpdated((NodeUpdateSchedulerEvent) event);
+ break;
+
+ case NODE_REMOVED:
+ onNodeRemoved((NodeRemovedSchedulerEvent) event);
+ break;
+
+ default:
+ break;
+ }
+ } catch (ClassCastException e) {
+ LOGGER.error("incorrect event object", e);
+ }
+ }
+
+ /**
+ * Called whenever a NM HBs to RM. The NM's updates will already be recorded in the
+ * SchedulerNode before this method is called.
+ *
+ * @param event
+ */
+ private void onNodeUpdated(NodeUpdateSchedulerEvent event) {
+ NodeId nodeID = event.getRMNode().getNodeID();
+ SchedulerNode schedulerNode = yarnScheduler.getSchedulerNode(nodeID);
+ schedulerNodes.put(nodeID.getHost(), schedulerNode); // keep track of only one node per host
+ }
+
+ private void onNodeRemoved(NodeRemovedSchedulerEvent event) {
+ SchedulerNode schedulerNode = schedulerNodes.get(event.getRemovedRMNode().getNodeID().getHost());
+ if (schedulerNode != null && schedulerNode.getNodeID().equals(event.getRemovedRMNode().getNodeID())) {
+ schedulerNodes.remove(schedulerNode.getNodeID().getHost());
+ }
+ }
+
+ private Integer getNumAMContainers(List<RMContainer> containers) {
+ int count = 0;
+ for (RMContainer container : containers) {
+ if (container.isAMContainer()) {
+ count++;
+ }
+ }
+ return count;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/policy/NodeScaleDownPolicy.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/policy/NodeScaleDownPolicy.java b/myriad-scheduler/src/main/java/org/apache/myriad/policy/NodeScaleDownPolicy.java
new file mode 100644
index 0000000..3d70294
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/policy/NodeScaleDownPolicy.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.policy;
+
+import org.apache.mesos.Protos;
+
+import java.util.List;
+
+/**
+ * Strategy for choosing which node managers to scale down first.
+ */
+public interface NodeScaleDownPolicy {
+
+  /**
+   * Applies this scale down policy to the supplied task IDs. The method returns nothing, so
+   * whatever the policy decides is expressed through the passed-in list.
+   *
+   * @param taskIDs the task IDs the policy operates on
+   */
+  void apply(List<Protos.TaskID> taskIDs);
+
+}