You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@myriad.apache.org by sm...@apache.org on 2015/10/28 17:07:39 UTC
[07/20] incubator-myriad git commit: spacing changes
spacing changes
Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/67ecf063
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/67ecf063
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/67ecf063
Branch: refs/heads/master
Commit: 67ecf0639382d7fad386a392bd0f83bc82ae7167
Parents: e0ca4da
Author: klucar <kl...@gmail.com>
Authored: Wed Oct 28 09:07:44 2015 -0400
Committer: Santosh Marella <ma...@gmail.com>
Committed: Wed Oct 28 08:55:17 2015 -0700
----------------------------------------------------------------------
config/checkstyle/checkstyle.xml | 6 +
myriad-commons/config/checkstyle/checkstyle.xml | 6 +
.../executor/ContainerTaskStatusRequest.java | 36 +-
.../myriad/executor/MyriadExecutorDefaults.java | 70 +-
.../com/ebay/myriad/executor/NMTaskConfig.java | 188 ++---
.../config/checkstyle/checkstyle.xml | 6 +
.../ebay/myriad/executor/MyriadExecutor.java | 38 +-
.../executor/MyriadExecutorAuxService.java | 25 +-
.../config/checkstyle/checkstyle.xml | 6 +
.../java/com/ebay/myriad/DisruptorManager.java | 234 +++---
.../src/main/java/com/ebay/myriad/Main.java | 432 ++++++-----
.../main/java/com/ebay/myriad/MesosModule.java | 36 +-
.../main/java/com/ebay/myriad/MyriadModule.java | 189 +++--
.../com/ebay/myriad/api/ClustersResource.java | 427 ++++++-----
.../ebay/myriad/api/ConfigurationResource.java | 26 +-
.../ebay/myriad/api/SchedulerStateResource.java | 53 +-
.../api/model/FlexDownClusterRequest.java | 74 +-
.../api/model/FlexDownServiceRequest.java | 86 +--
.../myriad/api/model/FlexUpClusterRequest.java | 76 +-
.../myriad/api/model/FlexUpServiceRequest.java | 100 +--
.../api/model/GetSchedulerStateResponse.java | 70 +-
.../MyriadBadConfigurationException.java | 35 +-
.../configuration/MyriadConfiguration.java | 18 +-
.../MyriadExecutorConfiguration.java | 58 +-
.../configuration/NodeManagerConfiguration.java | 108 +--
.../configuration/OptionalSerializer.java | 48 +-
.../configuration/ServiceConfiguration.java | 67 +-
.../ebay/myriad/health/HealthCheckUtils.java | 30 +-
.../myriad/health/MesosDriverHealthCheck.java | 34 +-
.../myriad/health/MesosMasterHealthCheck.java | 94 ++-
.../myriad/health/ZookeeperHealthCheck.java | 28 +-
.../myriad/policy/LeastAMNodesFirstPolicy.java | 228 +++---
.../ebay/myriad/policy/NodeScaleDownPolicy.java | 17 +-
.../scheduler/DownloadNMExecutorCLGenImpl.java | 30 +-
.../scheduler/ExecutorCommandLineGenerator.java | 14 +-
.../scheduler/ExtendedResourceProfile.java | 19 +-
.../com/ebay/myriad/scheduler/MyriadDriver.java | 60 +-
.../myriad/scheduler/MyriadDriverManager.java | 123 ++--
.../ebay/myriad/scheduler/MyriadOperations.java | 309 ++++----
.../ebay/myriad/scheduler/MyriadScheduler.java | 255 +++----
.../myriad/scheduler/NMExecutorCLGenImpl.java | 135 ++--
.../java/com/ebay/myriad/scheduler/NMPorts.java | 85 +--
.../com/ebay/myriad/scheduler/NMProfile.java | 76 +-
.../ebay/myriad/scheduler/NMProfileManager.java | 39 +-
.../scheduler/NMTaskFactoryAnnotation.java | 40 +-
.../java/com/ebay/myriad/scheduler/Ports.java | 7 +-
.../com/ebay/myriad/scheduler/Rebalancer.java | 136 ++--
.../ebay/myriad/scheduler/ReconcileService.java | 80 +-
.../ebay/myriad/scheduler/SchedulerUtils.java | 63 +-
.../scheduler/ServiceCommandLineGenerator.java | 38 +-
.../myriad/scheduler/ServiceProfileManager.java | 20 +-
.../scheduler/ServiceResourceProfile.java | 40 +-
.../scheduler/ServiceTaskConstraints.java | 11 +-
.../scheduler/ServiceTaskFactoryImpl.java | 125 ++--
.../ebay/myriad/scheduler/TaskConstraints.java | 35 +-
.../scheduler/TaskConstraintsManager.java | 13 +-
.../com/ebay/myriad/scheduler/TaskFactory.java | 113 +--
.../ebay/myriad/scheduler/TaskTerminator.java | 85 +--
.../com/ebay/myriad/scheduler/TaskUtils.java | 266 ++++---
.../scheduler/constraints/Constraint.java | 6 +-
.../constraints/ConstraintFactory.java | 6 +-
.../scheduler/constraints/LikeConstraint.java | 9 +-
.../scheduler/event/DisconnectedEvent.java | 20 +-
.../event/DisconnectedEventFactory.java | 14 +-
.../ebay/myriad/scheduler/event/ErrorEvent.java | 34 +-
.../scheduler/event/ErrorEventFactory.java | 14 +-
.../scheduler/event/ExecutorLostEvent.java | 62 +-
.../event/ExecutorLostEventFactory.java | 17 +-
.../scheduler/event/FrameworkMessageEvent.java | 62 +-
.../event/FrameworkMessageEventFactory.java | 17 +-
.../scheduler/event/OfferRescindedEvent.java | 36 +-
.../event/OfferRescindedEventFactory.java | 17 +-
.../scheduler/event/ReRegisteredEvent.java | 34 +-
.../event/ReRegisteredEventFactory.java | 17 +-
.../myriad/scheduler/event/RegisteredEvent.java | 48 +-
.../scheduler/event/RegisteredEventFactory.java | 14 +-
.../scheduler/event/ResourceOffersEvent.java | 34 +-
.../event/ResourceOffersEventFactory.java | 17 +-
.../myriad/scheduler/event/SlaveLostEvent.java | 34 +-
.../scheduler/event/SlaveLostEventFactory.java | 14 +-
.../scheduler/event/StatusUpdateEvent.java | 34 +-
.../event/StatusUpdateEventFactory.java | 17 +-
.../handlers/DisconnectedEventHandler.java | 16 +-
.../event/handlers/ErrorEventHandler.java | 18 +-
.../handlers/ExecutorLostEventHandler.java | 23 +-
.../handlers/FrameworkMessageEventHandler.java | 21 +-
.../handlers/OfferRescindedEventHandler.java | 16 +-
.../handlers/ReRegisteredEventHandler.java | 26 +-
.../event/handlers/RegisteredEventHandler.java | 28 +-
.../handlers/ResourceOffersEventHandler.java | 82 +--
.../event/handlers/SlaveLostEventHandler.java | 18 +-
.../handlers/StatusUpdateEventHandler.java | 112 ++-
.../myriad/scheduler/fgs/ConsumedOffer.java | 6 +-
.../scheduler/fgs/NMHeartBeatHandler.java | 39 +-
.../com/ebay/myriad/scheduler/fgs/Node.java | 6 +-
.../ebay/myriad/scheduler/fgs/NodeStore.java | 6 +-
.../ebay/myriad/scheduler/fgs/OfferFeed.java | 8 +-
.../scheduler/fgs/OfferLifecycleManager.java | 15 +-
.../ebay/myriad/scheduler/fgs/OfferUtils.java | 6 +-
.../scheduler/fgs/YarnNodeCapacityManager.java | 266 +++----
.../scheduler/yarn/MyriadCapacityScheduler.java | 6 +-
.../scheduler/yarn/MyriadFairScheduler.java | 90 +--
.../scheduler/yarn/MyriadFifoScheduler.java | 6 +-
.../scheduler/yarn/RMNodeEventHandler.java | 26 +-
.../yarn/interceptor/BaseInterceptor.java | 36 +-
.../yarn/interceptor/CompositeInterceptor.java | 159 ++--
.../yarn/interceptor/InterceptorRegistry.java | 8 +-
.../MyriadInitializationInterceptor.java | 44 +-
.../interceptor/YarnSchedulerInterceptor.java | 106 +--
.../java/com/ebay/myriad/state/Cluster.java | 142 ++--
.../java/com/ebay/myriad/state/MyriadState.java | 40 +-
.../com/ebay/myriad/state/MyriadStateStore.java | 8 +-
.../java/com/ebay/myriad/state/NodeTask.java | 178 ++---
.../com/ebay/myriad/state/SchedulerState.java | 725 +++++++++----------
.../myriad/state/utils/ByteBufferSupport.java | 54 +-
.../ebay/myriad/state/utils/StoreContext.java | 42 +-
.../myriad/webapp/HttpConnectorProvider.java | 32 +-
.../ebay/myriad/webapp/MyriadServletModule.java | 24 +-
.../com/ebay/myriad/webapp/MyriadWebServer.java | 63 +-
.../ebay/myriad/webapp/WebAppGuiceModule.java | 16 +-
.../recovery/MyriadFileSystemRMStateStore.java | 28 +-
.../test/java/com/ebay/myriad/MesosModule.java | 17 +-
.../java/com/ebay/myriad/MultiBindingsTest.java | 50 +-
.../com/ebay/myriad/MultiBindingsUsage.java | 33 +-
.../java/com/ebay/myriad/MyriadTestModule.java | 68 +-
.../MyriadBadConfigurationExceptionTest.java | 35 +-
.../configuration/MyriadConfigurationTest.java | 45 +-
.../myriad/scheduler/SchedulerUtilsSpec.groovy | 16 +-
.../myriad/scheduler/TMSTaskFactoryImpl.java | 43 +-
.../myriad/scheduler/TestMyriadScheduler.java | 12 +-
.../scheduler/TestServiceCommandLine.java | 31 +-
.../ebay/myriad/scheduler/TestTaskUtils.java | 61 +-
.../constraints/LikeConstraintSpec.groovy | 96 +--
.../myriad/scheduler/fgs/FGSTestBaseSpec.groovy | 216 +++---
.../scheduler/fgs/NMHeartBeatHandlerSpec.groovy | 148 ++--
.../fgs/YarnNodeCapacityManagerSpec.groovy | 190 ++---
136 files changed, 4450 insertions(+), 4899 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/config/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml
index 0ba6803..6eb5178 100644
--- a/config/checkstyle/checkstyle.xml
+++ b/config/checkstyle/checkstyle.xml
@@ -315,5 +315,11 @@ page at http://checkstyle.sourceforge.net/config.html -->
<property name="severity" value="warning"/>
</module>
+ <module name="Indentation">
+ <property name="basicOffset" value="2"/>
+ <property name="braceAdjustment" value="0"/>
+ <property name="caseIndent" value="2"/>
+ <property name="throwsIndent" value="2"/>
+ </module>
</module>
</module>
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-commons/config/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/myriad-commons/config/checkstyle/checkstyle.xml b/myriad-commons/config/checkstyle/checkstyle.xml
index 0ba6803..6eb5178 100644
--- a/myriad-commons/config/checkstyle/checkstyle.xml
+++ b/myriad-commons/config/checkstyle/checkstyle.xml
@@ -315,5 +315,11 @@ page at http://checkstyle.sourceforge.net/config.html -->
<property name="severity" value="warning"/>
</module>
+ <module name="Indentation">
+ <property name="basicOffset" value="2"/>
+ <property name="braceAdjustment" value="0"/>
+ <property name="caseIndent" value="2"/>
+ <property name="throwsIndent" value="2"/>
+ </module>
</module>
</module>
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-commons/src/main/java/com/ebay/myriad/executor/ContainerTaskStatusRequest.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/com/ebay/myriad/executor/ContainerTaskStatusRequest.java b/myriad-commons/src/main/java/com/ebay/myriad/executor/ContainerTaskStatusRequest.java
index c37deeb..a93e3a8 100644
--- a/myriad-commons/src/main/java/com/ebay/myriad/executor/ContainerTaskStatusRequest.java
+++ b/myriad-commons/src/main/java/com/ebay/myriad/executor/ContainerTaskStatusRequest.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,23 +23,23 @@ package com.ebay.myriad.executor;
* the mesos task id (placeholder task).
*/
public class ContainerTaskStatusRequest {
- public static final String YARN_CONTAINER_TASK_ID_PREFIX = "yarn_";
- private String mesosTaskId; // YARN_CONTAINER_TASK_ID_PREFIX + <container_id>
- private String state; // Protos.TaskState.name()
+ public static final String YARN_CONTAINER_TASK_ID_PREFIX = "yarn_";
+ private String mesosTaskId; // YARN_CONTAINER_TASK_ID_PREFIX + <container_id>
+ private String state; // Protos.TaskState.name()
- public String getMesosTaskId() {
- return mesosTaskId;
- }
+ public String getMesosTaskId() {
+ return mesosTaskId;
+ }
- public void setMesosTaskId(String mesosTaskId) {
- this.mesosTaskId = mesosTaskId;
- }
+ public void setMesosTaskId(String mesosTaskId) {
+ this.mesosTaskId = mesosTaskId;
+ }
- public String getState() {
- return state;
- }
+ public String getState() {
+ return state;
+ }
- public void setState(String state) {
- this.state = state;
- }
+ public void setState(String state) {
+ this.state = state;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-commons/src/main/java/com/ebay/myriad/executor/MyriadExecutorDefaults.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/com/ebay/myriad/executor/MyriadExecutorDefaults.java b/myriad-commons/src/main/java/com/ebay/myriad/executor/MyriadExecutorDefaults.java
index 5479f80..32925e5 100644
--- a/myriad-commons/src/main/java/com/ebay/myriad/executor/MyriadExecutorDefaults.java
+++ b/myriad-commons/src/main/java/com/ebay/myriad/executor/MyriadExecutorDefaults.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,54 +22,54 @@ package com.ebay.myriad.executor;
* Myriad's Executor Defaults
*/
public class MyriadExecutorDefaults {
- public static final String ENV_YARN_NODEMANAGER_OPTS = "YARN_NODEMANAGER_OPTS";
+ public static final String ENV_YARN_NODEMANAGER_OPTS = "YARN_NODEMANAGER_OPTS";
- /**
- * YARN container executor class.
- */
- public static final String KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS = "yarn.nodemanager.container-executor.class";
+ /**
+ * YARN container executor class.
+ */
+ public static final String KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS = "yarn.nodemanager.container-executor.class";
- public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor";
+ public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor";
- public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor";
+ public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor";
- /**
- * YARN class to help handle LCE resources
- */
- public static final String KEY_YARN_NM_LCE_RH_CLASS = "yarn.nodemanager.linux-container-executor.resources-handler.class";
+ /**
+ * YARN class to help handle LCE resources
+ */
+ public static final String KEY_YARN_NM_LCE_RH_CLASS = "yarn.nodemanager.linux-container-executor.resources-handler.class";
- public static final String VAL_YARN_NM_LCE_RH_CLASS = "org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler";
+ public static final String VAL_YARN_NM_LCE_RH_CLASS = "org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler";
- public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
+ public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
- public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT = "yarn.nodemanager.linux-container-executor.cgroups.mount";
+ public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT = "yarn.nodemanager.linux-container-executor.cgroups.mount";
- public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH = "yarn.nodemanager.linux-container-executor.cgroups.mount-path";
+ public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH = "yarn.nodemanager.linux-container-executor.cgroups.mount-path";
- public static final String KEY_YARN_NM_LCE_GROUP = "yarn.nodemanager.linux-container-executor.group";
+ public static final String KEY_YARN_NM_LCE_GROUP = "yarn.nodemanager.linux-container-executor.group";
- public static final String KEY_YARN_NM_LCE_PATH = "yarn.nodemanager.linux-container-executor.path";
+ public static final String KEY_YARN_NM_LCE_PATH = "yarn.nodemanager.linux-container-executor.path";
- public static final String KEY_YARN_HOME = "yarn.home";
+ public static final String KEY_YARN_HOME = "yarn.home";
- public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores";
+ public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores";
- public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb";
+ public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb";
- /**
- * Allot 10% more memory to account for JVM overhead.
- */
- public static final double JVM_OVERHEAD = 0.1;
+ /**
+ * Allot 10% more memory to account for JVM overhead.
+ */
+ public static final double JVM_OVERHEAD = 0.1;
- /**
- * Default -Xmx for executor JVM.
- */
+ /**
+ * Default -Xmx for executor JVM.
+ */
- public static final double DEFAULT_JVM_MAX_MEMORY_MB = 256;
- /**
- * Default cpus for executor JVM.
- */
- public static final double DEFAULT_CPUS = 0.2;
+ public static final double DEFAULT_JVM_MAX_MEMORY_MB = 256;
+ /**
+ * Default cpus for executor JVM.
+ */
+ public static final double DEFAULT_CPUS = 0.2;
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-commons/src/main/java/com/ebay/myriad/executor/NMTaskConfig.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/com/ebay/myriad/executor/NMTaskConfig.java b/myriad-commons/src/main/java/com/ebay/myriad/executor/NMTaskConfig.java
index b5d1b23..92626b3 100644
--- a/myriad-commons/src/main/java/com/ebay/myriad/executor/NMTaskConfig.java
+++ b/myriad-commons/src/main/java/com/ebay/myriad/executor/NMTaskConfig.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,96 +24,96 @@ import java.util.Map;
* Node Manger Task Configuraiton
*/
public class NMTaskConfig {
- private String yarnHome;
- private Long advertisableCpus;
- private Long advertisableMem;
- private String jvmOpts;
- private Boolean cgroups;
- private Long rpcPort;
- private Long localizerPort;
- private Long webAppHttpPort;
- private Long shufflePort;
-
- private Map<String, String> yarnEnvironment;
-
- public String getYarnHome() {
- return yarnHome;
- }
-
- public void setYarnHome(String yarnHome) {
- this.yarnHome = yarnHome;
- }
-
- public Long getAdvertisableCpus() {
- return advertisableCpus;
- }
-
- public void setAdvertisableCpus(Long advertisableCpus) {
- this.advertisableCpus = advertisableCpus;
- }
-
- public Long getAdvertisableMem() {
- return advertisableMem;
- }
-
- public void setAdvertisableMem(Long advertisableMem) {
- this.advertisableMem = advertisableMem;
- }
-
- public String getJvmOpts() {
- return jvmOpts;
- }
-
- public void setJvmOpts(String jvmOpts) {
- this.jvmOpts = jvmOpts;
- }
-
- public Boolean getCgroups() {
- return cgroups;
- }
-
- public void setCgroups(Boolean cgroups) {
- this.cgroups = cgroups;
- }
-
- public Map<String, String> getYarnEnvironment() {
- return yarnEnvironment;
- }
-
- public void setYarnEnvironment(Map<String, String> yarnEnvironment) {
- this.yarnEnvironment = yarnEnvironment;
- }
-
- public Long getRpcPort() {
- return rpcPort;
- }
-
- public void setRpcPort(long port) {
- rpcPort = port;
- }
-
- public Long gettWebAppHttpPort() {
- return webAppHttpPort;
- }
-
- public void setWebAppHttpPort(Long port) {
- webAppHttpPort = port;
- }
-
- public Long getLocalizerPort() {
- return localizerPort;
- }
-
- public void setLocalizerPort(Long localizerPort) {
- this.localizerPort = localizerPort;
- }
-
- public Long getShufflePort() {
- return shufflePort;
- }
-
- public void setShufflePort(Long shufflePort) {
- this.shufflePort = shufflePort;
- }
+ private String yarnHome;
+ private Long advertisableCpus;
+ private Long advertisableMem;
+ private String jvmOpts;
+ private Boolean cgroups;
+ private Long rpcPort;
+ private Long localizerPort;
+ private Long webAppHttpPort;
+ private Long shufflePort;
+
+ private Map<String, String> yarnEnvironment;
+
+ public String getYarnHome() {
+ return yarnHome;
+ }
+
+ public void setYarnHome(String yarnHome) {
+ this.yarnHome = yarnHome;
+ }
+
+ public Long getAdvertisableCpus() {
+ return advertisableCpus;
+ }
+
+ public void setAdvertisableCpus(Long advertisableCpus) {
+ this.advertisableCpus = advertisableCpus;
+ }
+
+ public Long getAdvertisableMem() {
+ return advertisableMem;
+ }
+
+ public void setAdvertisableMem(Long advertisableMem) {
+ this.advertisableMem = advertisableMem;
+ }
+
+ public String getJvmOpts() {
+ return jvmOpts;
+ }
+
+ public void setJvmOpts(String jvmOpts) {
+ this.jvmOpts = jvmOpts;
+ }
+
+ public Boolean getCgroups() {
+ return cgroups;
+ }
+
+ public void setCgroups(Boolean cgroups) {
+ this.cgroups = cgroups;
+ }
+
+ public Map<String, String> getYarnEnvironment() {
+ return yarnEnvironment;
+ }
+
+ public void setYarnEnvironment(Map<String, String> yarnEnvironment) {
+ this.yarnEnvironment = yarnEnvironment;
+ }
+
+ public Long getRpcPort() {
+ return rpcPort;
+ }
+
+ public void setRpcPort(long port) {
+ rpcPort = port;
+ }
+
+ public Long gettWebAppHttpPort() {
+ return webAppHttpPort;
+ }
+
+ public void setWebAppHttpPort(Long port) {
+ webAppHttpPort = port;
+ }
+
+ public Long getLocalizerPort() {
+ return localizerPort;
+ }
+
+ public void setLocalizerPort(Long localizerPort) {
+ this.localizerPort = localizerPort;
+ }
+
+ public Long getShufflePort() {
+ return shufflePort;
+ }
+
+ public void setShufflePort(Long shufflePort) {
+ this.shufflePort = shufflePort;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-executor/config/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/myriad-executor/config/checkstyle/checkstyle.xml b/myriad-executor/config/checkstyle/checkstyle.xml
index 0ba6803..6eb5178 100644
--- a/myriad-executor/config/checkstyle/checkstyle.xml
+++ b/myriad-executor/config/checkstyle/checkstyle.xml
@@ -315,5 +315,11 @@ page at http://checkstyle.sourceforge.net/config.html -->
<property name="severity" value="warning"/>
</module>
+ <module name="Indentation">
+ <property name="basicOffset" value="2"/>
+ <property name="braceAdjustment" value="0"/>
+ <property name="caseIndent" value="2"/>
+ <property name="throwsIndent" value="2"/>
+ </module>
</module>
</module>
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
----------------------------------------------------------------------
diff --git a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
index 47463a4..37926d4 100644
--- a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
+++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -49,9 +49,8 @@ public class MyriadExecutor implements Executor {
}
@Override
- public void registered(ExecutorDriver driver, ExecutorInfo executorInfo,
- FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) {
- LOGGER.debug("Registered ", executorInfo, " for framework ", frameworkInfo, " on mesos slave ", slaveInfo);
+ public void registered(ExecutorDriver driver, ExecutorInfo executorInfo, FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) {
+ LOGGER.debug("Registered ", executorInfo, " for framework ", frameworkInfo, " on mesos slave ", slaveInfo);
}
@Override
@@ -67,11 +66,8 @@ public class MyriadExecutor implements Executor {
@Override
public void launchTask(final ExecutorDriver driver, final TaskInfo task) {
LOGGER.debug("launchTask received for taskId: " + task.getTaskId());
- TaskStatus status = TaskStatus.newBuilder()
- .setTaskId(task.getTaskId())
- .setState(TaskState.TASK_RUNNING)
- .build();
- driver.sendStatusUpdate(status);
+ TaskStatus status = TaskStatus.newBuilder().setTaskId(task.getTaskId()).setState(TaskState.TASK_RUNNING).build();
+ driver.sendStatusUpdate(status);
}
@Override
@@ -79,30 +75,22 @@ public class MyriadExecutor implements Executor {
LOGGER.debug("killTask received for taskId: " + taskId.getValue());
TaskStatus status;
- if (!taskId.toString().contains(
- MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX)) {
+ if (!taskId.toString().contains(MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX)) {
// Inform mesos of killing all tasks corresponding to yarn containers that are
// currently running
synchronized (containerIds) {
for (String containerId : containerIds) {
- Protos.TaskID containerTaskId = Protos.TaskID.newBuilder()
- .setValue(MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX
- + containerId).build();
- status = TaskStatus.newBuilder().setTaskId(containerTaskId)
- .setState(TaskState.TASK_KILLED)
- .build();
- driver.sendStatusUpdate(status);
+ Protos.TaskID containerTaskId = Protos.TaskID.newBuilder().setValue(MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX + containerId).build();
+ status = TaskStatus.newBuilder().setTaskId(containerTaskId).setState(TaskState.TASK_KILLED).build();
+ driver.sendStatusUpdate(status);
}
}
// Now kill the node manager task
- status = TaskStatus.newBuilder()
- .setTaskId(taskId)
- .setState(TaskState.TASK_KILLED)
- .build();
+ status = TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_KILLED).build();
driver.sendStatusUpdate(status);
LOGGER.info("NodeManager shutdown after receiving" +
- " KillTask for taskId " + taskId.getValue());
+ " KillTask for taskId " + taskId.getValue());
Runtime.getRuntime().exit(0);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
----------------------------------------------------------------------
diff --git a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
index 361fc05..86ea60e 100644
--- a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
+++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
/**
* Auxillary service wrapper for MyriadExecutor
*/
-public class MyriadExecutorAuxService extends AuxiliaryService {
+public class MyriadExecutorAuxService extends AuxiliaryService {
private static final Logger LOGGER = LoggerFactory.getLogger(MyriadExecutor.class);
private static final String SERVICE_NAME = "myriad_service";
@@ -65,16 +65,14 @@ public class MyriadExecutorAuxService extends AuxiliaryService {
myriadExecutorThread = new Thread(new Runnable() {
public void run() {
driver = new MesosExecutorDriver(new MyriadExecutor(containerIds));
- LOGGER.error("MyriadExecutor exit with status " +
- Integer.toString(driver.run() == Status.DRIVER_STOPPED ? 0 : 1));
+ LOGGER.error("MyriadExecutor exit with status " + Integer.toString(driver.run() == Status.DRIVER_STOPPED ? 0 : 1));
}
});
myriadExecutorThread.start();
}
@Override
- public void initializeApplication(
- ApplicationInitializationContext initAppContext) {
+ public void initializeApplication(ApplicationInitializationContext initAppContext) {
LOGGER.debug("initializeApplication");
}
@@ -108,14 +106,9 @@ public class MyriadExecutorAuxService extends AuxiliaryService {
}
private void sendStatus(ContainerId containerId, TaskState taskState) {
- Protos.TaskID taskId = Protos.TaskID.newBuilder()
- .setValue(YARN_CONTAINER_TASK_ID_PREFIX + containerId.toString())
- .build();
-
- TaskStatus status = TaskStatus.newBuilder()
- .setTaskId(taskId)
- .setState(taskState)
- .build();
+ Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(YARN_CONTAINER_TASK_ID_PREFIX + containerId.toString()).build();
+
+ TaskStatus status = TaskStatus.newBuilder().setTaskId(taskId).setState(taskState).build();
driver.sendStatusUpdate(status);
LOGGER.debug("Sent status " + taskState + " for taskId " + taskId);
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/config/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/myriad-scheduler/config/checkstyle/checkstyle.xml b/myriad-scheduler/config/checkstyle/checkstyle.xml
index 0ba6803..6eb5178 100644
--- a/myriad-scheduler/config/checkstyle/checkstyle.xml
+++ b/myriad-scheduler/config/checkstyle/checkstyle.xml
@@ -315,5 +315,11 @@ page at http://checkstyle.sourceforge.net/config.html -->
<property name="severity" value="warning"/>
</module>
+ <module name="Indentation">
+ <property name="basicOffset" value="2"/>
+ <property name="braceAdjustment" value="0"/>
+ <property name="caseIndent" value="2"/>
+ <property name="throwsIndent" value="2"/>
+ </module>
</module>
</module>
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/DisruptorManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/DisruptorManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/DisruptorManager.java
index a38b183..e3fb399 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/DisruptorManager.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/DisruptorManager.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,129 +40,109 @@ import java.util.concurrent.Executors;
* Here it is used to abstract incoming events.
*/
public class DisruptorManager {
- private ExecutorService disruptorExecutors;
-
- private static final int DEFAULT_SMALL_RINGBUFFER_SIZE = 64;
- private static final int DEFAULT_LARGE_RINGBUFFER_SIZE = 1024;
-
- private Disruptor<RegisteredEvent> registeredEventDisruptor;
- private Disruptor<ReRegisteredEvent> reRegisteredEventDisruptor;
- private Disruptor<ResourceOffersEvent> resourceOffersEventDisruptor;
- private Disruptor<OfferRescindedEvent> offerRescindedEventDisruptor;
- private Disruptor<StatusUpdateEvent> statusUpdateEventDisruptor;
- private Disruptor<FrameworkMessageEvent> frameworkMessageEventDisruptor;
- private Disruptor<DisconnectedEvent> disconnectedEventDisruptor;
- private Disruptor<SlaveLostEvent> slaveLostEventDisruptor;
- private Disruptor<ExecutorLostEvent> executorLostEventDisruptor;
- private Disruptor<ErrorEvent> errorEventDisruptor;
-
- @SuppressWarnings("unchecked")
- public void init(Injector injector) {
- this.disruptorExecutors = Executors.newCachedThreadPool();
-
- // todo: (kensipe) need to make ringsize configurable (overriding the defaults)
-
-
- this.registeredEventDisruptor = new Disruptor<>(
- new RegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE, disruptorExecutors);
- this.registeredEventDisruptor.handleEventsWith(injector
- .getInstance(RegisteredEventHandler.class));
- this.registeredEventDisruptor.start();
-
- this.reRegisteredEventDisruptor = new Disruptor<>(
- new ReRegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE, disruptorExecutors);
- this.reRegisteredEventDisruptor.handleEventsWith(injector
- .getInstance(ReRegisteredEventHandler.class));
- this.reRegisteredEventDisruptor.start();
-
-
- this.resourceOffersEventDisruptor = new Disruptor<>(
- new ResourceOffersEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
- this.resourceOffersEventDisruptor.handleEventsWith(injector
- .getInstance(ResourceOffersEventHandler.class));
- this.resourceOffersEventDisruptor.start();
-
- this.offerRescindedEventDisruptor = new Disruptor<>(
- new OfferRescindedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
- this.offerRescindedEventDisruptor.handleEventsWith(injector
- .getInstance(OfferRescindedEventHandler.class));
- this.offerRescindedEventDisruptor.start();
-
- this.statusUpdateEventDisruptor = new Disruptor<>(
- new StatusUpdateEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
- this.statusUpdateEventDisruptor.handleEventsWith(injector
- .getInstance(StatusUpdateEventHandler.class));
- this.statusUpdateEventDisruptor.start();
-
- this.frameworkMessageEventDisruptor = new Disruptor<>(
- new FrameworkMessageEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
- this.frameworkMessageEventDisruptor.handleEventsWith(injector
- .getInstance(FrameworkMessageEventHandler.class));
- this.frameworkMessageEventDisruptor.start();
-
- this.disconnectedEventDisruptor = new Disruptor<>(
- new DisconnectedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
- this.disconnectedEventDisruptor.handleEventsWith(injector
- .getInstance(DisconnectedEventHandler.class));
- this.disconnectedEventDisruptor.start();
-
- this.slaveLostEventDisruptor = new Disruptor<>(
- new SlaveLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
- this.slaveLostEventDisruptor.handleEventsWith(injector
- .getInstance(SlaveLostEventHandler.class));
- this.slaveLostEventDisruptor.start();
-
- this.executorLostEventDisruptor = new Disruptor<>(
- new ExecutorLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
- this.executorLostEventDisruptor.handleEventsWith(injector
- .getInstance(ExecutorLostEventHandler.class));
- this.executorLostEventDisruptor.start();
-
- this.errorEventDisruptor = new Disruptor<>(new ErrorEventFactory(),
- DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
- this.errorEventDisruptor.handleEventsWith(injector
- .getInstance(ErrorEventHandler.class));
- this.errorEventDisruptor.start();
- }
-
- public Disruptor<RegisteredEvent> getRegisteredEventDisruptor() {
- return registeredEventDisruptor;
- }
-
- public Disruptor<ReRegisteredEvent> getReRegisteredEventDisruptor() {
- return reRegisteredEventDisruptor;
- }
-
- public Disruptor<ResourceOffersEvent> getResourceOffersEventDisruptor() {
- return resourceOffersEventDisruptor;
- }
-
- public Disruptor<OfferRescindedEvent> getOfferRescindedEventDisruptor() {
- return offerRescindedEventDisruptor;
- }
-
- public Disruptor<StatusUpdateEvent> getStatusUpdateEventDisruptor() {
- return statusUpdateEventDisruptor;
- }
-
- public Disruptor<FrameworkMessageEvent> getFrameworkMessageEventDisruptor() {
- return frameworkMessageEventDisruptor;
- }
-
- public Disruptor<DisconnectedEvent> getDisconnectedEventDisruptor() {
- return disconnectedEventDisruptor;
- }
-
- public Disruptor<SlaveLostEvent> getSlaveLostEventDisruptor() {
- return slaveLostEventDisruptor;
- }
-
- public Disruptor<ExecutorLostEvent> getExecutorLostEventDisruptor() {
- return executorLostEventDisruptor;
- }
-
- public Disruptor<ErrorEvent> getErrorEventDisruptor() {
- return errorEventDisruptor;
- }
+ private ExecutorService disruptorExecutors;
+
+ private static final int DEFAULT_SMALL_RINGBUFFER_SIZE = 64;
+ private static final int DEFAULT_LARGE_RINGBUFFER_SIZE = 1024;
+
+ private Disruptor<RegisteredEvent> registeredEventDisruptor;
+ private Disruptor<ReRegisteredEvent> reRegisteredEventDisruptor;
+ private Disruptor<ResourceOffersEvent> resourceOffersEventDisruptor;
+ private Disruptor<OfferRescindedEvent> offerRescindedEventDisruptor;
+ private Disruptor<StatusUpdateEvent> statusUpdateEventDisruptor;
+ private Disruptor<FrameworkMessageEvent> frameworkMessageEventDisruptor;
+ private Disruptor<DisconnectedEvent> disconnectedEventDisruptor;
+ private Disruptor<SlaveLostEvent> slaveLostEventDisruptor;
+ private Disruptor<ExecutorLostEvent> executorLostEventDisruptor;
+ private Disruptor<ErrorEvent> errorEventDisruptor;
+
+ @SuppressWarnings("unchecked")
+ public void init(Injector injector) {
+ this.disruptorExecutors = Executors.newCachedThreadPool();
+
+ // todo: (kensipe) need to make ringsize configurable (overriding the defaults)
+
+
+ this.registeredEventDisruptor = new Disruptor<>(new RegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE, disruptorExecutors);
+ this.registeredEventDisruptor.handleEventsWith(injector.getInstance(RegisteredEventHandler.class));
+ this.registeredEventDisruptor.start();
+
+ this.reRegisteredEventDisruptor = new Disruptor<>(new ReRegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE, disruptorExecutors);
+ this.reRegisteredEventDisruptor.handleEventsWith(injector.getInstance(ReRegisteredEventHandler.class));
+ this.reRegisteredEventDisruptor.start();
+
+
+ this.resourceOffersEventDisruptor = new Disruptor<>(new ResourceOffersEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+ this.resourceOffersEventDisruptor.handleEventsWith(injector.getInstance(ResourceOffersEventHandler.class));
+ this.resourceOffersEventDisruptor.start();
+
+ this.offerRescindedEventDisruptor = new Disruptor<>(new OfferRescindedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+ this.offerRescindedEventDisruptor.handleEventsWith(injector.getInstance(OfferRescindedEventHandler.class));
+ this.offerRescindedEventDisruptor.start();
+
+ this.statusUpdateEventDisruptor = new Disruptor<>(new StatusUpdateEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+ this.statusUpdateEventDisruptor.handleEventsWith(injector.getInstance(StatusUpdateEventHandler.class));
+ this.statusUpdateEventDisruptor.start();
+
+ this.frameworkMessageEventDisruptor = new Disruptor<>(new FrameworkMessageEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+ this.frameworkMessageEventDisruptor.handleEventsWith(injector.getInstance(FrameworkMessageEventHandler.class));
+ this.frameworkMessageEventDisruptor.start();
+
+ this.disconnectedEventDisruptor = new Disruptor<>(new DisconnectedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+ this.disconnectedEventDisruptor.handleEventsWith(injector.getInstance(DisconnectedEventHandler.class));
+ this.disconnectedEventDisruptor.start();
+
+ this.slaveLostEventDisruptor = new Disruptor<>(new SlaveLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+ this.slaveLostEventDisruptor.handleEventsWith(injector.getInstance(SlaveLostEventHandler.class));
+ this.slaveLostEventDisruptor.start();
+
+ this.executorLostEventDisruptor = new Disruptor<>(new ExecutorLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+ this.executorLostEventDisruptor.handleEventsWith(injector.getInstance(ExecutorLostEventHandler.class));
+ this.executorLostEventDisruptor.start();
+
+ this.errorEventDisruptor = new Disruptor<>(new ErrorEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors);
+ this.errorEventDisruptor.handleEventsWith(injector.getInstance(ErrorEventHandler.class));
+ this.errorEventDisruptor.start();
+ }
+
+ public Disruptor<RegisteredEvent> getRegisteredEventDisruptor() {
+ return registeredEventDisruptor;
+ }
+
+ public Disruptor<ReRegisteredEvent> getReRegisteredEventDisruptor() {
+ return reRegisteredEventDisruptor;
+ }
+
+ public Disruptor<ResourceOffersEvent> getResourceOffersEventDisruptor() {
+ return resourceOffersEventDisruptor;
+ }
+
+ public Disruptor<OfferRescindedEvent> getOfferRescindedEventDisruptor() {
+ return offerRescindedEventDisruptor;
+ }
+
+ public Disruptor<StatusUpdateEvent> getStatusUpdateEventDisruptor() {
+ return statusUpdateEventDisruptor;
+ }
+
+ public Disruptor<FrameworkMessageEvent> getFrameworkMessageEventDisruptor() {
+ return frameworkMessageEventDisruptor;
+ }
+
+ public Disruptor<DisconnectedEvent> getDisconnectedEventDisruptor() {
+ return disconnectedEventDisruptor;
+ }
+
+ public Disruptor<SlaveLostEvent> getSlaveLostEventDisruptor() {
+ return slaveLostEventDisruptor;
+ }
+
+ public Disruptor<ExecutorLostEvent> getExecutorLostEventDisruptor() {
+ return executorLostEventDisruptor;
+ }
+
+ public Disruptor<ErrorEvent> getErrorEventDisruptor() {
+ return errorEventDisruptor;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
index a0ad9cb..8865640 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -66,256 +66,234 @@ import java.util.HashSet;
/**
* Main entry point for myriad scheduler
- *
*/
public class Main {
- private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
- private MyriadWebServer webServer;
- private ScheduledExecutorService terminatorService;
+ private MyriadWebServer webServer;
+ private ScheduledExecutorService terminatorService;
- private ScheduledExecutorService rebalancerService;
- private HealthCheckRegistry healthCheckRegistry;
+ private ScheduledExecutorService rebalancerService;
+ private HealthCheckRegistry healthCheckRegistry;
- private static Injector injector;
+ private static Injector injector;
- public static void initialize(Configuration hadoopConf,
- AbstractYarnScheduler yarnScheduler,
- RMContext rmContext,
- InterceptorRegistry registry) throws Exception {
- ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
- MyriadConfiguration cfg = mapper.readValue(
- Thread.currentThread().getContextClassLoader().getResource("myriad-config-default.yml"),
- MyriadConfiguration.class);
+ public static void initialize(Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext, InterceptorRegistry registry) throws Exception {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ MyriadConfiguration cfg = mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-default.yml"), MyriadConfiguration.class);
- MyriadModule myriadModule = new MyriadModule(cfg, hadoopConf, yarnScheduler, rmContext, registry);
- MesosModule mesosModule = new MesosModule();
- injector = Guice.createInjector(
- myriadModule, mesosModule,
- new WebAppGuiceModule());
+ MyriadModule myriadModule = new MyriadModule(cfg, hadoopConf, yarnScheduler, rmContext, registry);
+ MesosModule mesosModule = new MesosModule();
+ injector = Guice.createInjector(myriadModule, mesosModule, new WebAppGuiceModule());
- new Main().run(cfg);
- }
+ new Main().run(cfg);
+ }
+
+ // TODO (Kannan Rajah) Hack to get injector in unit test.
+ public static Injector getInjector() {
+ return injector;
+ }
- // TODO (Kannan Rajah) Hack to get injector in unit test.
- public static Injector getInjector() {
- return injector;
+ public void run(MyriadConfiguration cfg) throws Exception {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Bindings: " + injector.getAllBindings());
}
- public void run(MyriadConfiguration cfg) throws Exception {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Bindings: " + injector.getAllBindings());
+ JmxReporter.forRegistry(new MetricRegistry()).build().start();
+
+ initWebApp(injector);
+ initHealthChecks(injector);
+ initProfiles(injector);
+ validateNMInstances(injector);
+ initServiceConfigurations(cfg, injector);
+ initDisruptors(injector);
+
+ initRebalancerService(cfg, injector);
+ initTerminatorService(injector);
+ startMesosDriver(injector);
+ startNMInstances(injector);
+ startJavaBasedTaskInstance(injector);
+ }
+
+
+ private void startMesosDriver(Injector injector) {
+ LOGGER.info("starting mesosDriver..");
+ injector.getInstance(MyriadDriverManager.class).startDriver();
+ LOGGER.info("started mesosDriver..");
+ }
+
+ /**
+ * Brings up the embedded jetty webserver for serving REST APIs.
+ *
+ * @param injector
+ */
+ private void initWebApp(Injector injector) throws Exception {
+ webServer = injector.getInstance(MyriadWebServer.class);
+ webServer.start();
+ }
+
+ /**
+ * Initializes health checks.
+ *
+ * @param injector
+ */
+ private void initHealthChecks(Injector injector) {
+ LOGGER.info("Initializing HealthChecks");
+ healthCheckRegistry = new HealthCheckRegistry();
+ healthCheckRegistry.register(MesosMasterHealthCheck.NAME, injector.getInstance(MesosMasterHealthCheck.class));
+ healthCheckRegistry.register(ZookeeperHealthCheck.NAME, injector.getInstance(ZookeeperHealthCheck.class));
+ healthCheckRegistry.register(MesosDriverHealthCheck.NAME, injector.getInstance(MesosDriverHealthCheck.class));
+ }
+
+ private void initProfiles(Injector injector) {
+ LOGGER.info("Initializing Profiles");
+ ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class);
+ TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class);
+ taskConstraintsManager.addTaskConstraints(NodeManagerConfiguration.NM_TASK_PREFIX, new TaskFactory.NMTaskConstraints());
+ Map<String, Map<String, String>> profiles = injector.getInstance(MyriadConfiguration.class).getProfiles();
+ TaskUtils taskUtils = injector.getInstance(TaskUtils.class);
+ if (MapUtils.isNotEmpty(profiles)) {
+ for (Map.Entry<String, Map<String, String>> profile : profiles.entrySet()) {
+ Map<String, String> profileResourceMap = profile.getValue();
+ if (MapUtils.isNotEmpty(profiles) && profileResourceMap.containsKey("cpu") && profileResourceMap.containsKey("mem")) {
+ Long cpu = Long.parseLong(profileResourceMap.get("cpu"));
+ Long mem = Long.parseLong(profileResourceMap.get("mem"));
+
+ ServiceResourceProfile serviceProfile = new ExtendedResourceProfile(new NMProfile(profile.getKey(), cpu, mem), taskUtils.getNodeManagerCpus(), taskUtils.getNodeManagerMemory());
+ serviceProfile.setExecutorCpu(taskUtils.getExecutorCpus());
+ serviceProfile.setExecutorMemory(taskUtils.getExecutorMemory());
+
+ profileManager.add(serviceProfile);
+ } else {
+ LOGGER.error("Invalid definition for profile: " + profile.getKey());
}
-
- JmxReporter.forRegistry(new MetricRegistry()).build().start();
-
- initWebApp(injector);
- initHealthChecks(injector);
- initProfiles(injector);
- validateNMInstances(injector);
- initServiceConfigurations(cfg, injector);
- initDisruptors(injector);
-
- initRebalancerService(cfg, injector);
- initTerminatorService(injector);
- startMesosDriver(injector);
- startNMInstances(injector);
- startJavaBasedTaskInstance(injector);
+ }
}
-
-
- private void startMesosDriver(Injector injector) {
- LOGGER.info("starting mesosDriver..");
- injector.getInstance(MyriadDriverManager.class).startDriver();
- LOGGER.info("started mesosDriver..");
+ }
+
+ private void validateNMInstances(Injector injector) {
+ LOGGER.info("Validating nmInstances..");
+ Map<String, Integer> nmInstances = injector.getInstance(MyriadConfiguration.class).getNmInstances();
+ ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class);
+
+ long maxCpu = Long.MIN_VALUE;
+ long maxMem = Long.MIN_VALUE;
+ for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) {
+ String profile = entry.getKey();
+ ServiceResourceProfile nodeManager = profileManager.get(profile);
+ if (nodeManager == null) {
+ throw new RuntimeException("Invalid profile name '" + profile + "' specified in 'nmInstances'");
+ }
+ if (entry.getValue() > 0) {
+ if (nodeManager.getCpus() > maxCpu) { // find the profile with largest number of cpus
+ maxCpu = nodeManager.getCpus().longValue();
+ maxMem = nodeManager.getMemory().longValue(); // use the memory from the same profile
+ }
+ }
}
-
- /**
- * Brings up the embedded jetty webserver for serving REST APIs.
- *
- * @param injector
- */
- private void initWebApp(Injector injector) throws Exception {
- webServer = injector.getInstance(MyriadWebServer.class);
- webServer.start();
+ if (maxCpu <= 0 || maxMem <= 0) {
+ throw new RuntimeException("Please configure 'nmInstances' with at least one instance/profile " + "with non-zero cpu/mem resources.");
}
-
- /**
- * Initializes health checks.
- *
- * @param injector
- */
- private void initHealthChecks(Injector injector) {
- LOGGER.info("Initializing HealthChecks");
- healthCheckRegistry = new HealthCheckRegistry();
- healthCheckRegistry.register(MesosMasterHealthCheck.NAME,
- injector.getInstance(MesosMasterHealthCheck.class));
- healthCheckRegistry.register(ZookeeperHealthCheck.NAME,
- injector.getInstance(ZookeeperHealthCheck.class));
- healthCheckRegistry.register(MesosDriverHealthCheck.NAME,
- injector.getInstance(MesosDriverHealthCheck.class));
+ }
+
+ private void startNMInstances(Injector injector) {
+ Map<String, Integer> nmInstances = injector.getInstance(MyriadConfiguration.class).getNmInstances();
+ MyriadOperations myriadOperations = injector.getInstance(MyriadOperations.class);
+ ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class);
+ SchedulerState schedulerState = injector.getInstance(SchedulerState.class);
+
+ Set<NodeTask> launchedNMTasks = new HashSet<>();
+ launchedNMTasks.addAll(schedulerState.getPendingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX));
+ if (!launchedNMTasks.isEmpty()) {
+ LOGGER.info("{} NM(s) in pending state. Not launching additional NMs", launchedNMTasks.size());
+ return;
}
- private void initProfiles(Injector injector) {
- LOGGER.info("Initializing Profiles");
- ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class);
- TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class);
- taskConstraintsManager.addTaskConstraints(NodeManagerConfiguration.NM_TASK_PREFIX, new TaskFactory.NMTaskConstraints());
- Map<String, Map<String, String>> profiles = injector.getInstance(MyriadConfiguration.class).getProfiles();
- TaskUtils taskUtils = injector.getInstance(TaskUtils.class);
- if (MapUtils.isNotEmpty(profiles)) {
- for (Map.Entry<String, Map<String, String>> profile : profiles.entrySet()) {
- Map<String, String> profileResourceMap = profile.getValue();
- if (MapUtils.isNotEmpty(profiles)
- && profileResourceMap.containsKey("cpu")
- && profileResourceMap.containsKey("mem")) {
- Long cpu = Long.parseLong(profileResourceMap.get("cpu"));
- Long mem = Long.parseLong(profileResourceMap.get("mem"));
-
- ServiceResourceProfile serviceProfile = new ExtendedResourceProfile(new NMProfile(profile.getKey(), cpu, mem),
- taskUtils.getNodeManagerCpus(), taskUtils.getNodeManagerMemory());
- serviceProfile.setExecutorCpu(taskUtils.getExecutorCpus());
- serviceProfile.setExecutorMemory(taskUtils.getExecutorMemory());
-
- profileManager.add(serviceProfile);
- } else {
- LOGGER.error("Invalid definition for profile: " + profile.getKey());
- }
- }
- }
+ launchedNMTasks.addAll(schedulerState.getStagingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX));
+ if (!launchedNMTasks.isEmpty()) {
+ LOGGER.info("{} NM(s) in staging state. Not launching additional NMs", launchedNMTasks.size());
+ return;
}
- private void validateNMInstances(Injector injector) {
- LOGGER.info("Validating nmInstances..");
- Map<String, Integer> nmInstances = injector.getInstance(MyriadConfiguration.class).getNmInstances();
- ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class);
-
- long maxCpu = Long.MIN_VALUE;
- long maxMem = Long.MIN_VALUE;
- for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) {
- String profile = entry.getKey();
- ServiceResourceProfile nodeManager = profileManager.get(profile);
- if (nodeManager == null) {
- throw new RuntimeException("Invalid profile name '" + profile + "' specified in 'nmInstances'");
- }
- if (entry.getValue() > 0) {
- if (nodeManager.getCpus() > maxCpu) { // find the profile with largest number of cpus
- maxCpu = nodeManager.getCpus().longValue();
- maxMem = nodeManager.getMemory().longValue(); // use the memory from the same profile
- }
- }
- }
- if (maxCpu <= 0 || maxMem <= 0) {
- throw new RuntimeException("Please configure 'nmInstances' with at least one instance/profile "
- + "with non-zero cpu/mem resources.");
- }
+ launchedNMTasks.addAll(schedulerState.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX));
+ if (!launchedNMTasks.isEmpty()) {
+ LOGGER.info("{} NM(s) in active state. Not launching additional NMs", launchedNMTasks.size());
+ return;
}
- private void startNMInstances(Injector injector) {
- Map<String, Integer> nmInstances = injector.getInstance(MyriadConfiguration.class).getNmInstances();
- MyriadOperations myriadOperations = injector.getInstance(MyriadOperations.class);
- ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class);
- SchedulerState schedulerState = injector.getInstance(SchedulerState.class);
-
- Set<NodeTask> launchedNMTasks = new HashSet<>();
- launchedNMTasks.addAll(
- schedulerState.getPendingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX));
- if (!launchedNMTasks.isEmpty()) {
- LOGGER.info("{} NM(s) in pending state. Not launching additional NMs", launchedNMTasks.size());
- return;
- }
-
- launchedNMTasks.addAll(
- schedulerState.getStagingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX));
- if (!launchedNMTasks.isEmpty()) {
- LOGGER.info("{} NM(s) in staging state. Not launching additional NMs", launchedNMTasks.size());
- return;
- }
-
- launchedNMTasks.addAll(
- schedulerState.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX));
- if (!launchedNMTasks.isEmpty()) {
- LOGGER.info("{} NM(s) in active state. Not launching additional NMs", launchedNMTasks.size());
- return;
- }
-
- for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) {
- LOGGER.info("Launching {} NM(s) with profile {}", entry.getValue(), entry.getKey());
- myriadOperations.flexUpCluster(profileManager.get(entry.getKey()), entry.getValue(), null);
- }
+ for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) {
+ LOGGER.info("Launching {} NM(s) with profile {}", entry.getValue(), entry.getKey());
+ myriadOperations.flexUpCluster(profileManager.get(entry.getKey()), entry.getValue(), null);
}
-
- /**
- * Create ServiceProfile for any configured service
- * @param cfg
- * @param injector
- */
- private void initServiceConfigurations(MyriadConfiguration cfg, Injector injector) {
- LOGGER.info("Initializing initServiceConfigurations");
- ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class);
- TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class);
-
- Map<String, ServiceConfiguration> servicesConfigs =
- injector.getInstance(MyriadConfiguration.class).getServiceConfigurations();
- if (servicesConfigs != null) {
- for (Map.Entry<String, ServiceConfiguration> entry : servicesConfigs.entrySet()) {
- final String taskPrefix = entry.getKey();
- ServiceConfiguration config = entry.getValue();
- final Double cpu = config.getCpus().or(ServiceConfiguration.DEFAULT_CPU);
- final Double mem = config.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY);
-
- profileManager.add(new ServiceResourceProfile(taskPrefix, cpu, mem));
- taskConstraintsManager.addTaskConstraints(taskPrefix, new ServiceTaskConstraints(cfg, taskPrefix));
- }
+ }
+
+ /**
+ * Create ServiceProfile for any configured service
+ *
+ * @param cfg
+ * @param injector
+ */
+ private void initServiceConfigurations(MyriadConfiguration cfg, Injector injector) {
+ LOGGER.info("Initializing initServiceConfigurations");
+ ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class);
+ TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class);
+
+ Map<String, ServiceConfiguration> servicesConfigs = injector.getInstance(MyriadConfiguration.class).getServiceConfigurations();
+ if (servicesConfigs != null) {
+ for (Map.Entry<String, ServiceConfiguration> entry : servicesConfigs.entrySet()) {
+ final String taskPrefix = entry.getKey();
+ ServiceConfiguration config = entry.getValue();
+ final Double cpu = config.getCpus().or(ServiceConfiguration.DEFAULT_CPU);
+ final Double mem = config.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY);
+
+ profileManager.add(new ServiceResourceProfile(taskPrefix, cpu, mem));
+ taskConstraintsManager.addTaskConstraints(taskPrefix, new ServiceTaskConstraints(cfg, taskPrefix));
}
}
-
- private void initTerminatorService(Injector injector) {
- LOGGER.info("Initializing Terminator");
- terminatorService = Executors.newScheduledThreadPool(1);
- final int initialDelay = 100;
- final int period = 2000;
- terminatorService.scheduleAtFixedRate(
- injector.getInstance(TaskTerminator.class), initialDelay, period, TimeUnit.MILLISECONDS);
+ }
+
+ private void initTerminatorService(Injector injector) {
+ LOGGER.info("Initializing Terminator");
+ terminatorService = Executors.newScheduledThreadPool(1);
+ final int initialDelay = 100;
+ final int period = 2000;
+ terminatorService.scheduleAtFixedRate(injector.getInstance(TaskTerminator.class), initialDelay, period, TimeUnit.MILLISECONDS);
+ }
+
+ private void initRebalancerService(MyriadConfiguration cfg, Injector injector) {
+ if (cfg.isRebalancer()) {
+ LOGGER.info("Initializing Rebalancer");
+ rebalancerService = Executors.newScheduledThreadPool(1);
+ final int initialDelay = 100;
+ final int period = 5000;
+ rebalancerService.scheduleAtFixedRate(injector.getInstance(Rebalancer.class), initialDelay, period, TimeUnit.MILLISECONDS);
+ } else {
+ LOGGER.info("Rebalancer is not turned on");
}
-
- private void initRebalancerService(MyriadConfiguration cfg,
- Injector injector) {
- if (cfg.isRebalancer()) {
- LOGGER.info("Initializing Rebalancer");
- rebalancerService = Executors.newScheduledThreadPool(1);
- final int initialDelay = 100;
- final int period = 5000;
- rebalancerService.scheduleAtFixedRate(
- injector.getInstance(Rebalancer.class), initialDelay, period, TimeUnit.MILLISECONDS);
- } else {
- LOGGER.info("Rebalancer is not turned on");
+ }
+
+ private void initDisruptors(Injector injector) {
+ LOGGER.info("Initializing Disruptors");
+ DisruptorManager disruptorManager = injector.getInstance(DisruptorManager.class);
+ disruptorManager.init(injector);
+ }
+
+ /**
+ * Start tasks for configured services
+ *
+ * @param injector
+ */
+ private void startJavaBasedTaskInstance(Injector injector) {
+ Map<String, ServiceConfiguration> auxServicesConfigs = injector.getInstance(MyriadConfiguration.class).getServiceConfigurations();
+ if (auxServicesConfigs != null) {
+ MyriadOperations myriadOperations = injector.getInstance(MyriadOperations.class);
+ for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) {
+ try {
+ myriadOperations.flexUpAService(entry.getValue().getMaxInstances().or(1), entry.getKey());
+ } catch (MyriadBadConfigurationException e) {
+ LOGGER.warn("Exception while trying to flexup service: {}", entry.getKey(), e);
}
- }
-
- private void initDisruptors(Injector injector) {
- LOGGER.info("Initializing Disruptors");
- DisruptorManager disruptorManager = injector
- .getInstance(DisruptorManager.class);
- disruptorManager.init(injector);
- }
-
- /**
- * Start tasks for configured services
- * @param injector
- */
- private void startJavaBasedTaskInstance(Injector injector) {
- Map<String, ServiceConfiguration> auxServicesConfigs =
- injector.getInstance(MyriadConfiguration.class).getServiceConfigurations();
- if (auxServicesConfigs != null) {
- MyriadOperations myriadOperations = injector.getInstance(MyriadOperations.class);
- for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) {
- try {
- myriadOperations.flexUpAService(entry.getValue().getMaxInstances().or(1), entry.getKey());
- } catch (MyriadBadConfigurationException e) {
- LOGGER.warn("Exception while trying to flexup service: {}", entry.getKey(), e);
- }
- }
}
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/MesosModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/MesosModule.java b/myriad-scheduler/src/main/java/com/ebay/myriad/MesosModule.java
index dc81276..d8f28c5 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/MesosModule.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/MesosModule.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -49,8 +49,7 @@ import com.google.protobuf.ByteString;
* Guice Module for Mesos objects.
*/
public class MesosModule extends AbstractModule {
- private static final Logger LOGGER = LoggerFactory.getLogger(
- MesosModule.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(MesosModule.class);
public MesosModule() {
}
@@ -62,15 +61,9 @@ public class MesosModule extends AbstractModule {
@Provides
@Singleton
- SchedulerDriver providesSchedulerDriver(
- MyriadScheduler scheduler,
- MyriadConfiguration cfg,
- SchedulerState schedulerState) {
+ SchedulerDriver providesSchedulerDriver(MyriadScheduler scheduler, MyriadConfiguration cfg, SchedulerState schedulerState) {
- Builder frameworkInfoBuilder = FrameworkInfo.newBuilder().setUser("")
- .setName(cfg.getFrameworkName())
- .setCheckpoint(cfg.isCheckpoint())
- .setFailoverTimeout(cfg.getFrameworkFailoverTimeout());
+ Builder frameworkInfoBuilder = FrameworkInfo.newBuilder().setUser("").setName(cfg.getFrameworkName()).setCheckpoint(cfg.isCheckpoint()).setFailoverTimeout(cfg.getFrameworkFailoverTimeout());
if (StringUtils.isNotEmpty(cfg.getFrameworkRole())) {
frameworkInfoBuilder.setRole(cfg.getFrameworkRole());
@@ -90,9 +83,8 @@ public class MesosModule extends AbstractModule {
Credential.Builder credentialBuilder = Credential.newBuilder();
credentialBuilder.setPrincipal(mesosAuthenticationPrincipal);
if (StringUtils.isNotEmpty(mesosAuthenticationSecretFilename)) {
- try {
- credentialBuilder.setSecret(ByteString.readFrom(
- new FileInputStream(mesosAuthenticationSecretFilename)));
+ try {
+ credentialBuilder.setSecret(ByteString.readFrom(new FileInputStream(mesosAuthenticationSecretFilename)));
} catch (FileNotFoundException ex) {
LOGGER.error("Mesos authentication secret file was not found", ex);
throw new RuntimeException(ex);
@@ -101,21 +93,15 @@ public class MesosModule extends AbstractModule {
throw new RuntimeException(ex);
}
}
- return new MesosSchedulerDriver(scheduler, frameworkInfoBuilder.build(),
- cfg.getMesosMaster(), credentialBuilder.build());
+ return new MesosSchedulerDriver(scheduler, frameworkInfoBuilder.build(), cfg.getMesosMaster(), credentialBuilder.build());
} else {
- return new MesosSchedulerDriver(scheduler,
- frameworkInfoBuilder.build(), cfg.getMesosMaster());
+ return new MesosSchedulerDriver(scheduler, frameworkInfoBuilder.build(), cfg.getMesosMaster());
}
}
@Provides
@Singleton
State providesStateStore(MyriadConfiguration cfg) {
- return new ZooKeeperState(
- cfg.getZkServers(),
- cfg.getZkTimeout(),
- TimeUnit.MILLISECONDS,
- "/myriad/" + cfg.getFrameworkName());
+ return new ZooKeeperState(cfg.getZkServers(), cfg.getZkTimeout(), TimeUnit.MILLISECONDS, "/myriad/" + cfg.getFrameworkName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java b/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java
index 3632334..b28164c 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -62,111 +62,104 @@ import java.util.Map;
* Guice Module for Myriad
*/
public class MyriadModule extends AbstractModule {
- private static final Logger LOGGER = LoggerFactory.getLogger(MyriadModule.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(MyriadModule.class);
- private MyriadConfiguration cfg;
- private Configuration hadoopConf;
- private AbstractYarnScheduler yarnScheduler;
- private final RMContext rmContext;
- private InterceptorRegistry interceptorRegistry;
+ private MyriadConfiguration cfg;
+ private Configuration hadoopConf;
+ private AbstractYarnScheduler yarnScheduler;
+ private final RMContext rmContext;
+ private InterceptorRegistry interceptorRegistry;
- public MyriadModule(MyriadConfiguration cfg,
- Configuration hadoopConf,
- AbstractYarnScheduler yarnScheduler,
- RMContext rmContext,
- InterceptorRegistry interceptorRegistry) {
- this.cfg = cfg;
- this.hadoopConf = hadoopConf;
- this.yarnScheduler = yarnScheduler;
- this.rmContext = rmContext;
- this.interceptorRegistry = interceptorRegistry;
- }
+ public MyriadModule(MyriadConfiguration cfg, Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext, InterceptorRegistry interceptorRegistry) {
+ this.cfg = cfg;
+ this.hadoopConf = hadoopConf;
+ this.yarnScheduler = yarnScheduler;
+ this.rmContext = rmContext;
+ this.interceptorRegistry = interceptorRegistry;
+ }
- @Override
- protected void configure() {
- LOGGER.debug("Configuring guice");
- bind(MyriadConfiguration.class).toInstance(cfg);
- bind(Configuration.class).toInstance(hadoopConf);
- bind(RMContext.class).toInstance(rmContext);
- bind(AbstractYarnScheduler.class).toInstance(yarnScheduler);
- bind(InterceptorRegistry.class).toInstance(interceptorRegistry);
- bind(MyriadDriverManager.class).in(Scopes.SINGLETON);
- bind(MyriadScheduler.class).in(Scopes.SINGLETON);
- bind(ServiceProfileManager.class).in(Scopes.SINGLETON);
- bind(DisruptorManager.class).in(Scopes.SINGLETON);
- bind(ReconcileService.class).in(Scopes.SINGLETON);
- bind(HttpConnectorProvider.class).in(Scopes.SINGLETON);
- bind(TaskConstraintsManager.class).in(Scopes.SINGLETON);
- // add special binding between TaskFactory and NMTaskFactoryImpl to ease up
- // usage of TaskFactory
- bind(TaskFactory.class).annotatedWith(NMTaskFactoryAnnotation.class).to(NMTaskFactoryImpl.class);
- bind(YarnNodeCapacityManager.class).in(Scopes.SINGLETON);
- bind(NodeStore.class).in(Scopes.SINGLETON);
- bind(OfferLifecycleManager.class).in(Scopes.SINGLETON);
- bind(NMHeartBeatHandler.class).asEagerSingleton();
+ @Override
+ protected void configure() {
+ LOGGER.debug("Configuring guice");
+ bind(MyriadConfiguration.class).toInstance(cfg);
+ bind(Configuration.class).toInstance(hadoopConf);
+ bind(RMContext.class).toInstance(rmContext);
+ bind(AbstractYarnScheduler.class).toInstance(yarnScheduler);
+ bind(InterceptorRegistry.class).toInstance(interceptorRegistry);
+ bind(MyriadDriverManager.class).in(Scopes.SINGLETON);
+ bind(MyriadScheduler.class).in(Scopes.SINGLETON);
+ bind(ServiceProfileManager.class).in(Scopes.SINGLETON);
+ bind(DisruptorManager.class).in(Scopes.SINGLETON);
+ bind(ReconcileService.class).in(Scopes.SINGLETON);
+ bind(HttpConnectorProvider.class).in(Scopes.SINGLETON);
+ bind(TaskConstraintsManager.class).in(Scopes.SINGLETON);
+ // add special binding between TaskFactory and NMTaskFactoryImpl to ease up
+ // usage of TaskFactory
+ bind(TaskFactory.class).annotatedWith(NMTaskFactoryAnnotation.class).to(NMTaskFactoryImpl.class);
+ bind(YarnNodeCapacityManager.class).in(Scopes.SINGLETON);
+ bind(NodeStore.class).in(Scopes.SINGLETON);
+ bind(OfferLifecycleManager.class).in(Scopes.SINGLETON);
+ bind(NMHeartBeatHandler.class).asEagerSingleton();
- MapBinder<String, TaskFactory> mapBinder
- = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class);
- mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactoryImpl.class).in(Scopes.SINGLETON);
- Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations();
- if (auxServicesConfigs != null) {
- for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) {
- String taskFactoryClass = entry.getValue().getTaskFactoryImplName().orNull();
- if (taskFactoryClass != null) {
- try {
- Class<? extends TaskFactory> implClass = (Class<? extends TaskFactory>) Class.forName(taskFactoryClass);
- mapBinder.addBinding(entry.getKey()).to(implClass).in(Scopes.SINGLETON);
- } catch (ClassNotFoundException e) {
- LOGGER.error("ClassNotFoundException", e);
- }
- } else {
- mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON);
- }
+ MapBinder<String, TaskFactory> mapBinder = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class);
+ mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactoryImpl.class).in(Scopes.SINGLETON);
+ Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations();
+ if (auxServicesConfigs != null) {
+ for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) {
+ String taskFactoryClass = entry.getValue().getTaskFactoryImplName().orNull();
+ if (taskFactoryClass != null) {
+ try {
+ Class<? extends TaskFactory> implClass = (Class<? extends TaskFactory>) Class.forName(taskFactoryClass);
+ mapBinder.addBinding(entry.getKey()).to(implClass).in(Scopes.SINGLETON);
+ } catch (ClassNotFoundException e) {
+ LOGGER.error("ClassNotFoundException", e);
}
+ } else {
+ mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON);
}
- //TODO(Santosh): Should be configurable as well
- bind(NodeScaleDownPolicy.class).to(LeastAMNodesFirstPolicy.class).in(Scopes.SINGLETON);
+ }
}
+ //TODO(Santosh): Should be configurable as well
+ bind(NodeScaleDownPolicy.class).to(LeastAMNodesFirstPolicy.class).in(Scopes.SINGLETON);
+ }
- @Provides
- @Singleton
- SchedulerState providesSchedulerState(MyriadConfiguration cfg) {
- LOGGER.debug("Configuring SchedulerState provider");
- MyriadStateStore myriadStateStore = null;
- if (cfg.isHAEnabled()) {
- myriadStateStore = providesMyriadStateStore();
- if (myriadStateStore == null) {
- throw new RuntimeException("Could not find a state store" +
- " implementation for Myriad. The 'yarn.resourcemanager.store.class'" +
- " property should be set to a class implementing the" +
- " MyriadStateStore interface. For e.g." +
- " org.apache.hadoop.yarn.server.resourcemanager.recovery.MyriadFileSystemRMStateStore");
- }
- }
- return new SchedulerState(myriadStateStore);
+ @Provides
+ @Singleton
+ SchedulerState providesSchedulerState(MyriadConfiguration cfg) {
+ LOGGER.debug("Configuring SchedulerState provider");
+ MyriadStateStore myriadStateStore = null;
+ if (cfg.isHAEnabled()) {
+ myriadStateStore = providesMyriadStateStore();
+ if (myriadStateStore == null) {
+ throw new RuntimeException("Could not find a state store" +
+ " implementation for Myriad. The 'yarn.resourcemanager.store.class'" +
+ " property should be set to a class implementing the" +
+ " MyriadStateStore interface. For e.g." +
+ " org.apache.hadoop.yarn.server.resourcemanager.recovery.MyriadFileSystemRMStateStore");
+ }
}
+ return new SchedulerState(myriadStateStore);
+ }
- private MyriadStateStore providesMyriadStateStore() {
- // TODO (sdaingade) Read the implementation class from yml
- // once multiple implementations are available.
- if (rmContext.getStateStore() instanceof MyriadStateStore) {
- return (MyriadStateStore) rmContext.getStateStore();
- }
- return null;
+ private MyriadStateStore providesMyriadStateStore() {
+ // TODO (sdaingade) Read the implementation class from yml
+ // once multiple implementations are available.
+ if (rmContext.getStateStore() instanceof MyriadStateStore) {
+ return (MyriadStateStore) rmContext.getStateStore();
}
+ return null;
+ }
- @Provides
- @Singleton
- ExecutorCommandLineGenerator providesCLIGenerator(MyriadConfiguration cfg) {
- ExecutorCommandLineGenerator cliGenerator = null;
- MyriadExecutorConfiguration myriadExecutorConfiguration =
- cfg.getMyriadExecutorConfiguration();
- if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
- cliGenerator = new DownloadNMExecutorCLGenImpl(cfg,
- myriadExecutorConfiguration.getNodeManagerUri().get());
- } else {
- cliGenerator = new NMExecutorCLGenImpl(cfg);
- }
- return cliGenerator;
- }
+ @Provides
+ @Singleton
+ ExecutorCommandLineGenerator providesCLIGenerator(MyriadConfiguration cfg) {
+ ExecutorCommandLineGenerator cliGenerator = null;
+ MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration();
+ if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
+ cliGenerator = new DownloadNMExecutorCLGenImpl(cfg, myriadExecutorConfiguration.getNodeManagerUri().get());
+ } else {
+ cliGenerator = new NMExecutorCLGenImpl(cfg);
+ }
+ return cliGenerator;
+ }
}