You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2017/07/25 18:02:50 UTC
[08/50] [abbrv] hadoop git commit: YARN-6255. Refactor
yarn-native-services framework. Contributed by Jian He
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index f67ea58..4922c2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
@@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
@@ -77,13 +80,12 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.WebAppException;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
-import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.InternalKeys;
import org.apache.slider.api.ResourceKeys;
import org.apache.slider.api.RoleKeys;
-import org.apache.slider.api.StatusKeys;
+import org.apache.slider.api.proto.Messages;
import org.apache.slider.api.proto.SliderClusterAPI;
-import org.apache.slider.client.SliderYarnClientImpl;
+import org.apache.slider.api.resource.Application;
import org.apache.slider.common.SliderExitCodes;
import org.apache.slider.common.SliderKeys;
import org.apache.slider.common.params.AbstractActionArgs;
@@ -95,10 +97,7 @@ import org.apache.slider.common.tools.PortScanner;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.common.tools.SliderVersionInfo;
-import org.apache.slider.core.buildutils.InstanceIO;
import org.apache.slider.core.conf.AggregateConf;
-import org.apache.slider.core.conf.ConfTree;
-import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.core.exceptions.SliderException;
@@ -109,13 +108,12 @@ import org.apache.slider.core.main.ExitCodeProvider;
import org.apache.slider.core.main.LauncherExitCodes;
import org.apache.slider.core.main.RunService;
import org.apache.slider.core.main.ServiceLauncher;
+import org.apache.slider.core.persist.JsonSerDeser;
import org.apache.slider.core.registry.info.CustomRegistryConstants;
import org.apache.slider.providers.ProviderCompleted;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderService;
import org.apache.slider.providers.SliderProviderFactory;
-import org.apache.slider.providers.slideram.SliderAMClientProvider;
-import org.apache.slider.providers.slideram.SliderAMProviderService;
import org.apache.slider.server.appmaster.actions.ActionHalt;
import org.apache.slider.server.appmaster.actions.ActionRegisterServiceInstance;
import org.apache.slider.server.appmaster.actions.ActionStopSlider;
@@ -136,7 +134,6 @@ import org.apache.slider.server.appmaster.monkey.ChaosKillContainer;
import org.apache.slider.server.appmaster.monkey.ChaosMonkeyService;
import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
import org.apache.slider.server.appmaster.operations.AsyncRMOperationHandler;
-import org.apache.slider.server.appmaster.operations.ProviderNotifyingOperationHandler;
import org.apache.slider.server.appmaster.operations.RMOperationHandler;
import org.apache.slider.server.appmaster.rpc.RpcBinder;
import org.apache.slider.server.appmaster.rpc.SliderAMPolicyProvider;
@@ -146,6 +143,7 @@ import org.apache.slider.server.appmaster.security.SecurityConfiguration;
import org.apache.slider.server.appmaster.state.AppState;
import org.apache.slider.server.appmaster.state.AppStateBindingInfo;
import org.apache.slider.server.appmaster.state.ContainerAssignment;
+import org.apache.slider.server.appmaster.state.MostRecentContainerReleaseSelector;
import org.apache.slider.server.appmaster.state.ProviderAppState;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.web.SliderAMWebApp;
@@ -161,18 +159,20 @@ import org.apache.slider.server.services.workflow.ServiceThreadFactory;
import org.apache.slider.server.services.workflow.WorkflowExecutorService;
import org.apache.slider.server.services.workflow.WorkflowRpcService;
import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
+import org.codehaus.jackson.map.PropertyNamingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.URI;
import java.net.URL;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -242,8 +242,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private RMOperationHandler rmOperationHandler;
-
- private RMOperationHandler providerRMOperationHandler;
/** Handle to communicate with the Node Manager*/
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
@@ -252,7 +250,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
/**
* Credentials for propagating down to launched containers
*/
- private Credentials containerCredentials;
+ private Credentials containerCredentials = new Credentials();
/**
* Slider IPC: Real service handler
@@ -320,13 +318,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
*/
private final AtomicBoolean initCompleted = new AtomicBoolean(false);
- /**
- * Flag to set if the process exit code was set before shutdown started
- */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private boolean spawnedProcessExitedBeforeShutdownTriggered;
-
-
/** Arguments passed in : raw*/
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private SliderAMArgs serviceArgs;
@@ -371,7 +362,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
private SliderAMWebApp webApp;
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private InetSocketAddress rpcServiceAddress;
- private SliderAMProviderService sliderAMProvider;
/**
* Executor.
@@ -398,12 +388,15 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
*/
private boolean securityEnabled;
private ContentCache contentCache;
+ private static final JsonSerDeser<Application> jsonSerDeser =
+ new JsonSerDeser<Application>(Application.class,
+ PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
/**
* resource limits
*/
private Resource maximumResourceCapability;
-
+ private Application application;
/**
* Service Constructor
*/
@@ -586,84 +579,31 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
/**
* Create and run the cluster.
- * @param clustername cluster name
+ * @param appName cluster name
* @return exit code
* @throws Throwable on a failure
*/
- private int createAndRunCluster(String clustername) throws Throwable {
-
- //load the cluster description from the cd argument
- String sliderClusterDir = serviceArgs.getSliderClusterURI();
- URI sliderClusterURI = new URI(sliderClusterDir);
- Path clusterDirPath = new Path(sliderClusterURI);
- log.info("Application defined at {}", sliderClusterURI);
+ private int createAndRunCluster(String appName) throws Throwable {
+ Path appDir = new Path((serviceArgs.getAppDefDir()));
SliderFileSystem fs = getClusterFS();
-
- // build up information about the running application -this
- // will be passed down to the cluster status
- MapOperations appInformation = new MapOperations();
-
- AggregateConf instanceDefinition =
- InstanceIO.loadInstanceDefinitionUnresolved(fs, clusterDirPath);
- instanceDefinition.setName(clustername);
-
- log.info("Deploying cluster {}:", instanceDefinition);
-
- // and resolve it
- AggregateConf resolvedInstance = new AggregateConf( instanceDefinition);
- resolvedInstance.resolve();
-
- stateForProviders.setApplicationName(clustername);
-
+ fs.setAppDir(appDir);
+ Path appJson = new Path(appDir, appName + ".json");
+ log.info("Loading application definition from " + appJson);
+ application = jsonSerDeser.load(fs.getFileSystem(), appJson);
+ log.info("Application Json: " + application);
+ stateForProviders.setApplicationName(appName);
Configuration serviceConf = getConfig();
- // extend AM configuration with component resource
- MapOperations amConfiguration = resolvedInstance
- .getAppConfOperations().getComponent(COMPONENT_AM);
- // and patch configuration with prefix
- if (amConfiguration != null) {
- Map<String, String> sliderAppConfKeys = amConfiguration.prefixedWith("slider.");
- for (Map.Entry<String, String> entry : sliderAppConfKeys.entrySet()) {
- String k = entry.getKey();
- String v = entry.getValue();
- boolean exists = serviceConf.get(k) != null;
- log.info("{} {} to {}", (exists ? "Overwriting" : "Setting"), k, v);
- serviceConf.set(k, v);
- }
- }
-
- securityConfiguration = new SecurityConfiguration(serviceConf, resolvedInstance, clustername);
// obtain security state
- securityEnabled = securityConfiguration.isSecurityEnabled();
// set the global security flag for the instance definition
- instanceDefinition.getAppConfOperations().set(KEY_SECURITY_ENABLED, securityEnabled);
-
- // triggers resolution and snapshotting for agent
- appState.setInitialInstanceDefinition(instanceDefinition);
- File confDir = getLocalConfDir();
- if (!confDir.exists() || !confDir.isDirectory()) {
- log.info("Conf dir {} does not exist.", confDir);
- File parentFile = confDir.getParentFile();
- log.info("Parent dir {}:\n{}", parentFile, SliderUtils.listDir(parentFile));
- }
-
//get our provider
- MapOperations globalInternalOptions = getGlobalInternalOptions();
- String providerType = globalInternalOptions.getMandatoryOption(
- InternalKeys.INTERNAL_PROVIDER_NAME);
- log.info("Cluster provider type is {}", providerType);
SliderProviderFactory factory =
- SliderProviderFactory.createSliderProviderFactory(providerType);
+ SliderProviderFactory.createSliderProviderFactory("docker");
providerService = factory.createServerProvider();
// init the provider BUT DO NOT START IT YET
initAndAddService(providerService);
- providerRMOperationHandler = new ProviderNotifyingOperationHandler(providerService);
-
- // create a slider AM provider
- sliderAMProvider = new SliderAMProviderService();
- initAndAddService(sliderAMProvider);
-
+
InetSocketAddress rmSchedulerAddress = SliderUtils.getRmSchedulerAddress(serviceConf);
log.info("RM is at {}", rmSchedulerAddress);
yarnRPC = YarnRPC.create(serviceConf);
@@ -689,10 +629,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
ApplicationId appid = appAttemptID.getApplicationId();
log.info("AM for ID {}", appid.getId());
- appInformation.put(StatusKeys.INFO_AM_CONTAINER_ID, appMasterContainerID.toString());
- appInformation.put(StatusKeys.INFO_AM_APP_ID, appid.toString());
- appInformation.put(StatusKeys.INFO_AM_ATTEMPT_ID, appAttemptID.toString());
-
Map<String, String> envVars;
List<Container> liveContainers;
@@ -731,28 +667,22 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
}
//bring up the Slider RPC service
- buildPortScanner(instanceDefinition);
- startSliderRPCServer(instanceDefinition);
+ buildPortScanner();
+ startSliderRPCServer();
rpcServiceAddress = rpcService.getConnectAddress();
appMasterHostname = rpcServiceAddress.getAddress().getCanonicalHostName();
appMasterRpcPort = rpcServiceAddress.getPort();
appMasterTrackingUrl = null;
log.info("AM Server is listening at {}:{}", appMasterHostname, appMasterRpcPort);
- appInformation.put(StatusKeys.INFO_AM_HOSTNAME, appMasterHostname);
- appInformation.set(StatusKeys.INFO_AM_RPC_PORT, appMasterRpcPort);
log.info("Starting Yarn registry");
registryOperations = startRegistryOperationsService();
log.info(registryOperations.toString());
//build the role map
- List<ProviderRole> providerRoles = new ArrayList<>(providerService.getRoles());
- providerRoles.addAll(SliderAMClientProvider.ROLES);
-
+ List<ProviderRole> providerRoles = Collections.EMPTY_LIST;
// Start up the WebApp and track the URL for it
- MapOperations component = instanceDefinition.getAppConfOperations()
- .getComponent(SliderKeys.COMPONENT_AM);
// Web service endpoints: initialize
WebAppApiImpl webAppApi =
@@ -760,9 +690,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
stateForProviders,
providerService, registryOperations,
metricsAndMonitoring,
- actionQueues,
- this,
- contentCache);
+ actionQueues);
initAMFilterOptions(serviceConf);
int webAppPort = deployWebApplication(webAppApi);
@@ -770,9 +698,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
String scheme = WebAppUtils.HTTP_PREFIX;
appMasterTrackingUrl = scheme + appMasterHostname + ":" + webAppPort;
- appInformation.put(StatusKeys.INFO_AM_WEB_URL, appMasterTrackingUrl + "/");
- appInformation.set(StatusKeys.INFO_AM_WEB_PORT, webAppPort);
-
// *****************************************************
// Register self with ResourceManager
// This will start heartbeating to the RM
@@ -785,6 +710,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
appMasterTrackingUrl);
maximumResourceCapability = amRegistrationData.getMaximumResourceCapability();
+ //TODO should not read local configs !!!
int minMemory = serviceConf.getInt(RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
// validate scheduler vcores allocation setting
@@ -798,11 +724,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
// the max value as part of its lookup
rmOperationHandler = new AsyncRMOperationHandler(asyncRMClient, maximumResourceCapability);
- // set the RM-defined maximum cluster values
- appInformation.put(ResourceKeys.YARN_CORES, Integer.toString(maxCores));
- appInformation.put(ResourceKeys.YARN_MEMORY, Integer.toString(maxMemory));
-
- processAMCredentials(securityConfiguration);
+// processAMCredentials(securityConfiguration);
if (securityEnabled) {
secretManager.setMasterKey(
@@ -817,7 +739,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
// principal. Can do so now since AM registration with RM above required
// tokens associated to principal
String principal = securityConfiguration.getPrincipal();
- File localKeytabFile = securityConfiguration.getKeytabFile(instanceDefinition);
+ //TODO read key tab file from slider-am.xml
+ File localKeytabFile =
+ securityConfiguration.getKeytabFile(new AggregateConf());
// Now log in...
login(principal, localKeytabFile);
// obtain new FS reference that should be kerberos based and different
@@ -829,10 +753,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
// YARN client.
// Important: this is only valid at startup, and must be executed within
// the right UGI context. Use with care.
- SliderYarnClientImpl yarnClient = null;
+ YarnClient yarnClient = null;
List<NodeReport> nodeReports;
try {
- yarnClient = new SliderYarnClientImpl();
+ yarnClient = YarnClient.createYarnClient();
yarnClient.init(getConfig());
yarnClient.start();
nodeReports = getNodeReports(yarnClient);
@@ -856,45 +780,23 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
// extract container list
liveContainers = amRegistrationData.getContainersFromPreviousAttempts();
-
- //now validate the installation
- Configuration providerConf =
- providerService.loadProviderConfigurationInformation(confDir);
-
- providerService.initializeApplicationConfiguration(instanceDefinition,
- fs, null);
-
- providerService.validateApplicationConfiguration(instanceDefinition,
- confDir,
- securityEnabled);
+ DefaultMetricsSystem.initialize("SliderAppMaster");
//determine the location for the role history data
- Path historyDir = new Path(clusterDirPath, HISTORY_DIR_NAME);
+ Path historyDir = new Path(appDir, HISTORY_DIR_NAME);
//build the instance
AppStateBindingInfo binding = new AppStateBindingInfo();
- binding.instanceDefinition = instanceDefinition;
binding.serviceConfig = serviceConf;
- binding.publishedProviderConf = providerConf;
binding.roles = providerRoles;
binding.fs = fs.getFileSystem();
binding.historyPath = historyDir;
binding.liveContainers = liveContainers;
- binding.applicationInfo = appInformation;
- binding.releaseSelector = providerService.createContainerReleaseSelector();
+ binding.releaseSelector = new MostRecentContainerReleaseSelector();
binding.nodeReports = nodeReports;
+ binding.application = application;
appState.buildInstance(binding);
- providerService.rebuildContainerDetails(liveContainers,
- instanceDefinition.getName(), appState.getRolePriorityMap());
-
- // add the AM to the list of nodes in the cluster
-
- appState.buildAppMasterNode(appMasterContainerID,
- appMasterHostname,
- webAppPort,
- appMasterHostname + ":" + webAppPort);
-
// build up environment variables that the AM wants set in every container
// irrespective of provider and role.
envVars = new HashMap<>();
@@ -908,8 +810,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
}
String rolesTmpSubdir = appMasterContainerID.toString() + "/roles";
- String amTmpDir = globalInternalOptions.getMandatoryOption(InternalKeys.INTERNAL_AM_TMP_DIR);
-
+ String amTmpDir = "/tmp";
+ //TODO read tmpDir from slider-am.xml
Path tmpDirPath = new Path(amTmpDir);
Path launcherTmpDirPath = new Path(tmpDirPath, rolesTmpSubdir);
fs.getFileSystem().mkdirs(launcherTmpDirPath);
@@ -917,29 +819,15 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
//launcher service
launchService = new RoleLaunchService(actionQueues,
providerService,
- fs,
- new Path(getGeneratedConfDir()),
- envVars,
- launcherTmpDirPath);
+ fs, envVars);
deployChildService(launchService);
- appState.noteAMLaunched();
-
-
//Give the provider access to the state, and AM
- providerService.bind(stateForProviders, actionQueues, liveContainers);
- sliderAMProvider.bind(stateForProviders, actionQueues, liveContainers);
+ providerService.setAMState(stateForProviders);
// chaos monkey
- maybeStartMonkey();
-
- // setup token renewal and expiry handling for long lived apps
-// if (!securityConfiguration.isKeytabProvided() &&
-// SliderUtils.isHadoopClusterSecure(getConfig())) {
-// fsDelegationTokenManager = new FsDelegationTokenManager(actionQueues);
-// fsDelegationTokenManager.acquireDelegationToken(getConfig());
-// }
+// maybeStartMonkey();
// if not a secure cluster, extract the username -it will be
// propagated to workers
@@ -955,25 +843,21 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
log.info("Application Master Initialization Completed");
initCompleted.set(true);
- scheduleFailureWindowResets(instanceDefinition.getResources());
- scheduleEscalation(instanceDefinition.getInternal());
+ scheduleFailureWindowResets(application.getConfiguration());
+ scheduleEscalation(application.getConfiguration());
try {
// schedule YARN Registry registration
- queue(new ActionRegisterServiceInstance(clustername, appid));
+ queue(new ActionRegisterServiceInstance(appName, appid, application));
// log the YARN and web UIs
log.info("RM Webapp address {}",
serviceConf.get(YarnConfiguration.RM_WEBAPP_ADDRESS));
log.info("Slider webapp address {} proxied at {}",
appMasterTrackingUrl, appMasterProxiedUrl);
-
- // Start the Slider AM provider
- sliderAMProvider.start();
-
// launch the real provider; this is expected to trigger a callback that
// starts the node review process
- launchProviderService(instanceDefinition, confDir);
+ launchProviderService();
// start handling any scheduled events
@@ -1000,7 +884,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
* @throws InterruptedException
*/
private ApplicationAttemptReport getApplicationAttemptReport(
- final SliderYarnClientImpl yarnClient)
+ final YarnClient yarnClient)
throws YarnException, IOException, InterruptedException {
Preconditions.checkNotNull(yarnClient, "Null Yarn client");
ApplicationAttemptReport report;
@@ -1019,14 +903,14 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
}
/**
- * List the node reports: uses {@link SliderYarnClientImpl} as the login user
+ * List the node reports: uses {@link YarnClient} as the login user
* @param yarnClient client to the RM
* @return the node reports
* @throws IOException
* @throws YarnException
* @throws InterruptedException
*/
- private List<NodeReport> getNodeReports(final SliderYarnClientImpl yarnClient)
+ private List<NodeReport> getNodeReports(final YarnClient yarnClient)
throws IOException, YarnException, InterruptedException {
Preconditions.checkNotNull(yarnClient, "Null Yarn client");
List<NodeReport> nodeReports;
@@ -1051,7 +935,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
* Creates and starts the web application, and adds a
* <code>WebAppService</code> service under the AM, to ensure
* a managed web application shutdown.
- * @param webAppApi web app API instance
+ * @param webAppApi web application API instance
* @return port the web application is deployed on
* @throws IOException general problems starting the webapp (network, etc)
* @throws WebAppException other issues
@@ -1117,12 +1001,14 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
/**
* Build up the port scanner. This may include setting a port range.
*/
- private void buildPortScanner(AggregateConf instanceDefinition)
+ private void buildPortScanner()
throws BadConfigException {
portScanner = new PortScanner();
- String portRange = instanceDefinition.
- getAppConfOperations().getGlobalOptions().
- getOption(SliderKeys.KEY_ALLOWED_PORT_RANGE, "0");
+ String portRange = "0";
+ //TODO read from slider-am.xml
+// String portRange = instanceDefinition.
+// getAppConfOperations().getGlobalOptions().
+// getOption(SliderKeys.KEY_ALLOWED_PORT_RANGE, "0");
if (!"0".equals(portRange)) {
portScanner.setPortRange(portRange);
}
@@ -1203,11 +1089,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
* @throws IOException
*/
public void registerServiceInstance(String instanceName,
- ApplicationId appId) throws IOException {
-
-
- // the registry is running, so register services
- URL amWebURI = new URL(appMasterProxiedUrl);
+ ApplicationId appId, Application application) throws IOException {
//Give the provider restricted access to the state, registry
setupInitialRegistryPaths();
@@ -1218,7 +1100,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
instanceName,
appAttemptID);
providerService.bindToYarnRegistry(yarnRegistryOperations);
- sliderAMProvider.bindToYarnRegistry(yarnRegistryOperations);
// Yarn registry
ServiceRecord serviceRecord = new ServiceRecord();
@@ -1231,19 +1112,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
RegistryTypeUtils.ipcEndpoint(
CustomRegistryConstants.AM_IPC_PROTOCOL,
rpcServiceAddress));
-
- // internal services
- sliderAMProvider.applyInitialRegistryDefinitions(amWebURI,
- serviceRecord);
-
- // provider service dynamic definitions.
- providerService.applyInitialRegistryDefinitions(amWebURI, serviceRecord);
-
// set any provided attributes
- setProvidedServiceRecordAttributes(
- getInstanceDefinition().getAppConfOperations().getComponent(
- SliderKeys.COMPONENT_AM), serviceRecord);
+ setUserProvidedServiceRecordAttributes(application.getConfiguration(),
+ serviceRecord);
// register the service's entry
log.info("Service Record \n{}", serviceRecord);
@@ -1276,7 +1148,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
/**
* Handler for {@link RegisterComponentInstance action}
- * Register/re-register an ephemeral container that is already in the app state
+ * Register/re-register an ephemeral container that is already in the application state
* @param id the component
* @param description component description
* @param type component type
@@ -1291,32 +1163,36 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
// this is where component registrations go
log.info("Registering component {}", id);
String cid = RegistryPathUtils.encodeYarnID(id.toString());
- ServiceRecord container = new ServiceRecord();
- container.set(YarnRegistryAttributes.YARN_ID, cid);
- container.description = description;
- container.set(YarnRegistryAttributes.YARN_PERSISTENCE,
+ ServiceRecord record = new ServiceRecord();
+ record.set(YarnRegistryAttributes.YARN_ID, cid);
+ record.description = description;
+ record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
PersistencePolicies.CONTAINER);
- MapOperations compOps = getInstanceDefinition().getAppConfOperations().
- getComponent(type);
- setProvidedServiceRecordAttributes(compOps, container);
+ setUserProvidedServiceRecordAttributes(
+ instance.providerRole.component.getConfiguration(), record);
try {
- yarnRegistryOperations.putComponent(cid, container);
+ yarnRegistryOperations.putComponent(cid, record);
} catch (IOException e) {
log.warn("Failed to register container {}/{}: {}",
id, description, e, e);
return false;
}
+ org.apache.slider.api.resource.Container container =
+ new org.apache.slider.api.resource.Container();
+ container.setId(id.toString());
+ container.setLaunchTime(new Date());
+ container.setState(org.apache.slider.api.resource.ContainerState.INIT);
+ container.setBareHost(instance.host);
+ instance.providerRole.component.addContainer(container);
return true;
}
- protected void setProvidedServiceRecordAttributes(MapOperations ops,
- ServiceRecord record) {
+ protected void setUserProvidedServiceRecordAttributes(
+ org.apache.slider.api.resource.Configuration conf, ServiceRecord record) {
String prefix = RoleKeys.SERVICE_RECORD_ATTRIBUTE_PREFIX;
- for (Map.Entry<String, String> entry : ops.entrySet()) {
- if (entry.getKey().startsWith(
- prefix)) {
- String key = entry.getKey().substring(
- prefix.length() + 1);
+ for (Map.Entry<String, String> entry : conf.getProperties().entrySet()) {
+ if (entry.getKey().startsWith(prefix)) {
+ String key = entry.getKey().substring(prefix.length() + 1);
record.set(key, entry.getValue().trim());
}
}
@@ -1366,35 +1242,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
}
/**
- * Build the configuration directory passed in or of the target FS
- * @return the file
- */
- public File getLocalConfDir() {
- File confdir =
- new File(SliderKeys.PROPAGATED_CONF_DIR_NAME).getAbsoluteFile();
- return confdir;
- }
-
- /**
- * Get the path to the DFS configuration that is defined in the cluster specification
- * @return the generated configuration dir
- */
- public String getGeneratedConfDir() {
- return getGlobalInternalOptions().get(
- InternalKeys.INTERNAL_GENERATED_CONF_PATH);
- }
-
- /**
- * Get the global internal options for the AM
- * @return a map to access the internals
- */
- public MapOperations getGlobalInternalOptions() {
- return getInstanceDefinition()
- .getInternalOperations().
- getGlobalOptions();
- }
-
- /**
* Get the filesystem of this cluster
* @return the FS of the config
*/
@@ -1480,11 +1327,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
Exception exception = stopAction.getEx();
appStatus = stopAction.getFinalApplicationStatus();
- if (!spawnedProcessExitedBeforeShutdownTriggered) {
- //stopped the forked process but don't worry about its exit code
- int forkedExitCode = stopForkedProcess();
- log.debug("Stopped forked process: exit code={}", forkedExitCode);
- }
// make sure the AM is actually registered. If not, there's no point
// trying to unregister it
@@ -1500,7 +1342,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
launchService.stop();
//now release all containers
- releaseAllContainers();
+ releaseAllContainers(application);
+ DefaultMetricsSystem.shutdown();
// When the application completes, it should send a finish application
// signal to the RM
@@ -1536,7 +1379,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
/**
* Start the slider RPC server
*/
- private void startSliderRPCServer(AggregateConf instanceDefinition)
+ private void startSliderRPCServer()
throws IOException, SliderException {
verifyIPCAccess();
@@ -1612,16 +1455,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
//for each assignment: instantiate that role
for (ContainerAssignment assignment : assignments) {
- try {
- launchService.launchRole(assignment, getInstanceDefinition(),
- buildContainerCredentials());
- } catch (IOException e) {
- // Can be caused by failure to renew credentials with the remote
- // service. If so, don't launch the application. Container is retained,
- // though YARN will take it away after a timeout.
- log.error("Failed to build credentials to launch container: {}", e, e);
-
- }
+ //TODO Do we need to pass credentials to containers?
+ launchService.launchRole(assignment, application, null);
}
//for all the operations, exec them
@@ -1645,7 +1480,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
// non complete containers should not be here
assert (status.getState() == ContainerState.COMPLETE);
- AppState.NodeCompletionResult result = appState.onCompletedNode(status);
+ AppState.NodeCompletionResult result = appState.onCompletedContainer(status);
if (result.containerFailed) {
RoleInstance ri = result.roleInstance;
log.error("Role instance {} failed ", ri);
@@ -1653,7 +1488,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
// known nodes trigger notifications
if(!result.unknownNode) {
- getProviderService().notifyContainerCompleted(containerId);
queue(new UnregisterComponentInstance(containerId, 0,
TimeUnit.MILLISECONDS));
}
@@ -1724,22 +1558,16 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
* Implementation of cluster flexing.
* It should be the only way that anything -even the AM itself on startup-
* asks for nodes.
- * @param resources the resource tree
+ * @param request component flex request; may be null, in which case no component changes are applied
* @throws SliderException slider problems, including invalid configs
* @throws IOException IO problems
*/
- public void flexCluster(ConfTree resources)
+ public void flexCluster(Messages.FlexComponentRequestProto request)
throws IOException, SliderException {
-
- AggregateConf newConf =
- new AggregateConf(appState.getInstanceDefinitionSnapshot());
- newConf.setResources(resources);
- // verify the new definition is valid
- sliderAMProvider.validateInstanceDefinition(newConf);
- providerService.validateInstanceDefinition(newConf);
-
- appState.updateResourceDefinitions(resources);
-
+ // TODO(review): the request is no longer validated in this method before being applied; confirm appState.updateComponents() validates it
+ if (request != null) {
+ appState.updateComponents(request);
+ }
// reset the scheduled windows...the values
// may have changed
appState.resetFailureCounts();
@@ -1750,24 +1576,39 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
/**
* Schedule the failure window
- * @param resources the resource tree
- * @throws BadConfigException if the window is out of range
+ * @param conf configuration holding the container failure window components
+ * @throws IllegalStateException if any window component is negative
*/
- private void scheduleFailureWindowResets(ConfTree resources) throws
- BadConfigException {
+ private void scheduleFailureWindowResets(
+ org.apache.slider.api.resource.Configuration conf) {
+
ResetFailureWindow reset = new ResetFailureWindow(rmOperationHandler);
- ConfTreeOperations ops = new ConfTreeOperations(resources);
- MapOperations globals = ops.getGlobalOptions();
- long seconds = globals.getTimeRange(ResourceKeys.CONTAINER_FAILURE_WINDOW,
- ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_DAYS,
- ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_HOURS,
- ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_MINUTES, 0);
- if (seconds > 0) {
- log.info(
- "Scheduling the failure window reset interval to every {} seconds",
- seconds);
- RenewingAction<ResetFailureWindow> renew = new RenewingAction<>(
- reset, seconds, seconds, TimeUnit.SECONDS, 0);
+
+ long days =
+ conf.getPropertyLong(ResourceKeys.CONTAINER_FAILURE_WINDOW + ".days",
+ ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_DAYS);
+ long hours =
+ conf.getPropertyLong(ResourceKeys.CONTAINER_FAILURE_WINDOW + ".hours",
+ ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_HOURS);
+ long minutes =
+ conf.getPropertyLong(ResourceKeys.CONTAINER_FAILURE_WINDOW + ".minutes",
+ ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_MINUTES);
+ long seconds =
+ conf.getPropertyLong(ResourceKeys.CONTAINER_FAILURE_WINDOW + ".seconds",
+ 0);
+ Preconditions
+ .checkState(days >= 0 && hours >= 0 && minutes >= 0 && seconds >= 0,
+ "Failure window has negative time component %s:%s:%s:%s", days,
+ hours, minutes, seconds);
+ // one hour is 60 minutes and one day is 24 * 60 minutes
+ long totalMinutes = days * 24 * 60 + hours * 60 + minutes;
+ long totalSeconds = totalMinutes * 60 + seconds;
+ if (totalSeconds > 0) {
+ log.info("Scheduling the failure window reset interval to every {}"
+ + " seconds", totalSeconds);
+ RenewingAction<ResetFailureWindow> renew =
+ new RenewingAction<>(reset, totalSeconds, totalSeconds,
+ TimeUnit.SECONDS, 0);
actionQueues.renewing("failures", renew);
} else {
log.info("Failure window reset interval is not set");
@@ -1776,16 +1615,15 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
/**
* Schedule the escalation action
- * @param internal
- * @throws BadConfigException
+ * @param conf configuration supplying the escalation check interval, in seconds
*/
- private void scheduleEscalation(ConfTree internal) throws BadConfigException {
+ private void scheduleEscalation(
+ org.apache.slider.api.resource.Configuration conf) {
EscalateOutstandingRequests escalate = new EscalateOutstandingRequests();
- ConfTreeOperations ops = new ConfTreeOperations(internal);
- int seconds = ops.getGlobalOptions().getOptionInt(InternalKeys.ESCALATION_CHECK_INTERVAL,
+ long seconds = conf.getPropertyLong(InternalKeys.ESCALATION_CHECK_INTERVAL,
InternalKeys.DEFAULT_ESCALATION_CHECK_INTERVAL);
- RenewingAction<EscalateOutstandingRequests> renew = new RenewingAction<>(
- escalate, seconds, seconds, TimeUnit.SECONDS, 0);
+ RenewingAction<EscalateOutstandingRequests> renew =
+ new RenewingAction<>(escalate, seconds, seconds, TimeUnit.SECONDS, 0);
actionQueues.renewing("escalation", renew);
}
@@ -1794,7 +1632,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
* @param reason reason for operation
*/
private synchronized void reviewRequestAndReleaseNodes(String reason) {
- log.debug("reviewRequestAndReleaseNodes({})", reason);
+ log.info("reviewRequestAndReleaseNodes({})", reason);
queue(new ReviewAndFlexApplicationSize(reason, 0, TimeUnit.SECONDS));
}
@@ -1810,6 +1648,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
if ( actionQueues.hasQueuedActionWithAttribute(
AsyncAction.ATTR_REVIEWS_APP_SIZE | AsyncAction.ATTR_HALTS_APP)) {
+ // TODO: consider checking each queued action for an exact duplicate instead of matching on attribute flags only
// this operation isn't needed at all -existing duplicate or shutdown due
return;
}
@@ -1829,14 +1668,14 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
public synchronized void executeNodeReview(String reason)
throws SliderInternalStateException {
- log.debug("in executeNodeReview({})", reason);
+ log.info("in executeNodeReview({})", reason);
if (amCompletionFlag.get()) {
log.info("Ignoring node review operation: shutdown in progress");
+ // actually skip the review, as the log message above states
+ return;
}
try {
List<AbstractRMOperation> allOperations = appState.reviewRequestAndReleaseNodes();
- // tell the provider
- providerRMOperationHandler.execute(allOperations);
//now apply the operations
execute(allOperations);
} catch (TriggerClusterTeardownException e) {
@@ -1853,7 +1690,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
*/
public void escalateOutstandingRequests() {
List<AbstractRMOperation> operations = appState.escalateOutstandingRequests();
- providerRMOperationHandler.execute(operations);
execute(operations);
}
@@ -1861,11 +1697,11 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
/**
* Shutdown operation: release all containers
*/
- private void releaseAllContainers() {
+ private void releaseAllContainers(Application application) {
// Add the sleep here (before releasing containers) so that applications get
// time to perform graceful shutdown
try {
- long timeout = getContainerReleaseTimeout();
+ long timeout = getContainerReleaseTimeout(application);
if (timeout > 0) {
Thread.sleep(timeout);
}
@@ -1873,22 +1709,16 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
log.info("Sleep for container release interrupted");
} finally {
List<AbstractRMOperation> operations = appState.releaseAllContainers();
- providerRMOperationHandler.execute(operations);
// now apply the operations
execute(operations);
}
}
- private long getContainerReleaseTimeout() {
+ private long getContainerReleaseTimeout(Application application) {
// Get container release timeout in millis or 0 if the property is not set.
- // If non-zero then add the agent heartbeat delay time, since it can take up
- // to that much time for agents to receive the stop command.
- int timeout = getInstanceDefinition().getAppConfOperations()
- .getGlobalOptions()
- .getOptionInt(SliderKeys.APP_CONTAINER_RELEASE_TIMEOUT, 0);
- if (timeout > 0) {
- timeout += SliderKeys.APP_CONTAINER_HEARTBEAT_INTERVAL_SEC;
- }
+ long timeout = application.getConfiguration()
+ .getPropertyLong(SliderKeys.APP_CONTAINER_RELEASE_TIMEOUT, 0);
+ // the property value is in seconds; it is converted to millis below
// convert to millis
long timeoutInMillis = timeout * 1000l;
log.info("Container release timeout in millis = {}", timeoutInMillis);
@@ -2000,27 +1830,15 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
/**
* Launch the provider service
- *
- * @param instanceDefinition definition of the service
- * @param confDir directory of config data
* @throws IOException
* @throws SliderException
*/
- protected synchronized void launchProviderService(AggregateConf instanceDefinition,
- File confDir)
- throws IOException, SliderException {
- Map<String, String> env = new HashMap<>();
- boolean execStarted = providerService.exec(instanceDefinition, confDir, env,
- this);
- if (execStarted) {
- providerService.registerServiceListener(this);
- providerService.start();
- } else {
- // didn't start, so don't register
- providerService.start();
- // and send the started event ourselves
- eventCallbackEvent(null);
- }
+ protected synchronized void launchProviderService()
+ throws IOException, SliderException {
+ // no child process is exec'd any more: start the provider service directly
+ providerService.start();
+ // and raise the "started" callback ourselves (no service listener is registered here)
+ eventCallbackEvent(null);
}
/* =================================================================== */
@@ -2029,11 +1847,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
@Override // ProviderCompleted
public void eventCallbackEvent(Object parameter) {
- // signalled that the child process is up.
- appState.noteAMLive();
// now ask for the cluster nodes
try {
- flexCluster(getInstanceDefinition().getResources());
+ flexCluster(null); // null request: no component changes are applied
} catch (Exception e) {
// cluster flex failure: log
log.error("Failed to flex cluster nodes: {}", e, e);
@@ -2064,62 +1880,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
}
}
- /* =================================================================== */
- /* ServiceStateChangeListener */
- /* =================================================================== */
-
- /**
- * Received on listening service termination.
- * @param service the service that has changed.
- */
- @Override //ServiceStateChangeListener
- public void stateChanged(Service service) {
- if (service == providerService && service.isInState(STATE.STOPPED)) {
- //its the current master process in play
- int exitCode = providerService.getExitCode();
- int mappedProcessExitCode = exitCode;
-
- boolean shouldTriggerFailure = !amCompletionFlag.get()
- && (mappedProcessExitCode != 0);
-
- if (shouldTriggerFailure) {
- String reason =
- "Spawned process failed with raw " + exitCode + " mapped to " +
- mappedProcessExitCode;
- ActionStopSlider stop = new ActionStopSlider("stop",
- mappedProcessExitCode,
- FinalApplicationStatus.FAILED,
- reason);
- //this wasn't expected: the process finished early
- spawnedProcessExitedBeforeShutdownTriggered = true;
- log.info(
- "Process has exited with exit code {} mapped to {} -triggering termination",
- exitCode,
- mappedProcessExitCode);
-
- //tell the AM the cluster is complete
- signalAMComplete(stop);
- } else {
- //we don't care
- log.info(
- "Process has exited with exit code {} mapped to {} -ignoring",
- exitCode,
- mappedProcessExitCode);
- }
- } else {
- super.stateChanged(service);
- }
- }
-
- /**
- * stop forked process if it the running process var is not null
- * @return the process exit code
- */
- protected synchronized Integer stopForkedProcess() {
- providerService.stop();
- return providerService.getExitCode();
- }
-
/**
* Async start container request
* @param container container
@@ -2221,16 +1981,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
LOG_YARN.warn("Failed to stop Container {}", containerId);
}
- public AggregateConf getInstanceDefinition() {
- return appState.getInstanceDefinition();
- }
-
- /**
- * This is the status, the live model
- */
- public ClusterDescription getClusterDescription() {
- return appState.getClusterStatus();
- }
public ProviderService getProviderService() {
return providerService;
@@ -2278,12 +2028,12 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
}
/**
- * Start the chaos monkey
+ * Start the chaos monkey. TODO: internal options are not wired in yet; a freshly constructed MapOperations is used, so presumably only the defaults apply
* @return true if it started
*/
private boolean maybeStartMonkey() {
- MapOperations internals = getGlobalInternalOptions();
-
+ // TODO: restore reading the internal options (the getGlobalInternalOptions() lookup is disabled)
+ MapOperations internals = new MapOperations();
Boolean enabled =
internals.getOptionBool(InternalKeys.CHAOS_MONKEY_ENABLED,
InternalKeys.DEFAULT_CHAOS_MONKEY_ENABLED);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java
index 6b61681..a660958 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java
@@ -18,6 +18,6 @@
package org.apache.slider.server.appmaster.actions;
+import org.apache.slider.api.proto.Messages;
-import org.apache.slider.core.conf.ConfTree;
import org.apache.slider.server.appmaster.SliderAppMaster;
import org.apache.slider.server.appmaster.state.AppState;
@@ -26,19 +26,16 @@ import java.util.concurrent.TimeUnit;
public class ActionFlexCluster extends AsyncAction {
- public final ConfTree resources;
-
- public ActionFlexCluster(String name,
- long delay,
- TimeUnit timeUnit, ConfTree resources) {
+ final Messages.FlexComponentRequestProto requestProto;
+ public ActionFlexCluster(String name, long delay, TimeUnit timeUnit,
+ Messages.FlexComponentRequestProto requestProto) {
super(name, delay, timeUnit, ATTR_CHANGES_APP_SIZE);
- this.resources = resources;
+ this.requestProto = requestProto;
}
-
@Override
public void execute(SliderAppMaster appMaster,
QueueAccess queueService,
AppState appState) throws Exception {
- appMaster.flexCluster(resources);
+ appMaster.flexCluster(requestProto);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionRegisterServiceInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionRegisterServiceInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionRegisterServiceInstance.java
index ca330af..0d7f7d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionRegisterServiceInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionRegisterServiceInstance.java
@@ -19,6 +19,7 @@
package org.apache.slider.server.appmaster.actions;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.slider.api.resource.Application;
import org.apache.slider.server.appmaster.SliderAppMaster;
import org.apache.slider.server.appmaster.state.AppState;
@@ -31,21 +32,13 @@ public class ActionRegisterServiceInstance extends AsyncAction {
private final String instanceName;
private final ApplicationId appId;
-
+ private final Application application; // passed through to registerServiceInstance()
public ActionRegisterServiceInstance(String instanceName,
- ApplicationId appId) {
+ ApplicationId appId, Application application) {
super("ActionRegisterServiceInstance");
this.instanceName = instanceName;
this.appId = appId;
- }
-
- public ActionRegisterServiceInstance(String instanceName,
- ApplicationId appId,
- long delay,
- TimeUnit timeUnit) {
- super("ActionRegisterServiceInstance", delay, timeUnit);
- this.instanceName = instanceName;
- this.appId = appId;
+ this.application = application;
}
@Override
@@ -54,6 +47,6 @@ public class ActionRegisterServiceInstance extends AsyncAction {
AppState appState) throws Exception {
// YARN Registry do the registration
- appMaster.registerServiceInstance(instanceName, appId);
+ appMaster.registerServiceInstance(instanceName, appId, application);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/metrics/SliderMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/metrics/SliderMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/metrics/SliderMetrics.java
new file mode 100644
index 0000000..510ff73
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/metrics/SliderMetrics.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.metrics;
+
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * Container gauges for a YARN native service, exposed as a Hadoop metrics2
+ * source in the "yarn-native-service" context.
+ */
+@Metrics(context = "yarn-native-service")
+public class SliderMetrics implements MetricsSource {
+
+ @Metric("containers pending")
+ public MutableGaugeInt containersPending;
+ @Metric("anti-affinity containers pending")
+ public MutableGaugeInt pendingAAContainers;
+ @Metric("containers running")
+ public MutableGaugeInt containersRunning;
+ @Metric("containers requested")
+ public MutableGaugeInt containersDesired;
+ @Metric("containers completed")
+ public MutableGaugeInt containersCompleted;
+ @Metric("containers failed")
+ public MutableGaugeInt containersFailed;
+ @Metric("containers failed since last threshold")
+ public MutableGaugeInt failedSinceLastThreshold;
+ @Metric("containers preempted")
+ public MutableGaugeInt containersPreempted;
+ @Metric("containers surplus")
+ public MutableGaugeInt surplusContainers;
+
+ protected final MetricsRegistry registry;
+
+ public SliderMetrics(MetricsInfo metricsInfo) {
+ registry = new MetricsRegistry(metricsInfo);
+ }
+
+ @Override
+ public void getMetrics(MetricsCollector collector, boolean all) {
+ registry.snapshot(collector.addRecord(registry.info()), all);
+ }
+
+ /**
+ * Creates a metrics source and registers it with the default metrics system.
+ * @param name source name, also used for the emitted record name
+ * @param description source description
+ * @return the registered source
+ */
+ public static SliderMetrics register(String name, String description) {
+ SliderMetrics metrics = new SliderMetrics(info(name, description));
+ DefaultMetricsSystem.instance().register(name, description, metrics);
+ return metrics;
+ }
+
+ /** Adds a tag to this source's metrics registry. */
+ public void tag(String name, String description, String value) {
+ registry.tag(name, description, value);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java
deleted file mode 100644
index 972cc30..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.appmaster.operations;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.slider.providers.ProviderService;
-
-import java.util.List;
-
-public class ProviderNotifyingOperationHandler extends RMOperationHandler {
-
- private final ProviderService providerService;
-
- public ProviderNotifyingOperationHandler(ProviderService providerService) {
- this.providerService = providerService;
- }
-
- @Override
- public void releaseAssignedContainer(ContainerId containerId) {
- providerService.releaseAssignedContainer(containerId);
- }
-
- @Override
- public void addContainerRequest(AMRMClient.ContainerRequest req) {
- providerService.addContainerRequest(req);
- }
-
- @Override
- public int cancelContainerRequests(Priority priority1,
- Priority priority2,
- int count) {
- return providerService.cancelContainerRequests(priority1, priority2, count);
- }
-
- @Override
- public void cancelSingleRequest(AMRMClient.ContainerRequest request) {
- providerService.cancelSingleRequest(request);
- }
-
- @Override
- public void updateBlacklist(List<String> blacklistAdditions,
- List<String> blacklistRemovals) {
- providerService.updateBlacklist(blacklistAdditions, blacklistRemovals);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java
index fbd408e..4d483c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java
@@ -70,11 +70,12 @@ public class SliderClusterProtocolPBImpl implements SliderClusterProtocolPB {
}
@Override
- public Messages.FlexClusterResponseProto flexCluster(RpcController controller,
- Messages.FlexClusterRequestProto request) throws ServiceException {
+ public Messages.FlexComponentResponseProto flexComponent(
+ RpcController controller, Messages.FlexComponentRequestProto request)
+ throws ServiceException {
try {
- return real.flexCluster(request);
+ return real.flexComponent(request);
} catch (Exception e) {
throw wrap(e);
}
}
@@ -90,19 +91,6 @@ public class SliderClusterProtocolPBImpl implements SliderClusterProtocolPB {
}
}
-
- @Override
- public Messages.GetInstanceDefinitionResponseProto getInstanceDefinition(
- RpcController controller,
- Messages.GetInstanceDefinitionRequestProto request)
- throws ServiceException {
- try {
- return real.getInstanceDefinition(request);
- } catch (Exception e) {
- throw wrap(e);
- }
- }
-
@Override
public Messages.ListNodeUUIDsByRoleResponseProto listNodeUUIDsByRole(
RpcController controller,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java
index 448c6f3..c60d609 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java
@@ -110,10 +110,10 @@ public class SliderClusterProtocolProxy implements SliderClusterProtocol {
}
@Override
- public Messages.FlexClusterResponseProto flexCluster(Messages.FlexClusterRequestProto request)
- throws IOException {
+ public Messages.FlexComponentResponseProto flexComponent(
+ Messages.FlexComponentRequestProto request) throws IOException {
try {
- return endpoint.flexCluster(NULL_CONTROLLER, request);
+ return endpoint.flexComponent(NULL_CONTROLLER, request);
} catch (ServiceException e) {
throw convert(e);
}
@@ -131,19 +131,6 @@ public class SliderClusterProtocolProxy implements SliderClusterProtocol {
}
}
-
- @Override
- public Messages.GetInstanceDefinitionResponseProto getInstanceDefinition(
- Messages.GetInstanceDefinitionRequestProto request) throws
- IOException,
- YarnException {
- try {
- return endpoint.getInstanceDefinition(NULL_CONTROLLER, request);
- } catch (ServiceException e) {
- throw convert(e);
- }
- }
-
@Override
public Messages.ListNodeUUIDsByRoleResponseProto listNodeUUIDsByRole(Messages.ListNodeUUIDsByRoleRequestProto request) throws
IOException,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
index 70c2f05..344495b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
@@ -24,9 +24,9 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.SliderClusterProtocol;
import org.apache.slider.api.proto.Messages;
+import org.apache.slider.api.resource.Application;
import org.apache.slider.api.types.ApplicationLivenessInformation;
import org.apache.slider.api.types.ComponentInformation;
import org.apache.slider.api.types.ContainerInformation;
@@ -38,6 +38,7 @@ import org.apache.slider.core.exceptions.ServiceNotReadyException;
import org.apache.slider.core.main.LauncherExitCodes;
import org.apache.slider.core.persist.AggregateConfSerDeser;
import org.apache.slider.core.persist.ConfTreeSerDeser;
+import org.apache.slider.core.persist.JsonSerDeser;
import org.apache.slider.server.appmaster.AppMasterActionOperations;
import org.apache.slider.server.appmaster.actions.ActionFlexCluster;
import org.apache.slider.server.appmaster.actions.ActionHalt;
@@ -78,6 +79,9 @@ public class SliderIPCService extends AbstractService
private final MetricsAndMonitoring metricsAndMonitoring;
private final AppMasterActionOperations amOperations;
private final ContentCache cache;
+ private static final JsonSerDeser<Application> jsonSerDeser =
+ new JsonSerDeser<>(Application.class);
+
/**
* This is the prefix used for metrics
@@ -195,17 +199,12 @@ public class SliderIPCService extends AbstractService
return Messages.UpgradeContainersResponseProto.getDefaultInstance();
}
- @Override //SliderClusterProtocol
- public Messages.FlexClusterResponseProto flexCluster(Messages.FlexClusterRequestProto request)
- throws IOException {
+ @Override //SliderClusterProtocol
+ public Messages.FlexComponentResponseProto flexComponent(
+ Messages.FlexComponentRequestProto request) throws IOException {
onRpcCall("flex");
- String payload = request.getClusterSpec();
- ConfTreeSerDeser confTreeSerDeser = new ConfTreeSerDeser();
- ConfTree updatedResources = confTreeSerDeser.fromJson(payload);
- schedule(new ActionFlexCluster("flex", 1, TimeUnit.MILLISECONDS,
- updatedResources));
- return Messages.FlexClusterResponseProto.newBuilder().setResponse(
- true).build();
+ schedule(new ActionFlexCluster("flex", 1, TimeUnit.MILLISECONDS, request));
+ return Messages.FlexComponentResponseProto.newBuilder().build();
}
@Override //SliderClusterProtocol
@@ -216,38 +215,9 @@ public class SliderIPCService extends AbstractService
- String result;
//quick update
//query and json-ify
- ClusterDescription cd = state.refreshClusterStatus();
- result = cd.toJsonString();
- String stat = result;
+ Application application = state.refreshClusterStatus();
+ String stat = jsonSerDeser.toJson(application);
return Messages.GetJSONClusterStatusResponseProto.newBuilder()
- .setClusterSpec(stat)
- .build();
- }
-
- @Override
- public Messages.GetInstanceDefinitionResponseProto getInstanceDefinition(
- Messages.GetInstanceDefinitionRequestProto request)
- throws IOException, YarnException {
-
- onRpcCall("getinstancedefinition");
- String internal;
- String resources;
- String app;
- AggregateConf instanceDefinition =
- state.getInstanceDefinitionSnapshot();
- internal = instanceDefinition.getInternal().toJson();
- resources = instanceDefinition.getResources().toJson();
- app = instanceDefinition.getAppConf().toJson();
- assert internal != null;
- assert resources != null;
- assert app != null;
- log.debug("Generating getInstanceDefinition Response");
- Messages.GetInstanceDefinitionResponseProto.Builder builder =
- Messages.GetInstanceDefinitionResponseProto.newBuilder();
- builder.setInternal(internal);
- builder.setResources(resources);
- builder.setApplication(app);
- return builder.build();
+ .setClusterSpec(stat).build();
}
@Override //SliderClusterProtocol
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c0536f18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java
index 9a89c39..b31babc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java
@@ -138,6 +138,7 @@ public class SecurityConfiguration {
public File getKeytabFile(AggregateConf instanceDefinition)
throws SliderException, IOException {
+ //TODO implement this for dash semantic
String keytabFullPath = instanceDefinition.getAppConfOperations()
.getComponent(SliderKeys.COMPONENT_AM)
.get(SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org