You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by go...@apache.org on 2016/08/25 20:19:52 UTC
[23/46] incubator-slider git commit: SLIDER-1165 Create
yarn-native-services branch on Slider corresponding to the
yarn-native-services branch on Hadoop
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
deleted file mode 100644
index 040f12b..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ /dev/null
@@ -1,2451 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.appmaster;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.health.HealthCheckRegistry;
-import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.BlockingService;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.http.HttpConfig;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
-import org.apache.hadoop.registry.client.binding.RegistryUtils;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.service.ServiceOperations;
-import org.apache.hadoop.service.ServiceStateChangeListener;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
-import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import static org.apache.hadoop.yarn.conf.YarnConfiguration.*;
-import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG;
-
-import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.registry.client.api.RegistryOperations;
-import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
-import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
-import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
-import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
-import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.webapp.WebAppException;
-import org.apache.hadoop.yarn.webapp.WebApps;
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
-import org.apache.slider.api.ClusterDescription;
-import org.apache.slider.api.InternalKeys;
-import org.apache.slider.api.ResourceKeys;
-import org.apache.slider.api.RoleKeys;
-import org.apache.slider.api.StatusKeys;
-import org.apache.slider.api.proto.SliderClusterAPI;
-import org.apache.slider.client.SliderYarnClientImpl;
-import org.apache.slider.common.SliderExitCodes;
-import org.apache.slider.common.SliderKeys;
-import org.apache.slider.common.params.AbstractActionArgs;
-import org.apache.slider.common.params.SliderAMArgs;
-import org.apache.slider.common.params.SliderAMCreateAction;
-import org.apache.slider.common.params.SliderActions;
-import org.apache.slider.common.tools.ConfigHelper;
-import org.apache.slider.common.tools.PortScanner;
-import org.apache.slider.common.tools.SliderFileSystem;
-import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.common.tools.SliderVersionInfo;
-import org.apache.slider.core.build.InstanceIO;
-import org.apache.slider.core.conf.AggregateConf;
-import org.apache.slider.core.conf.ConfTree;
-import org.apache.slider.core.conf.ConfTreeOperations;
-import org.apache.slider.core.conf.MapOperations;
-import org.apache.slider.core.exceptions.BadConfigException;
-import org.apache.slider.core.exceptions.SliderException;
-import org.apache.slider.core.exceptions.SliderInternalStateException;
-import org.apache.slider.core.exceptions.TriggerClusterTeardownException;
-import org.apache.slider.core.launch.CredentialUtils;
-import org.apache.slider.core.main.ExitCodeProvider;
-import org.apache.slider.core.main.LauncherExitCodes;
-import org.apache.slider.core.main.RunService;
-import org.apache.slider.core.main.ServiceLauncher;
-import org.apache.slider.core.registry.info.CustomRegistryConstants;
-import org.apache.slider.providers.ProviderCompleted;
-import org.apache.slider.providers.ProviderRole;
-import org.apache.slider.providers.ProviderService;
-import org.apache.slider.providers.SliderProviderFactory;
-import org.apache.slider.providers.agent.AgentKeys;
-import org.apache.slider.providers.agent.AgentProviderService;
-import org.apache.slider.providers.slideram.SliderAMClientProvider;
-import org.apache.slider.providers.slideram.SliderAMProviderService;
-import org.apache.slider.server.appmaster.actions.ActionRegisterServiceInstance;
-import org.apache.slider.server.appmaster.actions.EscalateOutstandingRequests;
-import org.apache.slider.server.appmaster.actions.RegisterComponentInstance;
-import org.apache.slider.server.appmaster.actions.QueueExecutor;
-import org.apache.slider.server.appmaster.actions.QueueService;
-import org.apache.slider.server.appmaster.actions.ActionStopSlider;
-import org.apache.slider.server.appmaster.actions.ActionUpgradeContainers;
-import org.apache.slider.server.appmaster.actions.AsyncAction;
-import org.apache.slider.server.appmaster.actions.RenewingAction;
-import org.apache.slider.server.appmaster.actions.ResetFailureWindow;
-import org.apache.slider.server.appmaster.actions.ReviewAndFlexApplicationSize;
-import org.apache.slider.server.appmaster.actions.UnregisterComponentInstance;
-import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
-import org.apache.slider.server.appmaster.management.YarnServiceHealthCheck;
-import org.apache.slider.server.appmaster.monkey.ChaosKillAM;
-import org.apache.slider.server.appmaster.monkey.ChaosKillContainer;
-import org.apache.slider.server.appmaster.monkey.ChaosMonkeyService;
-import org.apache.slider.server.appmaster.operations.AsyncRMOperationHandler;
-import org.apache.slider.server.appmaster.operations.ProviderNotifyingOperationHandler;
-import org.apache.slider.server.appmaster.rpc.RpcBinder;
-import org.apache.slider.server.appmaster.rpc.SliderAMPolicyProvider;
-import org.apache.slider.server.appmaster.rpc.SliderClusterProtocolPBImpl;
-import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
-import org.apache.slider.server.appmaster.rpc.SliderIPCService;
-import org.apache.slider.server.appmaster.security.SecurityConfiguration;
-import org.apache.slider.server.appmaster.state.AppState;
-import org.apache.slider.server.appmaster.state.AppStateBindingInfo;
-import org.apache.slider.server.appmaster.state.ContainerAssignment;
-import org.apache.slider.server.appmaster.state.ProviderAppState;
-import org.apache.slider.server.appmaster.operations.RMOperationHandler;
-import org.apache.slider.server.appmaster.state.RoleInstance;
-import org.apache.slider.server.appmaster.web.AgentService;
-import org.apache.slider.server.appmaster.web.rest.InsecureAmFilterInitializer;
-import org.apache.slider.server.appmaster.web.rest.agent.AgentWebApp;
-import org.apache.slider.server.appmaster.web.SliderAMWebApp;
-import org.apache.slider.server.appmaster.web.WebAppApi;
-import org.apache.slider.server.appmaster.web.WebAppApiImpl;
-import org.apache.slider.server.appmaster.web.rest.RestPaths;
-import org.apache.slider.server.appmaster.web.rest.application.ApplicationResouceContentCacheFactory;
-import org.apache.slider.server.appmaster.web.rest.application.resources.ContentCache;
-import org.apache.slider.server.services.security.CertificateManager;
-import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
-import org.apache.slider.server.services.utility.WebAppService;
-import org.apache.slider.server.services.workflow.ServiceThreadFactory;
-import org.apache.slider.server.services.workflow.WorkflowExecutorService;
-import org.apache.slider.server.services.workflow.WorkflowRpcService;
-import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * This is the AM, which directly implements the callbacks from the RM and NM
- */
-public class SliderAppMaster extends AbstractSliderLaunchedService
- implements AMRMClientAsync.CallbackHandler,
- NMClientAsync.CallbackHandler,
- RunService,
- SliderExitCodes,
- SliderKeys,
- ServiceStateChangeListener,
- RoleKeys,
- ProviderCompleted,
- AppMasterActionOperations {
-
- protected static final Logger log =
- LoggerFactory.getLogger(SliderAppMaster.class);
-
- /**
- * log for YARN events
- */
- protected static final Logger LOG_YARN = log;
-
- public static final String SERVICE_CLASSNAME_SHORT = "SliderAppMaster";
- public static final String SERVICE_CLASSNAME =
- "org.apache.slider.server.appmaster." + SERVICE_CLASSNAME_SHORT;
-
- public static final int HEARTBEAT_INTERVAL = 1000;
- public static final int NUM_RPC_HANDLERS = 5;
-
- /**
- * Metrics and monitoring services.
- * Deployed in {@link #serviceInit(Configuration)}
- */
- private final MetricsAndMonitoring metricsAndMonitoring = new MetricsAndMonitoring();
-
- /**
- * metrics registry
- */
- public MetricRegistry metrics;
-
- /** Error string on chaos monkey launch failure action: {@value} */
- public static final String E_TRIGGERED_LAUNCH_FAILURE =
- "Chaos monkey triggered launch failure";
-
- /** YARN RPC to communicate with the Resource Manager or Node Manager */
- private YarnRPC yarnRPC;
-
- /** Handle to communicate with the Resource Manager*/
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private AMRMClientAsync asyncRMClient;
-
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private RMOperationHandler rmOperationHandler;
-
- private RMOperationHandler providerRMOperationHandler;
-
- /** Handle to communicate with the Node Manager*/
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- public NMClientAsync nmClientAsync;
-
- /**
- * Credentials for propagating down to launched containers
- */
- private Credentials containerCredentials;
-
- /**
- * Slider IPC: Real service handler
- */
- private SliderIPCService sliderIPCService;
- /**
- * Slider IPC: binding
- */
- private WorkflowRpcService rpcService;
-
- /**
- * Secret manager
- */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private ClientToAMTokenSecretManager secretManager;
-
- /** Hostname of the container*/
- private String appMasterHostname = "";
- /** Port on which the app master listens for status updates from clients*/
- private int appMasterRpcPort = 0;
- /** Tracking url to which app master publishes info for clients to monitor*/
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private String appMasterTrackingUrl = "";
-
- /** Proxied app master URL (as retrieved from AM report at launch time) */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private String appMasterProxiedUrl = "";
-
- /** Application Attempt Id ( combination of attemptId and fail count )*/
- private ApplicationAttemptId appAttemptID;
-
- /**
- * App ACLs
- */
- protected Map<ApplicationAccessType, String> applicationACLs;
-
- /**
- * Ongoing state of the cluster: containers, nodes they
- * live on, etc.
- */
- private final AppState appState =
- new AppState(new ProtobufClusterServices(), metricsAndMonitoring);
-
- /**
- * App state for external objects. This is almost entirely
- * a read-only view of the application state. To change the state,
- * Providers (or anything else) are expected to queue async changes.
- */
- private final ProviderAppState stateForProviders =
- new ProviderAppState("undefined", appState);
-
- /**
- * model the state using locks and conditions
- */
- private final ReentrantLock AMExecutionStateLock = new ReentrantLock();
- private final Condition isAMCompleted = AMExecutionStateLock.newCondition();
-
- /**
- * Flag set if the AM is to be shutdown
- */
- private final AtomicBoolean amCompletionFlag = new AtomicBoolean(false);
-
- /**
- * Flag set during the init process
- */
- private final AtomicBoolean initCompleted = new AtomicBoolean(false);
-
- /**
- * Flag to set if the process exit code was set before shutdown started
- */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private boolean spawnedProcessExitedBeforeShutdownTriggered;
-
-
- /** Arguments passed in : raw*/
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private SliderAMArgs serviceArgs;
-
- /**
- * ID of the AM container
- */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private ContainerId appMasterContainerID;
-
- /**
- * Monkey Service -may be null
- */
- private ChaosMonkeyService monkey;
-
- /**
- * ProviderService of this cluster
- */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private ProviderService providerService;
-
- /**
- * The YARN registry service
- */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private RegistryOperations registryOperations;
-
- /**
- * The stop request received...the exit details are extracted
- * from this
- */
- private volatile ActionStopSlider stopAction;
-
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private RoleLaunchService launchService;
-
- //username -null if it is not known/not to be set
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private String hadoop_user_name;
- private String service_user_name;
-
- private SliderAMWebApp webApp;
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private InetSocketAddress rpcServiceAddress;
- private SliderAMProviderService sliderAMProvider;
- private CertificateManager certificateManager;
-
- /**
- * Executor.
- * Assigned in {@link #serviceInit(Configuration)}
- */
- private WorkflowExecutorService<ExecutorService> executorService;
-
- /**
- * Action queues. Created at instance creation, but
- * added as a child and inited in {@link #serviceInit(Configuration)}
- */
- private final QueueService actionQueues = new QueueService();
- private String agentOpsUrl;
- private String agentStatusUrl;
- private YarnRegistryViewForProviders yarnRegistryOperations;
- //private FsDelegationTokenManager fsDelegationTokenManager;
- private RegisterApplicationMasterResponse amRegistrationData;
- private PortScanner portScanner;
- private SecurityConfiguration securityConfiguration;
-
- /**
- * Is security enabled?
- * Set early on in the {@link #createAndRunCluster(String)} operation.
- */
- private boolean securityEnabled;
- private ContentCache contentCache;
-
- /**
- * resource limits
- */
- private Resource maximumResourceCapability;
-
- /**
- * Service Constructor
- */
- public SliderAppMaster() {
- super(SERVICE_CLASSNAME_SHORT);
- new HdfsConfiguration();
- new YarnConfiguration();
- }
-
-/* =================================================================== */
-/* service lifecycle methods */
-/* =================================================================== */
-
- @Override //AbstractService
- public synchronized void serviceInit(Configuration conf) throws Exception {
- // slider client if found
-
- Configuration customConf = SliderUtils.loadSliderClientXML();
- // Load in the server configuration - if it is actually on the Classpath
- URL serverXmlUrl = ConfigHelper.getResourceUrl(SLIDER_SERVER_XML);
- if (serverXmlUrl != null) {
- log.info("Loading {} at {}", SLIDER_SERVER_XML, serverXmlUrl);
- Configuration serverConf = ConfigHelper.loadFromResource(SLIDER_SERVER_XML);
- ConfigHelper.mergeConfigurations(customConf, serverConf,
- SLIDER_SERVER_XML, true);
- }
- serviceArgs.applyDefinitions(customConf);
- serviceArgs.applyFileSystemBinding(customConf);
- // customConf now contains all customizations
-
- AbstractActionArgs action = serviceArgs.getCoreAction();
- SliderAMCreateAction createAction = (SliderAMCreateAction) action;
-
- // sort out the location of the RM
- String rmAddress = createAction.getRmAddress();
- if (rmAddress != null) {
- log.debug("Setting RM address from the command line: {}", rmAddress);
- SliderUtils.setRmSchedulerAddress(customConf, rmAddress);
- }
-
- log.info("AM configuration:\n{}",
- ConfigHelper.dumpConfigToString(customConf));
- for (Map.Entry<String, String> envs : System.getenv().entrySet()) {
- log.info("System env {}={}", envs.getKey(), envs.getValue());
- }
-
- ConfigHelper.mergeConfigurations(conf, customConf, SLIDER_CLIENT_XML, true);
- //init security with our conf
- if (SliderUtils.isHadoopClusterSecure(conf)) {
- log.info("Secure mode with kerberos realm {}",
- SliderUtils.getKerberosRealm());
- UserGroupInformation.setConfiguration(conf);
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- log.debug("Authenticating as {}", ugi);
- SliderUtils.verifyPrincipalSet(conf, DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
- } else {
- log.info("Cluster is insecure");
- }
- log.info("Login user is {}", UserGroupInformation.getLoginUser());
-
- //look at settings of Hadoop Auth, to pick up a problem seen once
- checkAndWarnForAuthTokenProblems();
-
- // validate server env
- boolean dependencyChecks =
- !conf.getBoolean(KEY_SLIDER_AM_DEPENDENCY_CHECKS_DISABLED,
- false);
- SliderUtils.validateSliderServerEnvironment(log, dependencyChecks);
-
- // create and register monitoring services
- addService(metricsAndMonitoring);
- metrics = metricsAndMonitoring.getMetrics();
-/* TODO: turn these on once the metrics testing is more under control
- metrics.registerAll(new ThreadStatesGaugeSet());
- metrics.registerAll(new MemoryUsageGaugeSet());
- metrics.registerAll(new GarbageCollectorMetricSet());
-
-*/
- contentCache = ApplicationResouceContentCacheFactory.createContentCache(stateForProviders);
-
- executorService = new WorkflowExecutorService<>("AmExecutor",
- Executors.newFixedThreadPool(2,
- new ServiceThreadFactory("AmExecutor", true)));
- addService(executorService);
-
- addService(actionQueues);
-
- //init all child services
- super.serviceInit(conf);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- super.serviceStart();
- HealthCheckRegistry health = metricsAndMonitoring.getHealth();
- health.register("AM Health", new YarnServiceHealthCheck(this));
- }
-
- /**
- * Start the queue processing
- */
- private void startQueueProcessing() {
- log.info("Queue Processing started");
- executorService.execute(actionQueues);
- executorService.execute(new QueueExecutor(this, actionQueues));
- }
-
-/* =================================================================== */
-/* RunService methods called from ServiceLauncher */
-/* =================================================================== */
-
- /**
- * pick up the args from the service launcher
- * @param config configuration
- * @param args argument list
- */
- @Override // RunService
- public Configuration bindArgs(Configuration config, String... args) throws Exception {
- // let the superclass process it
- Configuration superConf = super.bindArgs(config, args);
- // add the slider XML config
- ConfigHelper.injectSliderXMLResource();
-
- //yarn-ify
- YarnConfiguration yarnConfiguration = new YarnConfiguration(
- superConf);
- serviceArgs = new SliderAMArgs(args);
- serviceArgs.parse();
-
- return SliderUtils.patchConfiguration(yarnConfiguration);
- }
-
-
- /**
- * this is called by service launcher; when it returns the application finishes
- * @return the exit code to return by the app
- * @throws Throwable on a failure
- */
- @Override
- public int runService() throws Throwable {
- SliderVersionInfo.loadAndPrintVersionInfo(log);
-
- //dump the system properties if in debug mode
- if (log.isDebugEnabled()) {
- log.debug("System properties:\n" + SliderUtils.propertiesToString(System.getProperties()));
- }
-
- //choose the action
- String action = serviceArgs.getAction();
- List<String> actionArgs = serviceArgs.getActionArgs();
- int exitCode;
- switch (action) {
- case SliderActions.ACTION_HELP:
- log.info("{}: {}", getName(), serviceArgs.usage());
- exitCode = SliderExitCodes.EXIT_USAGE;
- break;
- case SliderActions.ACTION_CREATE:
- exitCode = createAndRunCluster(actionArgs.get(0));
- break;
- default:
- throw new SliderException("Unimplemented: " + action);
- }
- log.info("Exiting AM; final exit code = {}", exitCode);
- return exitCode;
- }
-
- /**
- * Initialize a newly created service then add it.
- * Because the service is not started, this MUST be done before
- * the AM itself starts, or it is explicitly added after
- * @param service the service to init
- */
- public Service initAndAddService(Service service) {
- service.init(getConfig());
- addService(service);
- return service;
- }
-
- /* =================================================================== */
-
- /**
- * Create and run the cluster.
- * @param clustername cluster name
- * @return exit code
- * @throws Throwable on a failure
- */
- private int createAndRunCluster(String clustername) throws Throwable {
-
- //load the cluster description from the cd argument
- String sliderClusterDir = serviceArgs.getSliderClusterURI();
- URI sliderClusterURI = new URI(sliderClusterDir);
- Path clusterDirPath = new Path(sliderClusterURI);
- log.info("Application defined at {}", sliderClusterURI);
- SliderFileSystem fs = getClusterFS();
-
- // build up information about the running application -this
- // will be passed down to the cluster status
- MapOperations appInformation = new MapOperations();
-
- AggregateConf instanceDefinition =
- InstanceIO.loadInstanceDefinitionUnresolved(fs, clusterDirPath);
- instanceDefinition.setName(clustername);
-
- log.info("Deploying cluster {}:", instanceDefinition);
-
- // and resolve it
- AggregateConf resolvedInstance = new AggregateConf( instanceDefinition);
- resolvedInstance.resolve();
-
- stateForProviders.setApplicationName(clustername);
-
- Configuration serviceConf = getConfig();
-
- // extend AM configuration with component resource
- MapOperations amConfiguration = resolvedInstance
- .getAppConfOperations().getComponent(COMPONENT_AM);
- // and patch configuration with prefix
- if (amConfiguration != null) {
- Map<String, String> sliderAppConfKeys = amConfiguration.prefixedWith("slider.");
- for (Map.Entry<String, String> entry : sliderAppConfKeys.entrySet()) {
- String k = entry.getKey();
- String v = entry.getValue();
- boolean exists = serviceConf.get(k) != null;
- log.info("{} {} to {}", (exists ? "Overwriting" : "Setting"), k, v);
- serviceConf.set(k, v);
- }
- }
-
- securityConfiguration = new SecurityConfiguration(serviceConf, resolvedInstance, clustername);
- // obtain security state
- securityEnabled = securityConfiguration.isSecurityEnabled();
- // set the global security flag for the instance definition
- instanceDefinition.getAppConfOperations().set(KEY_SECURITY_ENABLED, securityEnabled);
-
- // triggers resolution and snapshotting for agent
- appState.setInitialInstanceDefinition(instanceDefinition);
-
- File confDir = getLocalConfDir();
- if (!confDir.exists() || !confDir.isDirectory()) {
- log.info("Conf dir {} does not exist.", confDir);
- File parentFile = confDir.getParentFile();
- log.info("Parent dir {}:\n{}", parentFile, SliderUtils.listDir(parentFile));
- }
-
- //get our provider
- MapOperations globalInternalOptions = getGlobalInternalOptions();
- String providerType = globalInternalOptions.getMandatoryOption(
- InternalKeys.INTERNAL_PROVIDER_NAME);
- log.info("Cluster provider type is {}", providerType);
- SliderProviderFactory factory =
- SliderProviderFactory.createSliderProviderFactory(providerType);
- providerService = factory.createServerProvider();
- // init the provider BUT DO NOT START IT YET
- initAndAddService(providerService);
- providerRMOperationHandler = new ProviderNotifyingOperationHandler(providerService);
-
- // create a slider AM provider
- sliderAMProvider = new SliderAMProviderService();
- initAndAddService(sliderAMProvider);
-
- InetSocketAddress rmSchedulerAddress = SliderUtils.getRmSchedulerAddress(serviceConf);
- log.info("RM is at {}", rmSchedulerAddress);
- yarnRPC = YarnRPC.create(serviceConf);
-
- // set up the YARN client. This may require patching in the RM client-API address if it
- // is (somehow) unset server-side. String clientRMaddr = serviceConf.get(YarnConfiguration.RM_ADDRESS);
- InetSocketAddress clientRpcAddress = SliderUtils.getRmAddress(serviceConf);
- if (!SliderUtils.isAddressDefined(clientRpcAddress)) {
- // client addr is unset. We can lift the host from the RM scheduler address
- log.warn("Yarn RM address was unbound; attempting to fix up");
- serviceConf.set(YarnConfiguration.RM_ADDRESS,
- String.format("%s:%d", rmSchedulerAddress.getHostString(), clientRpcAddress.getPort() ));
- }
-
- /*
- * Extract the container ID. This is then
- * turned into an (incomplete) container
- */
- appMasterContainerID = ConverterUtils.toContainerId(
- SliderUtils.mandatoryEnvVariable(ApplicationConstants.Environment.CONTAINER_ID.name()));
- appAttemptID = appMasterContainerID.getApplicationAttemptId();
-
- ApplicationId appid = appAttemptID.getApplicationId();
- log.info("AM for ID {}", appid.getId());
-
- appInformation.put(StatusKeys.INFO_AM_CONTAINER_ID, appMasterContainerID.toString());
- appInformation.put(StatusKeys.INFO_AM_APP_ID, appid.toString());
- appInformation.put(StatusKeys.INFO_AM_ATTEMPT_ID, appAttemptID.toString());
-
- Map<String, String> envVars;
- List<Container> liveContainers;
-
- /*
- * It is critical this section is synchronized, to stop async AM events
- * arriving while registering a restarting AM.
- */
- synchronized (appState) {
- int heartbeatInterval = HEARTBEAT_INTERVAL;
-
- // add the RM client -this brings the callbacks in
- asyncRMClient = AMRMClientAsync.createAMRMClientAsync(heartbeatInterval, this);
- addService(asyncRMClient);
- //now bring it up
- deployChildService(asyncRMClient);
-
-
- // nmclient relays callbacks back to this class
- nmClientAsync = new NMClientAsyncImpl("nmclient", this);
- deployChildService(nmClientAsync);
-
- // set up secret manager
- secretManager = new ClientToAMTokenSecretManager(appAttemptID, null);
-
- if (securityEnabled) {
- // fix up the ACLs if they are not set
- String acls = serviceConf.get(KEY_PROTOCOL_ACL);
- if (acls == null) {
- getConfig().set(KEY_PROTOCOL_ACL, "*");
- }
- }
-
- certificateManager = new CertificateManager();
-
- //bring up the Slider RPC service
- buildPortScanner(instanceDefinition);
- startSliderRPCServer(instanceDefinition);
-
- rpcServiceAddress = rpcService.getConnectAddress();
- appMasterHostname = rpcServiceAddress.getAddress().getCanonicalHostName();
- appMasterRpcPort = rpcServiceAddress.getPort();
- appMasterTrackingUrl = null;
- log.info("AM Server is listening at {}:{}", appMasterHostname, appMasterRpcPort);
- appInformation.put(StatusKeys.INFO_AM_HOSTNAME, appMasterHostname);
- appInformation.set(StatusKeys.INFO_AM_RPC_PORT, appMasterRpcPort);
-
- log.info("Starting Yarn registry");
- registryOperations = startRegistryOperationsService();
- log.info(registryOperations.toString());
-
- //build the role map
- List<ProviderRole> providerRoles = new ArrayList<>(providerService.getRoles());
- providerRoles.addAll(SliderAMClientProvider.ROLES);
-
- // Start up the WebApp and track the URL for it
- MapOperations component = instanceDefinition.getAppConfOperations()
- .getComponent(SliderKeys.COMPONENT_AM);
- certificateManager.initialize(component, appMasterHostname,
- appMasterContainerID.toString(),
- clustername);
- certificateManager.setPassphrase(instanceDefinition.getPassphrase());
-
- if (component.getOptionBool(
- AgentKeys.KEY_AGENT_TWO_WAY_SSL_ENABLED, false)) {
- uploadServerCertForLocalization(clustername, fs);
- }
-
- // Web service endpoints: initialize
- WebAppApiImpl webAppApi =
- new WebAppApiImpl(
- stateForProviders,
- providerService,
- certificateManager,
- registryOperations,
- metricsAndMonitoring,
- actionQueues,
- this,
- contentCache);
- initAMFilterOptions(serviceConf);
-
- // start the agent web app
- startAgentWebApp(appInformation, serviceConf, webAppApi);
- int webAppPort = deployWebApplication(webAppApi);
-
- String scheme = WebAppUtils.HTTP_PREFIX;
- appMasterTrackingUrl = scheme + appMasterHostname + ":" + webAppPort;
-
- appInformation.put(StatusKeys.INFO_AM_WEB_URL, appMasterTrackingUrl + "/");
- appInformation.set(StatusKeys.INFO_AM_WEB_PORT, webAppPort);
-
- // *****************************************************
- // Register self with ResourceManager
- // This will start heartbeating to the RM
- // address = SliderUtils.getRmSchedulerAddress(asyncRMClient.getConfig());
- // *****************************************************
- log.info("Connecting to RM at {}; AM tracking URL={}",
- appMasterRpcPort, appMasterTrackingUrl);
- amRegistrationData = asyncRMClient.registerApplicationMaster(appMasterHostname,
- appMasterRpcPort,
- appMasterTrackingUrl);
- maximumResourceCapability = amRegistrationData.getMaximumResourceCapability();
-
- int minMemory = serviceConf.getInt(RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
- DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
- // validate scheduler vcores allocation setting
- int minCores = serviceConf.getInt(RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
- DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
- int maxMemory = maximumResourceCapability.getMemory();
- int maxCores = maximumResourceCapability.getVirtualCores();
- appState.setContainerLimits(minMemory,maxMemory, minCores, maxCores );
-
- // build the handler for RM request/release operations; this uses
- // the max value as part of its lookup
- rmOperationHandler = new AsyncRMOperationHandler(asyncRMClient, maximumResourceCapability);
-
- // set the RM-defined maximum cluster values
- appInformation.put(ResourceKeys.YARN_CORES, Integer.toString(maxCores));
- appInformation.put(ResourceKeys.YARN_MEMORY, Integer.toString(maxMemory));
-
- processAMCredentials(securityConfiguration);
-
- if (securityEnabled) {
- secretManager.setMasterKey(
- amRegistrationData.getClientToAMTokenMasterKey().array());
- applicationACLs = amRegistrationData.getApplicationACLs();
-
- //tell the server what the ACLs are
- rpcService.getServer().refreshServiceAcl(serviceConf,
- new SliderAMPolicyProvider());
- if (securityConfiguration.isKeytabProvided()) {
- // perform keytab based login to establish kerberos authenticated
- // principal. Can do so now since AM registration with RM above required
- // tokens associated to principal
- String principal = securityConfiguration.getPrincipal();
- File localKeytabFile = securityConfiguration.getKeytabFile(instanceDefinition);
- // Now log in...
- login(principal, localKeytabFile);
- // obtain new FS reference that should be kerberos based and different
- // than the previously cached reference
- fs = new SliderFileSystem(serviceConf);
- }
- }
-
- // YARN client.
- // Important: this is only valid at startup, and must be executed within
- // the right UGI context. Use with care.
- SliderYarnClientImpl yarnClient = null;
- List<NodeReport> nodeReports;
- try {
- yarnClient = new SliderYarnClientImpl();
- yarnClient.init(getConfig());
- yarnClient.start();
- nodeReports = getNodeReports(yarnClient);
- log.info("Yarn node report count: {}", nodeReports.size());
- // look up the application itself -this is needed to get the proxied
- // URL of the AM, for registering endpoints.
- // this call must be made after the AM has registered itself, obviously
- ApplicationAttemptReport report = getApplicationAttemptReport(yarnClient);
- appMasterProxiedUrl = report.getTrackingUrl();
- if (SliderUtils.isUnset(appMasterProxiedUrl)) {
- log.warn("Proxied URL is not set in application report");
- appMasterProxiedUrl = appMasterTrackingUrl;
- }
- } finally {
- // at this point yarnClient is no longer needed.
- // stop it immediately
- ServiceOperations.stop(yarnClient);
- yarnClient = null;
- }
-
- // extract container list
-
- liveContainers = amRegistrationData.getContainersFromPreviousAttempts();
-
- //now validate the installation
- Configuration providerConf =
- providerService.loadProviderConfigurationInformation(confDir);
-
- providerService.initializeApplicationConfiguration(instanceDefinition,
- fs, null);
-
- providerService.validateApplicationConfiguration(instanceDefinition,
- confDir,
- securityEnabled);
-
- //determine the location for the role history data
- Path historyDir = new Path(clusterDirPath, HISTORY_DIR_NAME);
-
- //build the instance
- AppStateBindingInfo binding = new AppStateBindingInfo();
- binding.instanceDefinition = instanceDefinition;
- binding.serviceConfig = serviceConf;
- binding.publishedProviderConf = providerConf;
- binding.roles = providerRoles;
- binding.fs = fs.getFileSystem();
- binding.historyPath = historyDir;
- binding.liveContainers = liveContainers;
- binding.applicationInfo = appInformation;
- binding.releaseSelector = providerService.createContainerReleaseSelector();
- binding.nodeReports = nodeReports;
- appState.buildInstance(binding);
-
- providerService.rebuildContainerDetails(liveContainers,
- instanceDefinition.getName(), appState.getRolePriorityMap());
-
- // add the AM to the list of nodes in the cluster
-
- appState.buildAppMasterNode(appMasterContainerID,
- appMasterHostname,
- webAppPort,
- appMasterHostname + ":" + webAppPort);
-
- // build up environment variables that the AM wants set in every container
- // irrespective of provider and role.
- envVars = new HashMap<>();
- if (hadoop_user_name != null) {
- envVars.put(HADOOP_USER_NAME, hadoop_user_name);
- }
- String debug_kerberos = System.getenv(HADOOP_JAAS_DEBUG);
- if (debug_kerberos != null) {
- envVars.put(HADOOP_JAAS_DEBUG, debug_kerberos);
- }
- }
- String rolesTmpSubdir = appMasterContainerID.toString() + "/roles";
-
- String amTmpDir = globalInternalOptions.getMandatoryOption(InternalKeys.INTERNAL_AM_TMP_DIR);
-
- Path tmpDirPath = new Path(amTmpDir);
- Path launcherTmpDirPath = new Path(tmpDirPath, rolesTmpSubdir);
- fs.getFileSystem().mkdirs(launcherTmpDirPath);
-
- //launcher service
- launchService = new RoleLaunchService(actionQueues,
- providerService,
- fs,
- new Path(getGeneratedConfDir()),
- envVars,
- launcherTmpDirPath);
-
- deployChildService(launchService);
-
- appState.noteAMLaunched();
-
-
- //Give the provider access to the state, and AM
- providerService.bind(stateForProviders, actionQueues, liveContainers);
- sliderAMProvider.bind(stateForProviders, actionQueues, liveContainers);
-
- // chaos monkey
- maybeStartMonkey();
-
- // setup token renewal and expiry handling for long lived apps
-// if (!securityConfiguration.isKeytabProvided() &&
-// SliderUtils.isHadoopClusterSecure(getConfig())) {
-// fsDelegationTokenManager = new FsDelegationTokenManager(actionQueues);
-// fsDelegationTokenManager.acquireDelegationToken(getConfig());
-// }
-
- // if not a secure cluster, extract the username -it will be
- // propagated to workers
- if (!UserGroupInformation.isSecurityEnabled()) {
- hadoop_user_name = System.getenv(HADOOP_USER_NAME);
- log.info(HADOOP_USER_NAME + "='{}'", hadoop_user_name);
- }
- service_user_name = RegistryUtils.currentUser();
- log.info("Registry service username ={}", service_user_name);
-
-
- // declare the cluster initialized
- log.info("Application Master Initialization Completed");
- initCompleted.set(true);
-
- scheduleFailureWindowResets(instanceDefinition.getResources());
- scheduleEscalation(instanceDefinition.getInternal());
-
- try {
- // schedule YARN Registry registration
- queue(new ActionRegisterServiceInstance(clustername, appid));
-
- // log the YARN and web UIs
- log.info("RM Webapp address {}",
- serviceConf.get(YarnConfiguration.RM_WEBAPP_ADDRESS));
- log.info("Slider webapp address {} proxied at {}",
- appMasterTrackingUrl, appMasterProxiedUrl);
-
- // Start the Slider AM provider
- sliderAMProvider.start();
-
- // launch the real provider; this is expected to trigger a callback that
- // starts the node review process
- launchProviderService(instanceDefinition, confDir);
-
- // start handling any scheduled events
-
- startQueueProcessing();
-
- //now block waiting to be told to exit the process
- waitForAMCompletionSignal();
- } catch(Exception e) {
- log.error("Exception : {}", e, e);
- // call the AM stop command as if it had been queued (but without
- // going via the queue, which may not have started
- onAMStop(new ActionStopSlider(e));
- }
- //shutdown time
- return finish();
- }
-
- /**
-  * Fetch the report for this application attempt from the RM.
-  * <p>
-  * On a secure cluster the query is issued inside a <code>doAs()</code>
-  * of the login user; otherwise the client is called directly.
-  * @param yarnClient client to the RM; must not be null
-  * @return the application attempt report
-  * @throws YarnException
-  * @throws IOException
-  * @throws InterruptedException
-  */
- private ApplicationAttemptReport getApplicationAttemptReport(
-     final SliderYarnClientImpl yarnClient)
-     throws YarnException, IOException, InterruptedException {
-   Preconditions.checkNotNull(yarnClient, "Null Yarn client");
-   if (!securityEnabled) {
-     // insecure cluster: no user context switch is needed
-     return yarnClient.getApplicationAttemptReport(appAttemptID);
-   }
-   PrivilegedExceptionAction<ApplicationAttemptReport> query =
-       new PrivilegedExceptionAction<ApplicationAttemptReport>() {
-         @Override
-         public ApplicationAttemptReport run() throws Exception {
-           return yarnClient.getApplicationAttemptReport(appAttemptID);
-         }
-       };
-   return UserGroupInformation.getLoginUser().doAs(query);
- }
-
- /**
-  * Ask the RM for the reports of all nodes in the
-  * {@link NodeState#RUNNING} state, and log how many came back.
-  * <p>
-  * On a secure cluster the call is made as the login user.
-  * @param yarnClient client to the RM; must not be null
-  * @return the node reports
-  * @throws IOException
-  * @throws YarnException
-  * @throws InterruptedException
-  */
- private List<NodeReport> getNodeReports(final SliderYarnClientImpl yarnClient)
-     throws IOException, YarnException, InterruptedException {
-   Preconditions.checkNotNull(yarnClient, "Null Yarn client");
-   final List<NodeReport> runningNodes;
-   if (!securityEnabled) {
-     runningNodes = yarnClient.getNodeReports(NodeState.RUNNING);
-   } else {
-     UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-     runningNodes = loginUser.doAs(
-         new PrivilegedExceptionAction<List<NodeReport>>() {
-           @Override
-           public List<NodeReport> run() throws Exception {
-             return yarnClient.getNodeReports(NodeState.RUNNING);
-           }
-         });
-   }
-   log.info("Yarn node report count: {}", runningNodes.size());
-   return runningNodes;
- }
-
- /**
-  * Create and start the AM web application, then register a
-  * <code>WebAppService</code> wrapping it as a child service of the AM
-  * so that it is shut down with the AM.
-  * <p>
-  * The web application is started with the HTTP-only policy on a port
-  * obtained from {@link #getPortToRequest()}.
-  * @param webAppApi web app API instance
-  * @return port the web application is deployed on
-  * @throws IOException general problems starting the webapp (network, etc);
-  * this includes any IOException found as the cause of a WebAppException
-  * @throws SliderException declared by {@link #getPortToRequest()}
-  */
- private int deployWebApplication(WebAppApiImpl webAppApi)
-     throws IOException, SliderException {
-
-   try {
-     webApp = new SliderAMWebApp(webAppApi);
-     HttpConfig.Policy httpPolicy = HttpConfig.Policy.HTTP_ONLY;
-     int requestedPort = getPortToRequest();
-     log.info("Launching web application at port {} with policy {}", requestedPort, httpPolicy);
-
-     WebApps.$for(SliderAMWebApp.BASE_PATH,
-         WebAppApi.class,
-         webAppApi,
-         RestPaths.WS_CONTEXT)
-         .withHttpPolicy(getConfig(), httpPolicy)
-         .at("0.0.0.0", requestedPort, true)
-         .inDevMode()
-         .start(webApp);
-
-     WebAppService<SliderAMWebApp> managedWebApp =
-         new WebAppService<>("slider", webApp);
-     deployChildService(managedWebApp);
-
-     // report the port the web app says it is using
-     return webApp.port();
-   } catch (WebAppException e) {
-     // surface an underlying IO failure directly; rethrow anything else
-     Throwable cause = e.getCause();
-     if (cause instanceof IOException) {
-       throw (IOException) cause;
-     }
-     throw e;
-   }
- }
-
- /**
- * Process the initial user to obtain the set of user
- * supplied credentials (tokens were passed in by client).
- * Removes the AM/RM token.
- * If a keytab has been provided, also strip the HDFS delegation token.
- * @param securityConfig slider security config
- * @throws IOException
- */
- private void processAMCredentials(SecurityConfiguration securityConfig)
- throws IOException {
-
- List<Text> filteredTokens = new ArrayList<>(3);
- filteredTokens.add(AMRMTokenIdentifier.KIND_NAME);
- filteredTokens.add(TimelineDelegationTokenIdentifier.KIND_NAME);
-
- boolean keytabProvided = securityConfig.isKeytabProvided();
- log.info("Slider AM Security Mode: {}", keytabProvided ? "KEYTAB" : "TOKEN");
- if (keytabProvided) {
- filteredTokens.add(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
- }
- containerCredentials = CredentialUtils.filterTokens(
- UserGroupInformation.getCurrentUser().getCredentials(),
- filteredTokens);
- log.info(CredentialUtils.dumpTokens(containerCredentials, "\n"));
- }
-
- /**
-  * Create the port scanner, restricting it to the port range named by
-  * {@link SliderKeys#KEY_ALLOWED_PORT_RANGE} in the global application
-  * options. The value "0" (also the default) means no range is applied.
-  * @param instanceDefinition instance definition to read the option from
-  * @throws BadConfigException
-  */
- private void buildPortScanner(AggregateConf instanceDefinition)
-     throws BadConfigException {
-   portScanner = new PortScanner();
-   String allowedRange = instanceDefinition
-       .getAppConfOperations()
-       .getGlobalOptions()
-       .getOption(SliderKeys.KEY_ALLOWED_PORT_RANGE, "0");
-   if ("0".equals(allowedRange)) {
-     // no range configured: leave the scanner unrestricted
-     return;
-   }
-   portScanner.setPortRange(allowedRange);
- }
-
- /**
-  * Pick a port to request for a service such as RPC or web/REST,
-  * by asking the port scanner for an available one. Any port range
-  * set on the scanner in {@link #buildPortScanner(AggregateConf)}
-  * therefore applies.
-  * <p>
-  * The port is available at the time of the request; there is no
-  * guarantee as to how long that will remain true.
-  * @return the port to request.
-  * @throws SliderException
-  * @throws IOException
-  */
- private int getPortToRequest() throws SliderException, IOException {
-   int candidatePort = portScanner.getAvailablePort();
-   return candidatePort;
- }
-
- /**
-  * Copy the server certificate into the cluster security directory so
-  * that it can be localized.
-  * <p>
-  * The directory is created with owner-only access if it is missing,
-  * the certificate is uploaded only if it is not already there, and the
-  * file permission is always (re)set to owner read-only.
-  * @param clustername name of the cluster
-  * @param fs filesystem to upload to
-  * @throws IOException on any filesystem failure
-  */
- private void uploadServerCertForLocalization(String clustername,
-     SliderFileSystem fs)
-     throws IOException {
-   Path securityDir = fs.buildClusterSecurityDirPath(clustername);
-   if (!fs.getFileSystem().exists(securityDir)) {
-     FsPermission ownerOnly =
-         new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-     fs.getFileSystem().mkdirs(securityDir, ownerOnly);
-   }
-
-   Path remoteCert = new Path(securityDir, SliderKeys.CRT_FILE_NAME);
-   if (!fs.getFileSystem().exists(remoteCert)) {
-     Path localCert = new Path(
-         CertificateManager.getServerCertficateFilePath().getAbsolutePath());
-     fs.getFileSystem().copyFromLocalFile(localCert, remoteCert);
-     log.info("Uploaded server cert to localization path {}", remoteCert);
-   }
-
-   FsPermission ownerReadOnly =
-       new FsPermission(FsAction.READ, FsAction.NONE, FsAction.NONE);
-   fs.getFileSystem().setPermission(remoteCert, ownerReadOnly);
- }
-
- /**
-  * Log in from a keytab, then run {@link #validateLoginUser} against
-  * the resulting login user.
-  * @param principal principal to log in as
-  * @param localKeytabFile keytab file on the local filesystem
-  * @throws IOException
-  * @throws SliderException
-  */
- protected void login(String principal, File localKeytabFile)
-     throws IOException, SliderException {
-   log.info("Logging in as {} with keytab {}", principal, localKeytabFile);
-   String keytabPath = localKeytabFile.getAbsolutePath();
-   UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
-   UserGroupInformation loggedInUser = UserGroupInformation.getLoginUser();
-   validateLoginUser(loggedInUser);
- }
-
- /**
-  * Check the login user: log an error if it was not created from a
-  * keytab, then walk its tokens, logging each kind and removing any
-  * HDFS delegation token from the credentials object being iterated.
-  * <p>
-  * NOTE(review): the removal acts on whatever object
-  * <code>user.getCredentials()</code> returns; confirm that this is not a
-  * defensive copy, otherwise the user's own tokens are left untouched.
-  *
-  * @param user user to validate
-  * @throws SliderException
-  */
- protected void validateLoginUser(UserGroupInformation user)
-     throws SliderException {
-   if (!user.isFromKeytab()) {
-     log.error("User is not holding on a keytab in a secure deployment:" +
-         " slider will fail as tokens expire");
-   }
-   Credentials userCredentials = user.getCredentials();
-   for (Iterator<Token<? extends TokenIdentifier>> tokens =
-            userCredentials.getAllTokens().iterator();
-        tokens.hasNext(); ) {
-     Token<? extends TokenIdentifier> current = tokens.next();
-     log.info("Token {}", current.getKind());
-     boolean isHdfsDelegationToken = current.getKind().equals(
-         DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
-     if (isHdfsDelegationToken) {
-       log.info("HDFS delegation token {}. Removing...", current);
-       tokens.remove();
-     }
-   }
- }
-
- /**
-  * Set up and start the agent web application.
-  * <p>
-  * The agent web app is started on two ports obtained from
-  * {@link #getPortToRequest()}, wrapped in an <code>AgentService</code>
-  * which is initialized, started and added to this service, and the
-  * resulting URLs and ports are published in the application information.
-  * @param appInformation application information
-  * @param serviceConf service configuration
-  * @param webAppApi web app API instance to bind to
-  * @throws IOException
-  * @throws SliderException
-  */
- private void startAgentWebApp(MapOperations appInformation,
-     Configuration serviceConf, WebAppApiImpl webAppApi) throws IOException, SliderException {
-   // Classpath dump for diagnostics. Only done when debug logging is on,
-   // and without assuming the class loader is a URLClassLoader: the
-   // application class loader is not one on Java 9+, where an
-   // unconditional cast would fail AM startup with a ClassCastException.
-   if (LOG_YARN.isDebugEnabled()) {
-     ClassLoader loader = AgentWebApp.class.getClassLoader();
-     StringBuilder sb = new StringBuilder("AM classpath:");
-     if (loader instanceof URLClassLoader) {
-       for (URL url : ((URLClassLoader) loader).getURLs()) {
-         sb.append("\n").append(url.toString());
-       }
-     } else {
-       // fall back to the JVM's declared classpath
-       sb.append("\n").append(System.getProperty("java.class.path"));
-     }
-     LOG_YARN.debug(sb.append("\n").toString());
-   }
-   initAMFilterOptions(serviceConf);
-
-
-   // Start up the agent web app and track the URL for it
-   MapOperations appMasterConfig = getInstanceDefinition()
-       .getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM);
-   AgentWebApp agentWebApp = AgentWebApp.$for(AgentWebApp.BASE_PATH,
-       webAppApi,
-       RestPaths.AGENT_WS_CONTEXT)
-       .withComponentConfig(appMasterConfig)
-       .withPort(getPortToRequest())
-       .withSecuredPort(getPortToRequest())
-       .start();
-   agentOpsUrl =
-       "https://" + appMasterHostname + ":" + agentWebApp.getSecuredPort();
-   agentStatusUrl =
-       "https://" + appMasterHostname + ":" + agentWebApp.getPort();
-   AgentService agentService =
-       new AgentService("slider-agent", agentWebApp);
-
-   agentService.init(serviceConf);
-   agentService.start();
-   addService(agentService);
-
-   // publish the agent endpoints
-   appInformation.put(StatusKeys.INFO_AM_AGENT_OPS_URL, agentOpsUrl + "/");
-   appInformation.put(StatusKeys.INFO_AM_AGENT_STATUS_URL, agentStatusUrl + "/");
-   appInformation.set(StatusKeys.INFO_AM_AGENT_STATUS_PORT,
-       agentWebApp.getPort());
-   appInformation.set(StatusKeys.INFO_AM_AGENT_OPS_PORT,
-       agentWebApp.getSecuredPort());
- }
-
- /**
-  * Choose the HTTP filter initializer for the AM and write it into
-  * {@link #HADOOP_HTTP_FILTER_INITIALIZERS} of the given configuration.
-  * <p>
-  * The normal AM filter is used unless the insecure option is both
-  * permitted (<code>X_DEV_INSECURE_REQUIRED</code>) and switched on in the
-  * configuration, in which case the insecure filter is selected and a
-  * warning is logged.
-  * @param serviceConf configuration to patch
-  */
- private void initAMFilterOptions(Configuration serviceConf) {
-   // This is here until YARN supports proxy & redirect operations
-   // on verbs other than GET, and is only supported for testing
-   boolean useInsecureFilter = X_DEV_INSECURE_REQUIRED
-       && serviceConf.getBoolean(X_DEV_INSECURE_WS, X_DEV_INSECURE_DEFAULT);
-
-   String filterInitializer;
-   if (useInsecureFilter) {
-     log.warn("Insecure filter enabled: REST operations are unauthenticated");
-     filterInitializer = InsecureAmFilterInitializer.NAME;
-   } else {
-     // IP filtering
-     filterInitializer = AM_FILTER_NAME;
-   }
-
-   serviceConf.set(HADOOP_HTTP_FILTER_INITIALIZERS, filterInitializer);
- }
-
- /**
-  * Register this AM and its external endpoints in the YARN registry.
-  * <p>
-  * Builds the registry view handed to the providers, assembles the AM's
-  * service record (IPC endpoint, provider-supplied definitions and any
-  * attributes set on the AM component), registers it, and on the first
-  * application attempt deletes any children under the registration path.
-  * @param instanceName name of this instance
-  * @param appId application ID
-  * @throws IOException
-  */
- public void registerServiceInstance(String instanceName,
-     ApplicationId appId) throws IOException {
-
-   // the registry is running, so register services.
-   // The URLs are parsed before any registry operation takes place.
-   URL webUrl = new URL(appMasterProxiedUrl);
-   URL opsUrl = new URL(agentOpsUrl);
-   URL statusUrl = new URL(agentStatusUrl);
-
-   // Give the providers restricted access to the state, registry
-   setupInitialRegistryPaths();
-   yarnRegistryOperations = new YarnRegistryViewForProviders(
-       registryOperations,
-       service_user_name,
-       SliderKeys.APP_TYPE,
-       instanceName,
-       appAttemptID);
-   providerService.bindToYarnRegistry(yarnRegistryOperations);
-   sliderAMProvider.bindToYarnRegistry(yarnRegistryOperations);
-
-   // the AM's own record, with application-scoped persistence
-   ServiceRecord amRecord = new ServiceRecord();
-   amRecord.description = "Slider Application Master";
-   amRecord.set(YarnRegistryAttributes.YARN_ID, appId.toString());
-   amRecord.set(YarnRegistryAttributes.YARN_PERSISTENCE,
-       PersistencePolicies.APPLICATION);
-
-   amRecord.addExternalEndpoint(
-       RegistryTypeUtils.ipcEndpoint(
-           CustomRegistryConstants.AM_IPC_PROTOCOL,
-           rpcServiceAddress));
-
-   // internal services first, then the provider's dynamic definitions
-   sliderAMProvider.applyInitialRegistryDefinitions(webUrl,
-       opsUrl,
-       statusUrl,
-       amRecord);
-   providerService.applyInitialRegistryDefinitions(webUrl,
-       opsUrl,
-       statusUrl,
-       amRecord);
-
-   // attributes supplied in the AM component's configuration
-   MapOperations amComponent = getInstanceDefinition()
-       .getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM);
-   setProvidedServiceRecordAttributes(amComponent, amRecord);
-
-   // register the service's entry
-   log.info("Service Record \n{}", amRecord);
-   yarnRegistryOperations.registerSelf(amRecord, true);
-   log.info("Registered service under {}; absolute path {}",
-       yarnRegistryOperations.getSelfRegistrationPath(),
-       yarnRegistryOperations.getAbsoluteSelfRegistrationPath());
-
-   // on an initial AM startup, purge anything left underneath the entry
-   if (appAttemptID.getAttemptId() == 1) {
-     yarnRegistryOperations.deleteChildren(
-         yarnRegistryOperations.getSelfRegistrationPath(),
-         true);
-   }
- }
-
- /**
-  * If the registry operations instance is an
-  * <code>RMRegistryOperationsService</code>, ask it to initialize the
-  * registry for the service user asynchronously; otherwise do nothing.
-  * <p>
-  * TODO: purge this once RM is doing the work
-  * @throws IOException
-  */
- protected void setupInitialRegistryPaths() throws IOException {
-   if (!(registryOperations instanceof RMRegistryOperationsService)) {
-     return;
-   }
-   ((RMRegistryOperationsService) registryOperations)
-       .initUserRegistryAsync(service_user_name);
- }
-
- /**
-  * Handler for {@link RegisterComponentInstance action}
-  * Register/re-register an ephemeral container that is already in the app state.
-  * <p>
-  * Registration is skipped (returning false) when the container is not
-  * owned by the app state, when the registry view has not yet been
-  * created by {@link #registerServiceInstance}, or when the registry
-  * write fails.
-  * @param id the component
-  * @param description component description
-  * @param type component type
-  * @return true if the component is registered
-  * @throws IOException
-  */
- public boolean registerComponent(ContainerId id, String description,
-     String type) throws IOException {
-   RoleInstance instance = appState.getOwnedContainer(id);
-   if (instance == null) {
-     return false;
-   }
-   // mirror the guard in unregisterComponent(): the registry view only
-   // exists once the service instance itself has been registered
-   if (yarnRegistryOperations == null) {
-     log.warn("Processing register component event before initialization " +
-         "completed; init flag ={}", initCompleted);
-     return false;
-   }
-   // this is where component registrations go
-   log.info("Registering component {}", id);
-   String cid = RegistryPathUtils.encodeYarnID(id.toString());
-   ServiceRecord container = new ServiceRecord();
-   container.set(YarnRegistryAttributes.YARN_ID, cid);
-   container.description = description;
-   container.set(YarnRegistryAttributes.YARN_PERSISTENCE,
-       PersistencePolicies.CONTAINER);
-   MapOperations compOps = getInstanceDefinition().getAppConfOperations().
-       getComponent(type);
-   // NOTE(review): getComponent() presumably returns null for a type with
-   // no configuration entry; skip the attributes rather than fail -confirm
-   if (compOps != null) {
-     setProvidedServiceRecordAttributes(compOps, container);
-   }
-   try {
-     yarnRegistryOperations.putComponent(cid, container);
-   } catch (IOException e) {
-     log.warn("Failed to register container {}/{}: {}",
-         id, description, e, e);
-     return false;
-   }
-   return true;
- }
-
- /**
-  * Copy every option whose key starts with
-  * {@link RoleKeys#SERVICE_RECORD_ATTRIBUTE_PREFIX} into the service
-  * record. The attribute name is the remainder of the key after the
-  * prefix and the single character that follows it; the value is trimmed.
-  * <p>
-  * A key consisting of the prefix alone carries no attribute name and is
-  * skipped with a warning.
-  * @param ops options to scan
-  * @param record service record to update
-  */
- protected void setProvidedServiceRecordAttributes(MapOperations ops,
-     ServiceRecord record) {
-   String prefix = RoleKeys.SERVICE_RECORD_ATTRIBUTE_PREFIX;
-   // +1 skips the character separating the prefix from the attribute name
-   int nameStart = prefix.length() + 1;
-   for (Map.Entry<String, String> entry : ops.entrySet()) {
-     String key = entry.getKey();
-     if (!key.startsWith(prefix)) {
-       continue;
-     }
-     if (key.length() < nameStart) {
-       // key is exactly the prefix: substring(nameStart) would throw
-       // StringIndexOutOfBoundsException
-       log.warn("Ignoring service record attribute option with no name: {}",
-           key);
-       continue;
-     }
-     record.set(key.substring(nameStart), entry.getValue().trim());
-   }
- }
-
- /**
-  * Handler for {@link UnregisterComponentInstance}
-  *
-  * Delete a component's entry from the registry. The component may never
-  * have been registered by the time this message arrives. If the
-  * registry view has not been created yet, a warning is logged and
-  * nothing else happens; a failure to delete is logged and not rethrown.
-  * @param id the component
-  */
- public void unregisterComponent(ContainerId id) {
-   log.info("Unregistering component {}", id);
-   if (yarnRegistryOperations != null) {
-     String encodedId = RegistryPathUtils.encodeYarnID(id.toString());
-     try {
-       yarnRegistryOperations.deleteComponent(encodedId);
-     } catch (IOException e) {
-       log.warn("Failed to delete container {} : {}", id, e, e);
-     }
-   } else {
-     log.warn("Processing unregister component event before initialization " +
-         "completed; init flag ={}", initCompleted);
-   }
- }
-
- /**
-  * Warn if the token file named by the
-  * {@link UserGroupInformation#HADOOP_TOKEN_FILE_LOCATION} environment
-  * variable does not exist.
-  *
-  * This surfaced (once) in HBase, where its HDFS library was looking for
-  * the file and somehow the token was missing. The check only logs a
-  * warning, so that a recurrence leaves a more meaningful trace.
-  */
- private void checkAndWarnForAuthTokenProblems() {
-   String tokenFileLocation =
-       System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-   if (tokenFileLocation == null) {
-     // variable not set: nothing to check
-     return;
-   }
-   File tokenFile = new File(tokenFileLocation);
-   if (tokenFile.exists()) {
-     return;
-   }
-   log.warn("Token file {} specified in {} not found", tokenFile,
-       UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
- }
-
- /**
-  * Get the propagated configuration directory as an absolute file,
-  * resolved from {@link SliderKeys#PROPAGATED_CONF_DIR_NAME}.
-  * @return the file
-  */
- public File getLocalConfDir() {
-   return new File(SliderKeys.PROPAGATED_CONF_DIR_NAME).getAbsoluteFile();
- }
-
- /**
- * Get the path to the DFS configuration that is defined in the cluster specification
- * @return the generated configuration dir
- */
- public String getGeneratedConfDir() {
- return getGlobalInternalOptions().get(
- InternalKeys.INTERNAL_GENERATED_CONF_PATH);
- }
-
- /**
- * Get the global internal options for the AM
- * @return a map to access the internals
- */
- public MapOperations getGlobalInternalOptions() {
- return getInstanceDefinition()
- .getInternalOperations().
- getGlobalOptions();
- }
-
- /**
- * Get the filesystem of this cluster
- * @return the FS of the config
- */
- public SliderFileSystem getClusterFS() throws IOException {
- return new SliderFileSystem(getConfig());
- }
-
- /**
-  * Accessor for the logger used by the AM.
-  * @return the log of the AM
-  */
- public static Logger getLog() {
-   return log;
- }
-
- /**
-  * Accessor for the application state held by this AM.
-  * @return the application state
-  */
- public AppState getAppState() {
-   return appState;
- }
-
- /**
-  * Block until it is signalled that the AM is done.
-  * <p>
-  * The completion flag is re-checked in a loop around the wait: a
-  * condition wait may return spuriously, and returning before
-  * {@link #onAMStop} has set the flag would send the AM into shutdown
-  * without a stop action.
-  */
- private void waitForAMCompletionSignal() {
-   AMExecutionStateLock.lock();
-   try {
-     while (!amCompletionFlag.get()) {
-       log.debug("blocking until signalled to terminate");
-       isAMCompleted.awaitUninterruptibly();
-     }
-   } finally {
-     AMExecutionStateLock.unlock();
-   }
- }
-
- /**
-  * Request AM completion by scheduling the stop action through the
-  * action queues, rather than executing it directly.
-  *
-  * @param stopActionRequest request containing shutdown details
-  */
- public synchronized void signalAMComplete(ActionStopSlider stopActionRequest) {
-   // queued action: hand it to the scheduler
-   schedule(stopActionRequest);
- }
-
- /**
- * Signal that the AM is complete
- *
- * @param stopActionRequest request containing shutdown details
- */
- public synchronized void onAMStop(ActionStopSlider stopActionRequest) {
-
- AMExecutionStateLock.lock();
- try {
- if (amCompletionFlag.compareAndSet(false, true)) {
- // first stop request received
- this.stopAction = stopActionRequest;
- isAMCompleted.signal();
- }
- } finally {
- AMExecutionStateLock.unlock();
- }
- }
-
-
- /**
- * trigger the YARN cluster termination process
- * @return the exit code
- * @throws Exception if the stop action contained an Exception which implements
- * ExitCodeProvider
- */
- private synchronized int finish() throws Exception {
- Preconditions.checkNotNull(stopAction, "null stop action");
- FinalApplicationStatus appStatus;
- log.info("Triggering shutdown of the AM: {}", stopAction);
-
- String appMessage = stopAction.getMessage();
- //stop the daemon & grab its exit code
- int exitCode = stopAction.getExitCode();
- Exception exception = stopAction.getEx();
-
- appStatus = stopAction.getFinalApplicationStatus();
- if (!spawnedProcessExitedBeforeShutdownTriggered) {
- //stopped the forked process but don't worry about its exit code
- int forkedExitCode = stopForkedProcess();
- log.debug("Stopped forked process: exit code={}", forkedExitCode);
- }
-
- // make sure the AM is actually registered. If not, there's no point
- // trying to unregister it
- if (amRegistrationData == null) {
- log.info("Application attempt not yet registered; skipping unregistration");
- if (exception != null) {
- throw exception;
- }
- return exitCode;
- }
-
- //stop any launches in progress
- launchService.stop();
-
- //now release all containers
- releaseAllContainers();
-
- // When the application completes, it should send a finish application
- // signal to the RM
- log.info("Application completed. Signalling finish to RM");
-
- try {
- log.info("Unregistering AM status={} message={}", appStatus, appMessage);
- asyncRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
- } catch (InvalidApplicationMasterRequestException e) {
- log.info("Application not found in YARN application list;" +
- " it may have been terminated/YARN shutdown in progress: {}", e, e);
- } catch (YarnException | IOException e) {
- log.info("Failed to unregister application: " + e, e);
- }
- if (exception != null) {
- throw exception;
- }
- return exitCode;
- }
-
- /**
-  * Ask the application state for its container diagnostics.
-  * @return the diagnostics text supplied by the app state
-  */
- private String getContainerDiagnosticInfo() {
-   String diagnostics = appState.getContainerDiagnosticInfo();
-   return diagnostics;
- }
-
- public Object getProxy(Class protocol, InetSocketAddress addr) {
- return yarnRPC.getProxy(protocol, addr, getConfig());
- }
-
- /**
-  * Start the slider RPC server.
-  * <p>
-  * Verifies the IPC access settings, deploys the IPC service as a child
-  * service, wraps it in a protobuf blocking service, and deploys an RPC
-  * service bound to all interfaces on a port obtained from
-  * {@link #getPortToRequest()}.
-  * @param instanceDefinition instance definition (not read by this method)
-  * @throws IOException
-  * @throws SliderException
-  */
- private void startSliderRPCServer(AggregateConf instanceDefinition)
-     throws IOException, SliderException {
-   verifyIPCAccess();
-
-   // the service implementing the cluster protocol
-   sliderIPCService = new SliderIPCService(
-       this,
-       certificateManager,
-       stateForProviders,
-       actionQueues,
-       metricsAndMonitoring,
-       contentCache);
-   deployChildService(sliderIPCService);
-
-   // protobuf relay in front of the IPC service
-   SliderClusterProtocolPBImpl pbRelay =
-       new SliderClusterProtocolPBImpl(sliderIPCService);
-   BlockingService pbService = SliderClusterAPI.SliderClusterProtocolPB
-       .newReflectiveBlockingService(pbRelay);
-
-   int rpcPort = getPortToRequest();
-   InetSocketAddress bindAddress = new InetSocketAddress("0.0.0.0", rpcPort);
-   rpcService = new WorkflowRpcService("SliderRPC",
-       RpcBinder.createProtobufServer(bindAddress, getConfig(),
-           secretManager,
-           NUM_RPC_HANDLERS,
-           pbService,
-           null));
-   deployChildService(rpcService);
- }
-
- /**
-  * Check that ACLs are configured whenever IPC authorization is enabled.
-  * Authorization is read from
-  * {@link CommonConfigurationKeysPublic#HADOOP_SECURITY_AUTHORIZATION}
-  * (default false) and the ACLs from <code>KEY_PROTOCOL_ACL</code>.
-  * @throws BadConfigException if Authorization is set without any ACL
-  */
- private void verifyIPCAccess() throws BadConfigException {
-   boolean authorizationOn = getConfig().getBoolean(
-       CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
-       false);
-   String protocolAcls = getConfig().get(KEY_PROTOCOL_ACL);
-   if (!authorizationOn || !SliderUtils.isUnset(protocolAcls)) {
-     // either authorization is off, or ACLs are present
-     return;
-   }
-   throw new BadConfigException("Application has IPC authorization enabled in " +
-       CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION +
-       " but no ACLs in " + KEY_PROTOCOL_ACL);
- }
-
-
-/* =================================================================== */
-/* AMRMClientAsync callbacks */
-/* =================================================================== */
-
- /**
-  * Callback event when a container is allocated.
-  *
-  * The app state processes the allocation and fills in two lists:
-  * container assignments and RM operations. Each assignment is handed
-  * to the launch service to start the role; a failure to build the
-  * credentials for one launch is logged and the remaining assignments
-  * are still processed. The RM operations are then executed; they are
-  * expected to be 0 or more release operations (to handle
-  * over-allocations).
-  *
-  * @param allocatedContainers list of containers that are now ready to be
-  * given work.
-  */
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- @Override //AMRMClientAsync
- public void onContainersAllocated(List<Container> allocatedContainers) {
-   LOG_YARN.info("onContainersAllocated({})", allocatedContainers.size());
-   List<ContainerAssignment> newAssignments = new ArrayList<>();
-   List<AbstractRMOperation> rmOperations = new ArrayList<>();
-
-   // app state makes all the decisions
-   appState.onContainersAllocated(allocatedContainers, newAssignments, rmOperations);
-
-   // instantiate the role of every assignment
-   for (ContainerAssignment toLaunch : newAssignments) {
-     try {
-       launchService.launchRole(toLaunch, getInstanceDefinition(),
-           buildContainerCredentials());
-     } catch (IOException e) {
-       // Can be caused by failure to renew credentials with the remote
-       // service. If so, don't launch the application. Container is retained,
-       // though YARN will take it away after a timeout.
-       log.error("Failed to build credentials to launch container: {}", e, e);
-     }
-   }
-
-   // run the RM operations the app state asked for
-   execute(rmOperations);
-   log.info("Diagnostics: {}", getContainerDiagnosticInfo());
- }
-
-  /**
-   * Callback invoked when containers have completed.
-   * <p>
-   * Every status is logged and passed to the app state. For containers the
-   * app state knows about, the provider is notified and an
-   * {@link UnregisterComponentInstance} action is queued. A review of the
-   * application size is requested once all statuses have been processed.
-   *
-   * @param completedContainers statuses of the completed containers
-   */
-  @Override //AMRMClientAsync
-  public synchronized void onContainersCompleted(List<ContainerStatus> completedContainers) {
-    // format string matches the balanced form used by onContainersAllocated
-    LOG_YARN.info("onContainersCompleted({})", completedContainers.size());
-    for (ContainerStatus status : completedContainers) {
-      ContainerId containerId = status.getContainerId();
-      LOG_YARN.info("Container Completion for" +
-                    " containerID={}," +
-                    " state={}," +
-                    " exitStatus={}," +
-                    " diagnostics={}",
-                    containerId, status.getState(),
-                    status.getExitStatus(),
-                    status.getDiagnostics());
-
-      // non complete containers should not be here
-      assert (status.getState() == ContainerState.COMPLETE);
-      AppState.NodeCompletionResult result = appState.onCompletedNode(status);
-      if (result.containerFailed) {
-        RoleInstance ri = result.roleInstance;
-        log.error("Role instance {} failed ", ri);
-      }
-
-      // known nodes trigger notifications
-      if (!result.unknownNode) {
-        getProviderService().notifyContainerCompleted(containerId);
-        queue(new UnregisterComponentInstance(containerId, 0,
-            TimeUnit.MILLISECONDS));
-      }
-    }
-
-    reviewRequestAndReleaseNodes("onContainersCompleted");
-  }
-
-  /**
-   * Mark a set of containers as being upgraded.
-   * <p>
-   * The containers named explicitly in the request are merged with every live
-   * container belonging to any component (role) named in the request. When the
-   * provider is an {@link AgentProviderService} it is switched into upgrade
-   * mode and handed the merged set.
-   *
-   * @param upgradeContainersRequest
-   *          request containing upgrade details
-   * @throws IOException declared by the signature
-   * @throws SliderException declared by the signature
-   */
-  public synchronized void onUpgradeContainers(
-      ActionUpgradeContainers upgradeContainersRequest) throws IOException,
-      SliderException {
-    LOG_YARN.info("onUpgradeContainers({})",
-        upgradeContainersRequest.getMessage());
-
-    Set<String> containers;
-    if (upgradeContainersRequest.getContainers() == null) {
-      containers = new HashSet<String>();
-    } else {
-      containers = upgradeContainersRequest.getContainers();
-    }
-    LOG_YARN.info("  Container list provided (total {}) : {}",
-        containers.size(), containers);
-
-    Set<String> components;
-    if (upgradeContainersRequest.getComponents() == null) {
-      components = new HashSet<String>();
-    } else {
-      components = upgradeContainersRequest.getComponents();
-    }
-    LOG_YARN.info("  Component list provided (total {}) : {}",
-        components.size(), components);
-
-    // When components are named, pull in every live container of each one
-    if (!components.isEmpty()) {
-      Map<ContainerId, RoleInstance> live = appState.getLiveContainers();
-      if (!live.isEmpty()) {
-        Map<String, Set<String>> containersByRole = prepareRoleContainerMap(live);
-        for (String componentName : components) {
-          Set<String> ofRole = containersByRole.get(componentName);
-          if (ofRole == null) {
-            continue;
-          }
-          containers.addAll(ofRole);
-        }
-      }
-    }
-    LOG_YARN.info("Final list of containers to be upgraded (total {}) : {}",
-        containers.size(), containers);
-
-    if (providerService instanceof AgentProviderService) {
-      AgentProviderService agentProvider = (AgentProviderService) providerService;
-      agentProvider.setInUpgradeMode(true);
-      agentProvider.addUpgradeContainers(containers);
-    }
-  }
-
-  /**
-   * Build the reverse mapping of role name to the IDs (as strings) of all
-   * live containers running that role.
-   *
-   * @param liveContainers map of container ID to role instance
-   * @return map of role name to the set of container ID strings
-   */
-  private Map<String, Set<String>> prepareRoleContainerMap(
-      Map<ContainerId, RoleInstance> liveContainers) {
-    Map<String, Set<String>> byRole = new HashMap<>();
-    for (Map.Entry<ContainerId, RoleInstance> entry : liveContainers.entrySet()) {
-      String roleName = entry.getValue().role;
-      String containerIdText = entry.getKey().toString();
-      Set<String> ids = byRole.get(roleName);
-      if (ids == null) {
-        // first container seen for this role
-        ids = new HashSet<String>();
-        byRole.put(roleName, ids);
-      }
-      ids.add(containerIdText);
-    }
-    return byRole;
-  }
-
-  /**
-   * Flex the cluster to a new resource definition.
-   * This is intended to be the single route by which anything, including the
-   * AM itself at startup, asks for nodes.
-   *
-   * @param resources the resource tree
-   * @throws SliderException slider problems, including invalid configs
-   * @throws IOException IO problems
-   */
-  public void flexCluster(ConfTree resources)
-      throws IOException, SliderException {
-
-    // build a candidate definition from a snapshot plus the new resources,
-    // and have both providers validate it before anything is applied
-    AggregateConf candidate =
-        new AggregateConf(appState.getInstanceDefinitionSnapshot());
-    candidate.setResources(resources);
-    sliderAMProvider.validateInstanceDefinition(candidate);
-    providerService.validateInstanceDefinition(candidate);
-
-    // validation passed: apply the new resources to the live state
-    appState.updateResourceDefinitions(resources);
-
-    // the failure window values may have changed, so reset the counts
-    appState.resetFailureCounts();
-
-    // request or release containers as the new definition requires
-    reviewRequestAndReleaseNodes("flexCluster");
-  }
-
-  /**
-   * Schedule the periodic reset of the failure window, when an interval is
-   * configured in the global options of the resource tree.
-   *
-   * @param resources the resource tree
-   * @throws BadConfigException if the window is out of range
-   */
-  private void scheduleFailureWindowResets(ConfTree resources) throws
-      BadConfigException {
-    ResetFailureWindow resetAction = new ResetFailureWindow();
-    ConfTreeOperations resourceOps = new ConfTreeOperations(resources);
-    MapOperations globalOptions = resourceOps.getGlobalOptions();
-    long intervalSeconds = globalOptions.getTimeRange(
-        ResourceKeys.CONTAINER_FAILURE_WINDOW,
-        ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_DAYS,
-        ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_HOURS,
-        ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_MINUTES, 0);
-
-    if (intervalSeconds <= 0) {
-      log.info("Failure window reset interval is not set");
-      return;
-    }
-
-    log.info(
-        "Scheduling the failure window reset interval to every {} seconds",
-        intervalSeconds);
-    // the same interval is used for the initial delay and the repeat period
-    RenewingAction<ResetFailureWindow> renewingReset = new RenewingAction<>(
-        resetAction, intervalSeconds, intervalSeconds, TimeUnit.SECONDS, 0);
-    actionQueues.renewing("failures", renewingReset);
-  }
-
-  /**
-   * Schedule the recurring escalation check for outstanding requests.
-   * The interval in seconds is read from the global options of the supplied
-   * configuration tree.
-   *
-   * @param internal configuration tree holding the escalation interval
-   * @throws BadConfigException on a configuration problem
-   */
-  private void scheduleEscalation(ConfTree internal) throws BadConfigException {
-    EscalateOutstandingRequests escalationAction =
-        new EscalateOutstandingRequests();
-    MapOperations globalOptions =
-        new ConfTreeOperations(internal).getGlobalOptions();
-    int intervalSeconds = globalOptions.getOptionInt(
-        InternalKeys.ESCALATION_CHECK_INTERVAL,
-        InternalKeys.DEFAULT_ESCALATION_CHECK_INTERVAL);
-    // the same interval is used for the initial delay and the repeat period
-    RenewingAction<EscalateOutstandingRequests> renewingEscalation =
-        new RenewingAction<>(escalationAction, intervalSeconds,
-            intervalSeconds, TimeUnit.SECONDS, 0);
-    actionQueues.renewing("escalation", renewingEscalation);
-  }
-
-  /**
-   * Request a review of the current node state by queuing a
-   * {@link ReviewAndFlexApplicationSize} action with no delay.
-   *
-   * @param reason reason for operation
-   */
-  private synchronized void reviewRequestAndReleaseNodes(String reason) {
-    log.debug("reviewRequestAndReleaseNodes({})", reason);
-    ReviewAndFlexApplicationSize reviewAction =
-        new ReviewAndFlexApplicationSize(reason, 0, TimeUnit.SECONDS);
-    queue(reviewAction);
-  }
-
-  /**
-   * Handle the event requesting a review: look at the queue and decide
-   * whether to act now, defer, or drop the request.
-   * <ul>
-   *   <li>If another size review or an application halt is already queued,
-   *   the request is dropped.</li>
-   *   <li>If an action which changes the application size is queued, the
-   *   request is placed at the back of the queue and nothing else is done;
-   *   the review runs when the re-queued action is handled.</li>
-   *   <li>Otherwise the node review is executed immediately.</li>
-   * </ul>
-   *
-   * @param action action triggering the event. It may be put
-   * back into the queue
-   * @throws SliderInternalStateException raised by the node review
-   */
-  public void handleReviewAndFlexApplicationSize(ReviewAndFlexApplicationSize action)
-      throws SliderInternalStateException {
-
-    if (actionQueues.hasQueuedActionWithAttribute(
-        AsyncAction.ATTR_REVIEWS_APP_SIZE | AsyncAction.ATTR_HALTS_APP)) {
-      // this operation isn't needed at all -existing duplicate or shutdown due
-      return;
-    }
-    // if there is an action which changes cluster size, wait
-    if (actionQueues.hasQueuedActionWithAttribute(
-        AsyncAction.ATTR_CHANGES_APP_SIZE)) {
-      // place the action at the back of the queue
-      actionQueues.put(action);
-      // and stop here: without this return the review would run now against
-      // a size that is about to change, and then again for the queued copy
-      return;
-    }
-
-    executeNodeReview(action.name);
-  }
-
-  /**
-   * Look at where the current node state is, and whether it should be
-   * changed. The operations produced by the app state are passed first to the
-   * provider's RM operation handler and then executed.
-   * <p>
-   * The review is skipped entirely once AM completion has been signalled.
-   * If the app state decides the cluster must be torn down, a stop action is
-   * queued.
-   *
-   * @param reason reason for the review; used for logging only
-   * @throws SliderInternalStateException on an internal state problem
-   */
-  public synchronized void executeNodeReview(String reason)
-      throws SliderInternalStateException {
-
-    log.debug("in executeNodeReview({})", reason);
-    if (amCompletionFlag.get()) {
-      log.info("Ignoring node review operation: shutdown in progress");
-      // actually ignore it: previously the review went ahead after logging
-      return;
-    }
-    try {
-      List<AbstractRMOperation> allOperations = appState.reviewRequestAndReleaseNodes();
-      // tell the provider
-      providerRMOperationHandler.execute(allOperations);
-      //now apply the operations
-      execute(allOperations);
-    } catch (TriggerClusterTeardownException e) {
-      //App state has decided that it is time to exit
-      log.error("Cluster teardown triggered {}", e, e);
-      queue(new ActionStopSlider(e));
-    }
-  }
-
-  /**
-   * Escalation operation, as triggered by an external timer.
-   * <p>
-   * Fetches the list of escalation operations from the app state, passes it
-   * to the provider's RM operation handler, then executes it.
-   */
-  public void escalateOutstandingRequests() {
-    List<AbstractRMOperation> escalations =
-        appState.escalateOutstandingRequests();
-    // provider first, then the real RM handler
-    providerRMOperationHandler.execute(escalations);
-    execute(escalations);
-  }
-
-
-  /**
-   * Shutdown operation: release all containers.
-   * <p>
-   * When the provider is an {@link AgentProviderService} it is first told
-   * that the application stop has been initiated. The thread then sleeps for
-   * the container release timeout (if positive) so that applications get time
-   * to shut down gracefully. The containers are always released afterwards,
-   * even if the sleep is interrupted or fails.
-   * <p>
-   * If the sleep is interrupted, the thread's interrupt status is restored
-   * once the release operations have been issued, so callers can still
-   * observe the interruption.
-   */
-  private void releaseAllContainers() {
-    if (providerService instanceof AgentProviderService) {
-      log.info("Setting stopInitiated flag to true");
-      AgentProviderService agentProviderService = (AgentProviderService) providerService;
-      agentProviderService.setAppStopInitiated(true);
-    }
-    // Add the sleep here (before releasing containers) so that applications get
-    // time to perform graceful shutdown
-    boolean interrupted = false;
-    try {
-      long timeout = getContainerReleaseTimeout();
-      if (timeout > 0) {
-        Thread.sleep(timeout);
-      }
-    } catch (InterruptedException e) {
-      interrupted = true;
-      log.info("Sleep for container release interrupted");
-    } finally {
-      try {
-        List<AbstractRMOperation> operations = appState.releaseAllContainers();
-        providerRMOperationHandler.execute(operations);
-        // now apply the operations
-        execute(operations);
-      } finally {
-        // restore the flag only after the release calls have been made, so
-        // that they are not themselves affected by a pending interrupt
-        if (interrupted) {
-          Thread.currentThread().interrupt();
-        }
-      }
-    }
-  }
-
-  /**
-   * Get the container release timeout.
-   * <p>
-   * The value is read, in seconds, from the global options of the application
-   * configuration and defaults to 0. A positive value is extended by the agent
-   * heartbeat interval, since it can take up to that long for agents to
-   * receive the stop command.
-   *
-   * @return the timeout in milliseconds
-   */
-  private long getContainerReleaseTimeout() {
-    int seconds = getInstanceDefinition().getAppConfOperations()
-        .getGlobalOptions()
-        .getOptionInt(SliderKeys.APP_CONTAINER_RELEASE_TIMEOUT, 0);
-    if (seconds > 0) {
-      seconds += SliderKeys.APP_CONTAINER_HEARTBEAT_INTERVAL_SEC;
-    }
-    long timeoutInMillis = TimeUnit.SECONDS.toMillis(seconds);
-    log.info("Container release timeout in millis = {}", timeoutInMillis);
-    return timeoutInMillis;
-  }
-
- /**
- * RM wants to shut down the AM
- */
- @Override //AMRMClientAsync
- public void onShutdownRequest() {
- LOG_YARN.info("Shutdown Request received");
- signalAMComplete(new ActionStopSlider("stop",
- EXIT_SUCCESS,
- FinalApplicationStatus.SUCCEEDED,
- "Shutdown requested from RM"));
- }
-
- /**
- * Monitored nodes have been changed
- * @param updatedNodes list of updated nodes
- */
- @Override //AMRMClientAsync
- public void onNodesUpdated(List<NodeReport> updatedNodes) {
- LOG_YARN.info("onNodesUpdated({})", updatedNodes.size());
- log.info("Updated nodes {}", updatedNodes);
- // Check if any nodes are lost or revived and update state accordingly
-
- AppState.NodeUpdatedOutcome outcome = appState.onNodesUpdated(updatedNodes);
- if (!outcome.operations.isEmpty()) {
- execute(outcome.operations);
- }
- // trigger a review if the cluster changed
- if (outcome.clusterChanged) {
- reviewRequestAndReleaseNodes("nodes updated");
- }
- }
-
- /**
- * heartbeat operation; return the ratio of requested
- * to actual
- * @return progress
- */
- @Override //AMRMClientAsync
- public float getProgress() {
- return appState.getApplicationProgressPercentage();
- }
-
- @Override //AMRMClientAsync
- public void onError(Throwable e) {
- //callback says it's time to finish
- LOG_YARN.error("AMRMClientAsync.onError() received {}", e, e);
- signalAMComplete(new ActionStopSlider("stop",
- EXIT_EXCEPTION_THROWN,
- FinalApplicationStatus.FAILED,
- "AMRMClientAsync.onError() received " + e));
- }
-
-/* =================================================================== */
-/* RMOperationHandlerActions */
-/* =================================================================== */
-
-
- /**
- * Execute a list of RM operations by delegating to the RM operation handler.
- * @param operations operations to execute
- */
- @Override
- public void execute(List<AbstractRMOperation> operations) {
- rmOperationHandler.execute(operations);
- }
-
- /**
- * Release an assigned container by delegating to the RM operation handler.
- * @param containerId ID of the container to release
- */
- @Override
- public void releaseAssignedContainer(ContainerId containerId) {
- rmOperationHandler.releaseAssignedContainer(containerId);
- }
-
- /**
- * Add a container request by delegating to the RM operation handler.
- * @param req the container request
- */
- @Override
- public void addContainerRequest(AMRMClient.ContainerRequest req) {
- rmOperationHandler.addContainerRequest(req);
- }
-
-  /**
-   * Cancel container requests by delegating to the RM operation handler.
-   *
-   * @param priority1 first priority, passed through to the handler
-   * @param priority2 second priority, passed through to the handler
-   * @param count count, passed through to the handler
-   * @return the value returned by the handler
-   */
-  @Override
-  public int cancelContainerRequests(Priority priority1,
-      Priority priority2,
-      int count) {
-    int handlerResult =
-        rmOperationHandler.cancelContainerRequests(priority1, priority2, count);
-    return handlerResult;
-  }
-
- /**
- * Cancel a single container request by delegating to the RM operation handler.
- * @param request the request to cancel
- */
- @Override
- public void cancelSingleRequest(AMRMClient.ContainerRequest request) {
- rmOperationHandler.cancelSingleRequest(request);
- }
-
-/* =================================================================== */
-/* END */
-/* =================================================================== */
-
-  /**
-   * Launch the provider service.
-   * <p>
-   * If the provider's exec reports that it started, this AM registers as a
-   * service listener before starting the service. Otherwise the service is
-   * started unregistered and the "started" event is raised directly.
-   *
-   * @param instanceDefinition definition of the service
-   * @param confDir directory of config data
-   * @throws IOException
-   * @throws SliderException
-   */
-  protected synchronized void launchProviderService(AggregateConf instanceDefinition,
-      File confDir)
-      throws IOException, SliderException {
-    Map<String, String> environment = new HashMap<>();
-    boolean started =
-        providerService.exec(instanceDefinition, confDir, environment, this);
-    if (started) {
-      // only listen for state changes when something was actually started
-      providerService.registerServiceListener(this);
-    }
-    providerService.start();
-    if (!started) {
-      // nothing will call back, so send the started event ourselves
-      eventCallbackEvent(null);
-    }
-  }
-
- /* =================================================================== */
- /* EventCallback from the child or ourselves directly */
- /* =================================================================== */
-
- @Override // ProviderCompleted
- public void eventCallbackEvent(Object parameter) {
- // signalled that the child process is up.
- appState.noteAMLive();
- // now ask for the cluster nodes
- try {
- flexCluster(getInstanceDefinition().getResources());
- } catch (Exception e) {
- // cluster flex failure: log
- log.error("Failed to flex cluster nodes: {}", e, e);
- // then what? exit
- queue(new ActionStopSlider(e));
- }
- }
-
-  /**
-   * Report the loss of a container. If the container is one the app state
-   * owns, it is released and a size review is requested; otherwise the
-   * report is ignored.
-   *
-   * @param containerId id of the container which has failed
-   * @throws SliderException
-   */
-  public synchronized void providerLostContainer(
-      ContainerId containerId)
-      throws SliderException {
-    log.info("containerLostContactWithProvider: container {} lost",
-        containerId);
-    RoleInstance owned = appState.getOwnedContainer(containerId);
-    if (owned == null) {
-      log.info("Container not in active set - ignoring");
-      return;
-    }
-    execute(appState.releaseContainer(containerId));
-    // a replacement may be needed, so ask for a review
-    log.info("Container released; triggering review");
-    reviewRequestAndReleaseNodes("Loss of container");
-  }
-
- /* =================================================================== */
- /* ServiceStateChangeListener */
- /* =================================================================== */
-
-  /**
-   * Received on a state change of a service being listened to.
-   * <p>
-   * Only the provider service entering the STOPPED state is handled here;
-   * everything else is passed to the superclass. A non-zero exit code before
-   * AM completion has been signalled is treated as a failure and triggers
-   * termination of the AM.
-   *
-   * @param service the service that has changed.
-   */
-  @Override //ServiceStateChangeListener
-  public void stateChanged(Service service) {
-    boolean providerStopped =
-        service == providerService && service.isInState(STATE.STOPPED);
-    if (!providerStopped) {
-      super.stateChanged(service);
-      return;
-    }
-
-    // it is the current master process in play
-    int exitCode = providerService.getExitCode();
-    int mappedProcessExitCode = exitCode;
-
-    if (amCompletionFlag.get() || mappedProcessExitCode == 0) {
-      // already completing, or a clean exit: nothing to do
-      log.info(
-          "Process has exited with exit code {} mapped to {} -ignoring",
-          exitCode,
-          mappedProcessExitCode);
-      return;
-    }
-
-    // unexpected: the process finished early
-    String reason =
-        "Spawned process failed with raw " + exitCode + " mapped to " +
-        mappedProcessExitCode;
-    ActionStopSlider stop = new ActionStopSlider("stop",
-        mappedProcessExitCode,
-        FinalApplicationStatus.FAILED,
-        reason);
-    spawnedProcessExitedBeforeShutdownTriggered = true;
-    log.info(
-        "Process has exited with exit code {} mapped to {} -triggering termination",
-        exitCode,
-        mappedProcessExitCode);
-
-    // tell the AM the cluster is complete
-    signalAMComplete(stop);
-  }
-
-  /**
-   * Stop the provider service and report its exit code.
-   *
-   * @return the process exit code
-   */
-  protected synchronized Integer stopForkedProcess() {
-    providerService.stop();
-    Integer exitCode = providerService.getExitCode();
-    return exitCode;
-  }
-
-  /**
-   * Submit an asynchronous container start request.
-   * The submission is recorded in the app state before the request is passed
-   * to the async NM client.
-   *
-   * @param container container
-   * @param ctx context
-   * @param instance node details
-   * @throws IOException declared by the signature
-   */
-  public void startContainer(Container container,
-      ContainerLaunchContext ctx,
-      RoleInstance instance) throws IOException {
-    // record first, then launch
-    appState.containerStartSubmitted(container, instance);
-    nmClientAsync.startContainerAsync(container, ctx);
-  }
-
-  /**
-   * Build the credentials needed for containers.
-   * Starts from a copy of the AM's container credentials; when a keytab has
-   * been provided, self-renewable filesystem delegation tokens for the
-   * cluster filesystem are added.
-   *
-   * @return a buffer of credentials
-   * @throws IOException
-   */
-  private Credentials buildContainerCredentials() throws IOException {
-    Credentials copy = new Credentials(containerCredentials);
-    if (!securityConfiguration.isKeytabProvided()) {
-      // no keytab: the copied credentials are used as they are
-      return copy;
-    }
-    CredentialUtils.addSelfRenewableFSDelegationTokens(
-        getClusterFS().getFileSystem(),
-        copy);
-    return copy;
-  }
-
-  /**
-   * Callback that a container has stopped. Only logged: the container
-   * events received by the AM are the source of halt details to react to.
-   *
-   * @param containerId ID of the stopped container
-   */
-  @Override //  NMClientAsync.CallbackHandler
-  public void onContainerStopped(ContainerId containerId) {
-    log.info("onContainerStopped {} ", containerId);
-  }
-
-  /**
-   * Callback that a container has started.
-   * <p>
-   * If the app state recognises the container, an asynchronous status query
-   * is issued and a {@link RegisterComponentInstance} action is queued.
-   * Otherwise the container is released.
-   *
-   * @param containerId ID of the started container
-   * @param allServiceResponse service responses supplied with the callback;
-   * not used here
-   */
-  @Override //  NMClientAsync.CallbackHandler
-  public void onContainerStarted(ContainerId containerId,
-      Map<String, ByteBuffer> allServiceResponse) {
-    LOG_YARN.info("Started Container {} ", containerId);
-    RoleInstance started = appState.onNodeManagerContainerStarted(containerId);
-    if (started == null) {
-      // a path not seen in practice: report it and hand the container back
-      log.error("Notified of started container that isn't pending {} - releasing",
-          containerId);
-      asyncRMClient.releaseAssignedContainer(containerId);
-      return;
-    }
-
-    LOG_YARN.info("Deployed instance of role {} onto {}",
-        started.role, containerId);
-    // trigger an async container status query
-    nmClientAsync.getContainerStatusAsync(containerId,
-        started.container.getNodeId());
-    // push out a registration
-    queue(new RegisterComponentInstance(containerId, started.role,
-        started.group, 0, TimeUnit.MILLISECONDS));
-  }
-
- @Override // NMClientAsync.CallbackHandler
- public void onStartContainerError(ContainerId containerId, Throwable t) {
-
<TRUNCATED>