You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/12/13 22:53:07 UTC
[15/74] [abbrv] hadoop git commit: YARN-5461. Initial code ported
from slider-core module. (jianhe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
new file mode 100644
index 0000000..b767059
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -0,0 +1,2450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.health.HealthCheckRegistry;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.*;
+import static org.apache.slider.common.Constants.HADOOP_JAAS_DEBUG;
+
+import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.webapp.WebAppException;
+import org.apache.hadoop.yarn.webapp.WebApps;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.InternalKeys;
+import org.apache.slider.api.ResourceKeys;
+import org.apache.slider.api.RoleKeys;
+import org.apache.slider.api.StatusKeys;
+import org.apache.slider.api.proto.SliderClusterAPI;
+import org.apache.slider.client.SliderYarnClientImpl;
+import org.apache.slider.common.SliderExitCodes;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.params.AbstractActionArgs;
+import org.apache.slider.common.params.SliderAMArgs;
+import org.apache.slider.common.params.SliderAMCreateAction;
+import org.apache.slider.common.params.SliderActions;
+import org.apache.slider.common.tools.ConfigHelper;
+import org.apache.slider.common.tools.PortScanner;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.common.tools.SliderVersionInfo;
+import org.apache.slider.core.build.InstanceIO;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTree;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.conf.MapOperations;
+import org.apache.slider.core.exceptions.BadConfigException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.exceptions.SliderInternalStateException;
+import org.apache.slider.core.exceptions.TriggerClusterTeardownException;
+import org.apache.slider.core.launch.CredentialUtils;
+import org.apache.slider.core.main.ExitCodeProvider;
+import org.apache.slider.core.main.LauncherExitCodes;
+import org.apache.slider.core.main.RunService;
+import org.apache.slider.core.main.ServiceLauncher;
+import org.apache.slider.core.registry.info.CustomRegistryConstants;
+import org.apache.slider.providers.ProviderCompleted;
+import org.apache.slider.providers.ProviderRole;
+import org.apache.slider.providers.ProviderService;
+import org.apache.slider.providers.SliderProviderFactory;
+import org.apache.slider.providers.agent.AgentKeys;
+import org.apache.slider.providers.agent.AgentProviderService;
+import org.apache.slider.providers.slideram.SliderAMClientProvider;
+import org.apache.slider.providers.slideram.SliderAMProviderService;
+import org.apache.slider.server.appmaster.actions.ActionRegisterServiceInstance;
+import org.apache.slider.server.appmaster.actions.EscalateOutstandingRequests;
+import org.apache.slider.server.appmaster.actions.RegisterComponentInstance;
+import org.apache.slider.server.appmaster.actions.QueueExecutor;
+import org.apache.slider.server.appmaster.actions.QueueService;
+import org.apache.slider.server.appmaster.actions.ActionStopSlider;
+import org.apache.slider.server.appmaster.actions.ActionUpgradeContainers;
+import org.apache.slider.server.appmaster.actions.AsyncAction;
+import org.apache.slider.server.appmaster.actions.RenewingAction;
+import org.apache.slider.server.appmaster.actions.ResetFailureWindow;
+import org.apache.slider.server.appmaster.actions.ReviewAndFlexApplicationSize;
+import org.apache.slider.server.appmaster.actions.UnregisterComponentInstance;
+import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
+import org.apache.slider.server.appmaster.management.YarnServiceHealthCheck;
+import org.apache.slider.server.appmaster.monkey.ChaosKillAM;
+import org.apache.slider.server.appmaster.monkey.ChaosKillContainer;
+import org.apache.slider.server.appmaster.monkey.ChaosMonkeyService;
+import org.apache.slider.server.appmaster.operations.AsyncRMOperationHandler;
+import org.apache.slider.server.appmaster.operations.ProviderNotifyingOperationHandler;
+import org.apache.slider.server.appmaster.rpc.RpcBinder;
+import org.apache.slider.server.appmaster.rpc.SliderAMPolicyProvider;
+import org.apache.slider.server.appmaster.rpc.SliderClusterProtocolPBImpl;
+import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
+import org.apache.slider.server.appmaster.rpc.SliderIPCService;
+import org.apache.slider.server.appmaster.security.SecurityConfiguration;
+import org.apache.slider.server.appmaster.state.AppState;
+import org.apache.slider.server.appmaster.state.AppStateBindingInfo;
+import org.apache.slider.server.appmaster.state.ContainerAssignment;
+import org.apache.slider.server.appmaster.state.ProviderAppState;
+import org.apache.slider.server.appmaster.operations.RMOperationHandler;
+import org.apache.slider.server.appmaster.state.RoleInstance;
+import org.apache.slider.server.appmaster.web.AgentService;
+import org.apache.slider.server.appmaster.web.rest.InsecureAmFilterInitializer;
+import org.apache.slider.server.appmaster.web.rest.agent.AgentWebApp;
+import org.apache.slider.server.appmaster.web.SliderAMWebApp;
+import org.apache.slider.server.appmaster.web.WebAppApi;
+import org.apache.slider.server.appmaster.web.WebAppApiImpl;
+import org.apache.slider.server.appmaster.web.rest.RestPaths;
+import org.apache.slider.server.appmaster.web.rest.application.ApplicationResouceContentCacheFactory;
+import org.apache.slider.server.appmaster.web.rest.application.resources.ContentCache;
+import org.apache.slider.server.services.security.CertificateManager;
+import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
+import org.apache.slider.server.services.utility.WebAppService;
+import org.apache.slider.server.services.workflow.ServiceThreadFactory;
+import org.apache.slider.server.services.workflow.WorkflowExecutorService;
+import org.apache.slider.server.services.workflow.WorkflowRpcService;
+import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This is the AM, which directly implements the callbacks from the RM and NM
+ */
+public class SliderAppMaster extends AbstractSliderLaunchedService
+ implements AMRMClientAsync.CallbackHandler,
+ NMClientAsync.CallbackHandler,
+ RunService,
+ SliderExitCodes,
+ SliderKeys,
+ ServiceStateChangeListener,
+ RoleKeys,
+ ProviderCompleted,
+ AppMasterActionOperations {
+
+ protected static final Logger log =
+ LoggerFactory.getLogger(SliderAppMaster.class);
+
+ /**
+ * log for YARN events
+ */
+ protected static final Logger LOG_YARN = log;
+
+ /** Short service name used when registering this service instance. */
+ public static final String SERVICE_CLASSNAME_SHORT = "SliderAppMaster";
+ /** Fully qualified class name, derived from the short name. */
+ public static final String SERVICE_CLASSNAME =
+ "org.apache.slider.server.appmaster." + SERVICE_CLASSNAME_SHORT;
+
+ /** RM heartbeat interval passed to the async AM-RM client (presumably milliseconds — confirm against AMRMClientAsync). */
+ public static final int HEARTBEAT_INTERVAL = 1000;
+ /** Number of RPC handler threads (presumably for the Slider IPC server — confirm where consumed). */
+ public static final int NUM_RPC_HANDLERS = 5;
+
+ /**
+ * Metrics and monitoring services.
+ * Deployed in {@link #serviceInit(Configuration)}
+ */
+ private final MetricsAndMonitoring metricsAndMonitoring = new MetricsAndMonitoring();
+
+ /**
+ * metrics registry
+ */
+ public MetricRegistry metrics;
+
+ /** Error string on chaos monkey launch failure action: {@value} */
+ public static final String E_TRIGGERED_LAUNCH_FAILURE =
+ "Chaos monkey triggered launch failure";
+
+ /** YARN RPC to communicate with the Resource Manager or Node Manager */
+ private YarnRPC yarnRPC;
+
+ /** Handle to communicate with the Resource Manager*/
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private AMRMClientAsync asyncRMClient;
+
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private RMOperationHandler rmOperationHandler;
+
+ private RMOperationHandler providerRMOperationHandler;
+
+ /** Handle to communicate with the Node Manager*/
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ public NMClientAsync nmClientAsync;
+
+ /**
+ * Credentials for propagating down to launched containers
+ */
+ private Credentials containerCredentials;
+
+ /**
+ * Slider IPC: Real service handler
+ */
+ private SliderIPCService sliderIPCService;
+ /**
+ * Slider IPC: binding
+ */
+ private WorkflowRpcService rpcService;
+
+ /**
+ * Secret manager
+ */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private ClientToAMTokenSecretManager secretManager;
+
+ /** Hostname of the container; set from the RPC bind address once the IPC service is up. */
+ private String appMasterHostname = "";
+ /** Port on which the app master listens for status updates from clients. */
+ private int appMasterRpcPort = 0;
+ /** Tracking url to which app master publishes info for clients to monitor*/
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private String appMasterTrackingUrl = "";
+
+ /** Proxied app master URL (as retrieved from AM report at launch time) */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private String appMasterProxiedUrl = "";
+
+ /** Application Attempt Id (combination of application ID and attempt/fail count); extracted from the AM container ID. */
+ private ApplicationAttemptId appAttemptID;
+
+ /**
+ * App ACLs
+ */
+ protected Map<ApplicationAccessType, String> applicationACLs;
+
+ /**
+ * Ongoing state of the cluster: containers, nodes they
+ * live on, etc.
+ */
+ private final AppState appState =
+ new AppState(new ProtobufClusterServices(), metricsAndMonitoring);
+
+ /**
+ * App state for external objects. This is almost entirely
+ * a read-only view of the application state. To change the state,
+ * Providers (or anything else) are expected to queue async changes.
+ */
+ private final ProviderAppState stateForProviders =
+ new ProviderAppState("undefined", appState);
+
+ /**
+ * model the state using locks and conditions
+ */
+ private final ReentrantLock AMExecutionStateLock = new ReentrantLock();
+ private final Condition isAMCompleted = AMExecutionStateLock.newCondition();
+
+ /**
+ * Flag set if the AM is to be shutdown
+ */
+ private final AtomicBoolean amCompletionFlag = new AtomicBoolean(false);
+
+ /**
+ * Flag set during the init process
+ */
+ private final AtomicBoolean initCompleted = new AtomicBoolean(false);
+
+ /**
+ * Flag to set if the process exit code was set before shutdown started
+ */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private boolean spawnedProcessExitedBeforeShutdownTriggered;
+
+
+ /** Arguments passed in : raw*/
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private SliderAMArgs serviceArgs;
+
+ /**
+ * ID of the AM container
+ */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private ContainerId appMasterContainerID;
+
+ /**
+ * Monkey Service -may be null
+ */
+ private ChaosMonkeyService monkey;
+
+ /**
+ * ProviderService of this cluster
+ */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private ProviderService providerService;
+
+ /**
+ * The YARN registry service
+ */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private RegistryOperations registryOperations;
+
+ /**
+ * The stop request received...the exit details are extracted
+ * from this
+ */
+ private volatile ActionStopSlider stopAction;
+
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private RoleLaunchService launchService;
+
+ // Hadoop username to run as; null if it is not known / not to be set
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private String hadoop_user_name;
+ // Service username (may differ from the Hadoop user — confirm where assigned)
+ private String service_user_name;
+
+ // AM web application (UI/REST endpoints)
+ private SliderAMWebApp webApp;
+ // Bind address of the Slider IPC service, resolved at startup
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private InetSocketAddress rpcServiceAddress;
+ // Provider handling the AM's own (slideram) component
+ private SliderAMProviderService sliderAMProvider;
+ // Manages certificates for the AM web/agent endpoints
+ private CertificateManager certificateManager;
+
+ /**
+ * Executor.
+ * Assigned in {@link #serviceInit(Configuration)}
+ */
+ private WorkflowExecutorService<ExecutorService> executorService;
+
+ /**
+ * Action queues. Created at instance creation, but
+ * added as a child and inited in {@link #serviceInit(Configuration)}
+ */
+ private final QueueService actionQueues = new QueueService();
+ private String agentOpsUrl;
+ private String agentStatusUrl;
+ private YarnRegistryViewForProviders yarnRegistryOperations;
+ //private FsDelegationTokenManager fsDelegationTokenManager;
+ private RegisterApplicationMasterResponse amRegistrationData;
+ private PortScanner portScanner;
+ private SecurityConfiguration securityConfiguration;
+
+ /**
+ * Is security enabled?
+ * Set early on in the {@link #createAndRunCluster(String)} operation.
+ */
+ private boolean securityEnabled;
+ private ContentCache contentCache;
+
+ /**
+ * resource limits
+ */
+ private Resource maximumResourceCapability;
+
+ /**
+ * Service Constructor.
+ * The throwaway {@code HdfsConfiguration}/{@code YarnConfiguration}
+ * instances are presumably created for their side effect of forcing the
+ * hdfs-*.xml and yarn-*.xml resources onto the default Configuration
+ * resource list before any other Configuration is built
+ * — NOTE(review): confirm this intent; the instances themselves are discarded.
+ */
+ public SliderAppMaster() {
+ super(SERVICE_CLASSNAME_SHORT);
+ new HdfsConfiguration();
+ new YarnConfiguration();
+ }
+
+/* =================================================================== */
+/* service lifecycle methods */
+/* =================================================================== */
+
+ /**
+ * Service init: load the slider client XML (and, if present on the
+ * classpath, the server XML) and merge it into the supplied configuration;
+ * apply command-line definitions and any RM address override; initialize
+ * Hadoop security if the cluster is secure; then create and add the child
+ * services (metrics/monitoring, executor, action queues) before delegating
+ * to {@code super.serviceInit()}, which inits all children.
+ * @param conf configuration handed in by the service launcher
+ * @throws Exception on any failure loading/merging configuration or
+ * initializing a child service
+ */
+ @Override //AbstractService
+ public synchronized void serviceInit(Configuration conf) throws Exception {
+ // load the slider client XML config, if found on the classpath
+
+ Configuration customConf = SliderUtils.loadSliderClientXML();
+ // Load in the server configuration - if it is actually on the Classpath
+ URL serverXmlUrl = ConfigHelper.getResourceUrl(SLIDER_SERVER_XML);
+ if (serverXmlUrl != null) {
+ log.info("Loading {} at {}", SLIDER_SERVER_XML, serverXmlUrl);
+ Configuration serverConf = ConfigHelper.loadFromResource(SLIDER_SERVER_XML);
+ ConfigHelper.mergeConfigurations(customConf, serverConf,
+ SLIDER_SERVER_XML, true);
+ }
+ serviceArgs.applyDefinitions(customConf);
+ serviceArgs.applyFileSystemBinding(customConf);
+ // conf now contains all customizations
+
+ AbstractActionArgs action = serviceArgs.getCoreAction();
+ SliderAMCreateAction createAction = (SliderAMCreateAction) action;
+
+ // sort out the location of the AM: a command-line RM address wins
+ String rmAddress = createAction.getRmAddress();
+ if (rmAddress != null) {
+ log.debug("Setting RM address from the command line: {}", rmAddress);
+ SliderUtils.setRmSchedulerAddress(customConf, rmAddress);
+ }
+
+ log.info("AM configuration:\n{}",
+ ConfigHelper.dumpConfigToString(customConf));
+ // NOTE(review): logging the full process environment at INFO may expose
+ // sensitive values (tokens, credentials passed via env) — consider gating.
+ for (Map.Entry<String, String> envs : System.getenv().entrySet()) {
+ log.info("System env {}={}", envs.getKey(), envs.getValue());
+ }
+
+ ConfigHelper.mergeConfigurations(conf, customConf, SLIDER_CLIENT_XML, true);
+ //init security with our conf
+ if (SliderUtils.isHadoopClusterSecure(conf)) {
+ log.info("Secure mode with kerberos realm {}",
+ SliderUtils.getKerberosRealm());
+ UserGroupInformation.setConfiguration(conf);
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ log.debug("Authenticating as {}", ugi);
+ SliderUtils.verifyPrincipalSet(conf, DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY);
+ } else {
+ log.info("Cluster is insecure");
+ }
+ log.info("Login user is {}", UserGroupInformation.getLoginUser());
+
+ //look at settings of Hadoop Auth, to pick up a problem seen once
+ checkAndWarnForAuthTokenProblems();
+
+ // validate server env (dependency checks can be disabled via config)
+ boolean dependencyChecks =
+ !conf.getBoolean(KEY_SLIDER_AM_DEPENDENCY_CHECKS_DISABLED,
+ false);
+ SliderUtils.validateSliderServerEnvironment(log, dependencyChecks);
+
+ // create and register monitoring services
+ addService(metricsAndMonitoring);
+ metrics = metricsAndMonitoring.getMetrics();
+/* TODO: turn these on once the metrics testing is more under control
+ metrics.registerAll(new ThreadStatesGaugeSet());
+ metrics.registerAll(new MemoryUsageGaugeSet());
+ metrics.registerAll(new GarbageCollectorMetricSet());
+
+*/
+ contentCache = ApplicationResouceContentCacheFactory.createContentCache(stateForProviders);
+
+ // two-thread daemon executor backing the action queue processors
+ executorService = new WorkflowExecutorService<>("AmExecutor",
+ Executors.newFixedThreadPool(2,
+ new ServiceThreadFactory("AmExecutor", true)));
+ addService(executorService);
+
+ addService(actionQueues);
+
+ //init all child services
+ super.serviceInit(conf);
+ }
+
+ /**
+ * Service start: after the superclass starts all child services,
+ * register the AM health check with the metrics/monitoring health registry.
+ * @throws Exception on a failure to start a child service
+ */
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ HealthCheckRegistry health = metricsAndMonitoring.getHealth();
+ health.register("AM Health", new YarnServiceHealthCheck(this));
+ }
+
+ /**
+ * Start the queue processing: submits the action queue service and a
+ * {@link QueueExecutor} bound to this AM onto the shared executor, so
+ * queued async actions begin being scheduled and executed.
+ */
+ private void startQueueProcessing() {
+ log.info("Queue Processing started");
+ executorService.execute(actionQueues);
+ executorService.execute(new QueueExecutor(this, actionQueues));
+ }
+
+/* =================================================================== */
+/* RunService methods called from ServiceLauncher */
+/* =================================================================== */
+
+ /**
+ * Pick up the args from the service launcher: let the superclass process
+ * them first, inject the slider XML resource, wrap the result in a
+ * {@link YarnConfiguration}, and parse the AM arguments into
+ * {@link #serviceArgs}.
+ * @param config configuration from the launcher
+ * @param args argument list
+ * @return the patched YARN configuration the service will run with
+ * @throws Exception on argument-parsing or configuration failures
+ */
+ @Override // RunService
+ public Configuration bindArgs(Configuration config, String... args) throws Exception {
+ // let the superclass process it
+ Configuration superConf = super.bindArgs(config, args);
+ // add the slider XML config
+ ConfigHelper.injectSliderXMLResource();
+
+ //yarn-ify
+ YarnConfiguration yarnConfiguration = new YarnConfiguration(
+ superConf);
+ serviceArgs = new SliderAMArgs(args);
+ serviceArgs.parse();
+
+ return SliderUtils.patchConfiguration(yarnConfiguration);
+ }
+
+
+ /**
+ * Called by the service launcher; when it returns, the application
+ * finishes. Dispatches on the requested action: "help" prints usage,
+ * "create" builds and runs the cluster; anything else is rejected.
+ * @return the exit code for the application to return
+ * @throws Throwable on any failure, including those propagated from
+ * {@link #createAndRunCluster(String)}
+ */
+ @Override
+ public int runService() throws Throwable {
+ SliderVersionInfo.loadAndPrintVersionInfo(log);
+
+ //dump the system properties if in debug mode
+ if (log.isDebugEnabled()) {
+ log.debug("System properties:\n" + SliderUtils.propertiesToString(System.getProperties()));
+ }
+
+ //choose the action
+ String action = serviceArgs.getAction();
+ List<String> actionArgs = serviceArgs.getActionArgs();
+ int exitCode;
+ switch (action) {
+ case SliderActions.ACTION_HELP:
+ log.info("{}: {}", getName(), serviceArgs.usage());
+ exitCode = SliderExitCodes.EXIT_USAGE;
+ break;
+ case SliderActions.ACTION_CREATE:
+ // first action argument is the cluster name
+ exitCode = createAndRunCluster(actionArgs.get(0));
+ break;
+ default:
+ throw new SliderException("Unimplemented: " + action);
+ }
+ log.info("Exiting AM; final exit code = {}", exitCode);
+ return exitCode;
+ }
+
+ /**
+ * Initialize a newly created service with this service's configuration,
+ * then add it as a child.
+ * Because the child is only inited (not started), this MUST be done before
+ * the AM itself starts — or the child must be explicitly started after
+ * being added.
+ * @param service the service to init and add
+ * @return the same service, for chaining
+ */
+ public Service initAndAddService(Service service) {
+ service.init(getConfig());
+ addService(service);
+ return service;
+ }
+
+ /* =================================================================== */
+
+ /**
+ * Create and run the cluster.
+ * @param clustername cluster name
+ * @return exit code
+ * @throws Throwable on a failure
+ */
+ private int createAndRunCluster(String clustername) throws Throwable {
+
+ //load the cluster description from the cd argument
+ String sliderClusterDir = serviceArgs.getSliderClusterURI();
+ URI sliderClusterURI = new URI(sliderClusterDir);
+ Path clusterDirPath = new Path(sliderClusterURI);
+ log.info("Application defined at {}", sliderClusterURI);
+ SliderFileSystem fs = getClusterFS();
+
+ // build up information about the running application -this
+ // will be passed down to the cluster status
+ MapOperations appInformation = new MapOperations();
+
+ AggregateConf instanceDefinition =
+ InstanceIO.loadInstanceDefinitionUnresolved(fs, clusterDirPath);
+ instanceDefinition.setName(clustername);
+
+ log.info("Deploying cluster {}:", instanceDefinition);
+
+ // and resolve it
+ AggregateConf resolvedInstance = new AggregateConf( instanceDefinition);
+ resolvedInstance.resolve();
+
+ stateForProviders.setApplicationName(clustername);
+
+ Configuration serviceConf = getConfig();
+
+ // extend AM configuration with component resource
+ MapOperations amConfiguration = resolvedInstance
+ .getAppConfOperations().getComponent(COMPONENT_AM);
+ // and patch configuration with prefix
+ if (amConfiguration != null) {
+ Map<String, String> sliderAppConfKeys = amConfiguration.prefixedWith("slider.");
+ for (Map.Entry<String, String> entry : sliderAppConfKeys.entrySet()) {
+ String k = entry.getKey();
+ String v = entry.getValue();
+ boolean exists = serviceConf.get(k) != null;
+ log.info("{} {} to {}", (exists ? "Overwriting" : "Setting"), k, v);
+ serviceConf.set(k, v);
+ }
+ }
+
+ securityConfiguration = new SecurityConfiguration(serviceConf, resolvedInstance, clustername);
+ // obtain security state
+ securityEnabled = securityConfiguration.isSecurityEnabled();
+ // set the global security flag for the instance definition
+ instanceDefinition.getAppConfOperations().set(KEY_SECURITY_ENABLED, securityEnabled);
+
+ // triggers resolution and snapshotting for agent
+ appState.setInitialInstanceDefinition(instanceDefinition);
+
+ File confDir = getLocalConfDir();
+ if (!confDir.exists() || !confDir.isDirectory()) {
+ log.info("Conf dir {} does not exist.", confDir);
+ File parentFile = confDir.getParentFile();
+ log.info("Parent dir {}:\n{}", parentFile, SliderUtils.listDir(parentFile));
+ }
+
+ //get our provider
+ MapOperations globalInternalOptions = getGlobalInternalOptions();
+ String providerType = globalInternalOptions.getMandatoryOption(
+ InternalKeys.INTERNAL_PROVIDER_NAME);
+ log.info("Cluster provider type is {}", providerType);
+ SliderProviderFactory factory =
+ SliderProviderFactory.createSliderProviderFactory(providerType);
+ providerService = factory.createServerProvider();
+ // init the provider BUT DO NOT START IT YET
+ initAndAddService(providerService);
+ providerRMOperationHandler = new ProviderNotifyingOperationHandler(providerService);
+
+ // create a slider AM provider
+ sliderAMProvider = new SliderAMProviderService();
+ initAndAddService(sliderAMProvider);
+
+ InetSocketAddress rmSchedulerAddress = SliderUtils.getRmSchedulerAddress(serviceConf);
+ log.info("RM is at {}", rmSchedulerAddress);
+ yarnRPC = YarnRPC.create(serviceConf);
+
+ // set up the YARN client. This may require patching in the RM client-API address if it
+ // is (somehow) unset server-side. String clientRMaddr = serviceConf.get(YarnConfiguration.RM_ADDRESS);
+ InetSocketAddress clientRpcAddress = SliderUtils.getRmAddress(serviceConf);
+ if (!SliderUtils.isAddressDefined(clientRpcAddress)) {
+ // client addr is being unset. We can lift it from the other RM APIs
+ log.warn("Yarn RM address was unbound; attempting to fix up");
+ serviceConf.set(YarnConfiguration.RM_ADDRESS,
+ String.format("%s:%d", rmSchedulerAddress.getHostString(), clientRpcAddress.getPort() ));
+ }
+
+ /*
+ * Extract the container ID. This is then
+ * turned into an (incomplete) container
+ */
+ appMasterContainerID = ConverterUtils.toContainerId(
+ SliderUtils.mandatoryEnvVariable(ApplicationConstants.Environment.CONTAINER_ID.name()));
+ appAttemptID = appMasterContainerID.getApplicationAttemptId();
+
+ ApplicationId appid = appAttemptID.getApplicationId();
+ log.info("AM for ID {}", appid.getId());
+
+ appInformation.put(StatusKeys.INFO_AM_CONTAINER_ID, appMasterContainerID.toString());
+ appInformation.put(StatusKeys.INFO_AM_APP_ID, appid.toString());
+ appInformation.put(StatusKeys.INFO_AM_ATTEMPT_ID, appAttemptID.toString());
+
+ Map<String, String> envVars;
+ List<Container> liveContainers;
+
+ /*
+ * It is critical this section is synchronized, to stop async AM events
+ * arriving while registering a restarting AM.
+ */
+ synchronized (appState) {
+ int heartbeatInterval = HEARTBEAT_INTERVAL;
+
+ // add the RM client -this brings the callbacks in
+ asyncRMClient = AMRMClientAsync.createAMRMClientAsync(heartbeatInterval, this);
+ addService(asyncRMClient);
+ //now bring it up
+ deployChildService(asyncRMClient);
+
+
+ // nmclient relays callbacks back to this class
+ nmClientAsync = new NMClientAsyncImpl("nmclient", this);
+ deployChildService(nmClientAsync);
+
+ // set up secret manager
+ secretManager = new ClientToAMTokenSecretManager(appAttemptID, null);
+
+ if (securityEnabled) {
+ // fix up the ACLs if they are not set
+ String acls = serviceConf.get(KEY_PROTOCOL_ACL);
+ if (acls == null) {
+ getConfig().set(KEY_PROTOCOL_ACL, "*");
+ }
+ }
+
+ certificateManager = new CertificateManager();
+
+ //bring up the Slider RPC service
+ buildPortScanner(instanceDefinition);
+ startSliderRPCServer(instanceDefinition);
+
+ rpcServiceAddress = rpcService.getConnectAddress();
+ appMasterHostname = rpcServiceAddress.getAddress().getCanonicalHostName();
+ appMasterRpcPort = rpcServiceAddress.getPort();
+ appMasterTrackingUrl = null;
+ log.info("AM Server is listening at {}:{}", appMasterHostname, appMasterRpcPort);
+ appInformation.put(StatusKeys.INFO_AM_HOSTNAME, appMasterHostname);
+ appInformation.set(StatusKeys.INFO_AM_RPC_PORT, appMasterRpcPort);
+
+ log.info("Starting Yarn registry");
+ registryOperations = startRegistryOperationsService();
+ log.info(registryOperations.toString());
+
+ //build the role map
+ List<ProviderRole> providerRoles = new ArrayList<>(providerService.getRoles());
+ providerRoles.addAll(SliderAMClientProvider.ROLES);
+
+ // Start up the WebApp and track the URL for it
+ MapOperations component = instanceDefinition.getAppConfOperations()
+ .getComponent(SliderKeys.COMPONENT_AM);
+ certificateManager.initialize(component, appMasterHostname,
+ appMasterContainerID.toString(),
+ clustername);
+ certificateManager.setPassphrase(instanceDefinition.getPassphrase());
+
+ if (component.getOptionBool(
+ AgentKeys.KEY_AGENT_TWO_WAY_SSL_ENABLED, false)) {
+ uploadServerCertForLocalization(clustername, fs);
+ }
+
+ // Web service endpoints: initialize
+ WebAppApiImpl webAppApi =
+ new WebAppApiImpl(
+ stateForProviders,
+ providerService,
+ certificateManager,
+ registryOperations,
+ metricsAndMonitoring,
+ actionQueues,
+ this,
+ contentCache);
+ initAMFilterOptions(serviceConf);
+
+ // start the agent web app
+ startAgentWebApp(appInformation, serviceConf, webAppApi);
+ int webAppPort = deployWebApplication(webAppApi);
+
+ String scheme = WebAppUtils.HTTP_PREFIX;
+ appMasterTrackingUrl = scheme + appMasterHostname + ":" + webAppPort;
+
+ appInformation.put(StatusKeys.INFO_AM_WEB_URL, appMasterTrackingUrl + "/");
+ appInformation.set(StatusKeys.INFO_AM_WEB_PORT, webAppPort);
+
+ // *****************************************************
+ // Register self with ResourceManager
+ // This will start heartbeating to the RM
+ // address = SliderUtils.getRmSchedulerAddress(asyncRMClient.getConfig());
+ // *****************************************************
+ log.info("Connecting to RM at {}; AM tracking URL={}",
+ appMasterRpcPort, appMasterTrackingUrl);
+ amRegistrationData = asyncRMClient.registerApplicationMaster(appMasterHostname,
+ appMasterRpcPort,
+ appMasterTrackingUrl);
+ maximumResourceCapability = amRegistrationData.getMaximumResourceCapability();
+
+ int minMemory = serviceConf.getInt(RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ // validate scheduler vcores allocation setting
+ int minCores = serviceConf.getInt(RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+ int maxMemory = maximumResourceCapability.getMemory();
+ int maxCores = maximumResourceCapability.getVirtualCores();
+ appState.setContainerLimits(minMemory,maxMemory, minCores, maxCores );
+
+ // build the handler for RM request/release operations; this uses
+ // the max value as part of its lookup
+ rmOperationHandler = new AsyncRMOperationHandler(asyncRMClient, maximumResourceCapability);
+
+ // set the RM-defined maximum cluster values
+ appInformation.put(ResourceKeys.YARN_CORES, Integer.toString(maxCores));
+ appInformation.put(ResourceKeys.YARN_MEMORY, Integer.toString(maxMemory));
+
+ processAMCredentials(securityConfiguration);
+
+ if (securityEnabled) {
+ secretManager.setMasterKey(
+ amRegistrationData.getClientToAMTokenMasterKey().array());
+ applicationACLs = amRegistrationData.getApplicationACLs();
+
+ //tell the server what the ACLs are
+ rpcService.getServer().refreshServiceAcl(serviceConf,
+ new SliderAMPolicyProvider());
+ if (securityConfiguration.isKeytabProvided()) {
+ // perform keytab based login to establish kerberos authenticated
+ // principal. Can do so now since AM registration with RM above required
+ // tokens associated to principal
+ String principal = securityConfiguration.getPrincipal();
+ File localKeytabFile = securityConfiguration.getKeytabFile(instanceDefinition);
+ // Now log in...
+ login(principal, localKeytabFile);
+ // obtain new FS reference that should be kerberos based and different
+ // than the previously cached reference
+ fs = new SliderFileSystem(serviceConf);
+ }
+ }
+
+ // YARN client.
+ // Important: this is only valid at startup, and must be executed within
+ // the right UGI context. Use with care.
+ SliderYarnClientImpl yarnClient = null;
+ List<NodeReport> nodeReports;
+ try {
+ yarnClient = new SliderYarnClientImpl();
+ yarnClient.init(getConfig());
+ yarnClient.start();
+ nodeReports = getNodeReports(yarnClient);
+ log.info("Yarn node report count: {}", nodeReports.size());
+ // look up the application itself -this is needed to get the proxied
+ // URL of the AM, for registering endpoints.
+ // this call must be made after the AM has registered itself, obviously
+ ApplicationAttemptReport report = getApplicationAttemptReport(yarnClient);
+ appMasterProxiedUrl = report.getTrackingUrl();
+ if (SliderUtils.isUnset(appMasterProxiedUrl)) {
+ log.warn("Proxied URL is not set in application report");
+ appMasterProxiedUrl = appMasterTrackingUrl;
+ }
+ } finally {
+ // at this point yarnClient is no longer needed.
+ // stop it immediately
+ ServiceOperations.stop(yarnClient);
+ yarnClient = null;
+ }
+
+ // extract container list
+
+ liveContainers = amRegistrationData.getContainersFromPreviousAttempts();
+
+ //now validate the installation
+ Configuration providerConf =
+ providerService.loadProviderConfigurationInformation(confDir);
+
+ providerService.initializeApplicationConfiguration(instanceDefinition, fs);
+
+ providerService.validateApplicationConfiguration(instanceDefinition,
+ confDir,
+ securityEnabled);
+
+ //determine the location for the role history data
+ Path historyDir = new Path(clusterDirPath, HISTORY_DIR_NAME);
+
+ //build the instance
+ AppStateBindingInfo binding = new AppStateBindingInfo();
+ binding.instanceDefinition = instanceDefinition;
+ binding.serviceConfig = serviceConf;
+ binding.publishedProviderConf = providerConf;
+ binding.roles = providerRoles;
+ binding.fs = fs.getFileSystem();
+ binding.historyPath = historyDir;
+ binding.liveContainers = liveContainers;
+ binding.applicationInfo = appInformation;
+ binding.releaseSelector = providerService.createContainerReleaseSelector();
+ binding.nodeReports = nodeReports;
+ appState.buildInstance(binding);
+
+ providerService.rebuildContainerDetails(liveContainers,
+ instanceDefinition.getName(), appState.getRolePriorityMap());
+
+ // add the AM to the list of nodes in the cluster
+
+ appState.buildAppMasterNode(appMasterContainerID,
+ appMasterHostname,
+ webAppPort,
+ appMasterHostname + ":" + webAppPort);
+
+ // build up environment variables that the AM wants set in every container
+ // irrespective of provider and role.
+ envVars = new HashMap<>();
+ if (hadoop_user_name != null) {
+ envVars.put(HADOOP_USER_NAME, hadoop_user_name);
+ }
+ String debug_kerberos = System.getenv(HADOOP_JAAS_DEBUG);
+ if (debug_kerberos != null) {
+ envVars.put(HADOOP_JAAS_DEBUG, debug_kerberos);
+ }
+ }
+ String rolesTmpSubdir = appMasterContainerID.toString() + "/roles";
+
+ String amTmpDir = globalInternalOptions.getMandatoryOption(InternalKeys.INTERNAL_AM_TMP_DIR);
+
+ Path tmpDirPath = new Path(amTmpDir);
+ Path launcherTmpDirPath = new Path(tmpDirPath, rolesTmpSubdir);
+ fs.getFileSystem().mkdirs(launcherTmpDirPath);
+
+ //launcher service
+ launchService = new RoleLaunchService(actionQueues,
+ providerService,
+ fs,
+ new Path(getGeneratedConfDir()),
+ envVars,
+ launcherTmpDirPath);
+
+ deployChildService(launchService);
+
+ appState.noteAMLaunched();
+
+
+ //Give the provider access to the state, and AM
+ providerService.bind(stateForProviders, actionQueues, liveContainers);
+ sliderAMProvider.bind(stateForProviders, actionQueues, liveContainers);
+
+ // chaos monkey
+ maybeStartMonkey();
+
+ // setup token renewal and expiry handling for long lived apps
+// if (!securityConfiguration.isKeytabProvided() &&
+// SliderUtils.isHadoopClusterSecure(getConfig())) {
+// fsDelegationTokenManager = new FsDelegationTokenManager(actionQueues);
+// fsDelegationTokenManager.acquireDelegationToken(getConfig());
+// }
+
+ // if not a secure cluster, extract the username -it will be
+ // propagated to workers
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ hadoop_user_name = System.getenv(HADOOP_USER_NAME);
+ log.info(HADOOP_USER_NAME + "='{}'", hadoop_user_name);
+ }
+ service_user_name = RegistryUtils.currentUser();
+ log.info("Registry service username ={}", service_user_name);
+
+
+ // declare the cluster initialized
+ log.info("Application Master Initialization Completed");
+ initCompleted.set(true);
+
+ scheduleFailureWindowResets(instanceDefinition.getResources());
+ scheduleEscalation(instanceDefinition.getInternal());
+
+ try {
+ // schedule YARN Registry registration
+ queue(new ActionRegisterServiceInstance(clustername, appid));
+
+ // log the YARN and web UIs
+ log.info("RM Webapp address {}",
+ serviceConf.get(YarnConfiguration.RM_WEBAPP_ADDRESS));
+ log.info("Slider webapp address {} proxied at {}",
+ appMasterTrackingUrl, appMasterProxiedUrl);
+
+ // Start the Slider AM provider
+ sliderAMProvider.start();
+
+ // launch the real provider; this is expected to trigger a callback that
+ // starts the node review process
+ launchProviderService(instanceDefinition, confDir);
+
+ // start handling any scheduled events
+
+ startQueueProcessing();
+
+ //now block waiting to be told to exit the process
+ waitForAMCompletionSignal();
+ } catch(Exception e) {
+ log.error("Exception : {}", e, e);
+ // call the AM stop command as if it had been queued (but without
+ // going via the queue, which may not have started
+ onAMStop(new ActionStopSlider(e));
+ }
+ //shutdown time
+ return finish();
+ }
+
+ /**
+ * Get the YARN application Attempt report as the logged in user
+ * @param yarnClient client to the RM
+ * @return the application report
+ * @throws YarnException
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private ApplicationAttemptReport getApplicationAttemptReport(
+ final SliderYarnClientImpl yarnClient)
+ throws YarnException, IOException, InterruptedException {
+ Preconditions.checkNotNull(yarnClient, "Null Yarn client");
+ ApplicationAttemptReport report;
+ if (securityEnabled) {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ report = ugi.doAs(new PrivilegedExceptionAction<ApplicationAttemptReport>() {
+ @Override
+ public ApplicationAttemptReport run() throws Exception {
+ return yarnClient.getApplicationAttemptReport(appAttemptID);
+ }
+ });
+ } else {
+ report = yarnClient.getApplicationAttemptReport(appAttemptID);
+ }
+ return report;
+ }
+
+ /**
+ * List the node reports: uses {@link SliderYarnClientImpl} as the login user
+ * @param yarnClient client to the RM
+ * @return the node reports
+ * @throws IOException
+ * @throws YarnException
+ * @throws InterruptedException
+ */
+ private List<NodeReport> getNodeReports(final SliderYarnClientImpl yarnClient)
+ throws IOException, YarnException, InterruptedException {
+ Preconditions.checkNotNull(yarnClient, "Null Yarn client");
+ List<NodeReport> nodeReports;
+ if (securityEnabled) {
+ nodeReports = UserGroupInformation.getLoginUser().doAs(
+ new PrivilegedExceptionAction<List<NodeReport>>() {
+ @Override
+ public List<NodeReport> run() throws Exception {
+ return yarnClient.getNodeReports(NodeState.RUNNING);
+ }
+ });
+ } else {
+ nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
+ }
+ log.info("Yarn node report count: {}", nodeReports.size());
+ return nodeReports;
+ }
+
  /**
   * Deploy the web application.
   * <p>
   * Creates and starts the web application, and adds a
   * <code>WebAppService</code> service under the AM, to ensure
   * a managed web application shutdown.
   * @param webAppApi web app API instance
   * @return port the web application is deployed on
   * @throws IOException general problems starting the webapp (network, etc)
   * @throws WebAppException other issues
   */
  private int deployWebApplication(WebAppApiImpl webAppApi)
    throws IOException, SliderException {

    try {
      webApp = new SliderAMWebApp(webAppApi);
      // plain HTTP only; TLS is not configured for this endpoint
      HttpConfig.Policy policy = HttpConfig.Policy.HTTP_ONLY;
      // the port comes from the port scanner, honouring any allowed range
      int port = getPortToRequest();
      log.info("Launching web application at port {} with policy {}", port, policy);

      // bind on all interfaces; the "true" flag asks for the port to be found
      // if the requested one is taken
      WebApps.$for(SliderAMWebApp.BASE_PATH,
          WebAppApi.class,
          webAppApi,
          RestPaths.WS_CONTEXT)
          .withHttpPolicy(getConfig(), policy)
          .at("0.0.0.0", port, true)
          .inDevMode()
          .start(webApp);

      // wrap the webapp in a YARN service so it is stopped with the AM
      WebAppService<SliderAMWebApp> webAppService =
          new WebAppService<>("slider", webApp);

      deployChildService(webAppService);
      return webApp.port();
    } catch (WebAppException e) {
      // unwrap IO failures so callers see the underlying network problem
      if (e.getCause() instanceof IOException) {
        throw (IOException)e.getCause();
      } else {
        throw e;
      }
    }
  }
+
+ /**
+ * Process the initial user to obtain the set of user
+ * supplied credentials (tokens were passed in by client).
+ * Removes the AM/RM token.
+ * If a keytab has been provided, also strip the HDFS delegation token.
+ * @param securityConfig slider security config
+ * @throws IOException
+ */
+ private void processAMCredentials(SecurityConfiguration securityConfig)
+ throws IOException {
+
+ List<Text> filteredTokens = new ArrayList<>(3);
+ filteredTokens.add(AMRMTokenIdentifier.KIND_NAME);
+ filteredTokens.add(TimelineDelegationTokenIdentifier.KIND_NAME);
+
+ boolean keytabProvided = securityConfig.isKeytabProvided();
+ log.info("Slider AM Security Mode: {}", keytabProvided ? "KEYTAB" : "TOKEN");
+ if (keytabProvided) {
+ filteredTokens.add(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+ }
+ containerCredentials = CredentialUtils.filterTokens(
+ UserGroupInformation.getCurrentUser().getCredentials(),
+ filteredTokens);
+ log.info(CredentialUtils.dumpTokens(containerCredentials, "\n"));
+ }
+
+ /**
+ * Build up the port scanner. This may include setting a port range.
+ */
+ private void buildPortScanner(AggregateConf instanceDefinition)
+ throws BadConfigException {
+ portScanner = new PortScanner();
+ String portRange = instanceDefinition.
+ getAppConfOperations().getGlobalOptions().
+ getOption(SliderKeys.KEY_ALLOWED_PORT_RANGE, "0");
+ if (!"0".equals(portRange)) {
+ portScanner.setPortRange(portRange);
+ }
+ }
+
  /**
   * Locate a port to request for a service such as RPC or web/REST.
   * This uses port range definitions in the <code>instanceDefinition</code>
   * to fix the port range, if one is set.
   * <p>
   * The port returned is available at the time of the request; there are
   * no guarantees as to how long that situation will last.
   * @return the port to request.
   * @throws SliderException if no port in the range is free
   * @throws IOException on socket probing problems
   */
  private int getPortToRequest() throws SliderException, IOException {
    return portScanner.getAvailablePort();
  }
+
+ private void uploadServerCertForLocalization(String clustername,
+ SliderFileSystem fs)
+ throws IOException {
+ Path certsDir = fs.buildClusterSecurityDirPath(clustername);
+ if (!fs.getFileSystem().exists(certsDir)) {
+ fs.getFileSystem().mkdirs(certsDir,
+ new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+ }
+ Path destPath = new Path(certsDir, SliderKeys.CRT_FILE_NAME);
+ if (!fs.getFileSystem().exists(destPath)) {
+ fs.getFileSystem().copyFromLocalFile(
+ new Path(CertificateManager.getServerCertficateFilePath().getAbsolutePath()),
+ destPath);
+ log.info("Uploaded server cert to localization path {}", destPath);
+ }
+
+ fs.getFileSystem().setPermission(destPath,
+ new FsPermission(FsAction.READ, FsAction.NONE, FsAction.NONE));
+ }
+
+ protected void login(String principal, File localKeytabFile)
+ throws IOException, SliderException {
+ log.info("Logging in as {} with keytab {}", principal, localKeytabFile);
+ UserGroupInformation.loginUserFromKeytab(principal,
+ localKeytabFile.getAbsolutePath());
+ validateLoginUser(UserGroupInformation.getLoginUser());
+ }
+
  /**
   * Ensure that the user is generated from a keytab and has no HDFS delegation
   * tokens.
   *
   * @param user user to validate
   * @throws SliderException declared but not currently thrown; validation
   * failures are logged rather than fatal
   */
  protected void validateLoginUser(UserGroupInformation user)
      throws SliderException {
    if (!user.isFromKeytab()) {
      // deliberately non-fatal: the AM keeps running, but long-lived
      // instances will eventually fail as tokens expire
      log.error("User is not holding on a keytab in a secure deployment:" +
          " slider will fail as tokens expire");
    }
    // NOTE(review): UserGroupInformation.getCredentials() may return a copy
    // of the user's credentials; if so, the iter.remove() below does not
    // change the UGI's own token set -- TODO confirm against the Hadoop
    // version in use
    Credentials credentials = user.getCredentials();
    Iterator<Token<? extends TokenIdentifier>> iter =
        credentials.getAllTokens().iterator();
    while (iter.hasNext()) {
      Token<? extends TokenIdentifier> token = iter.next();
      log.info("Token {}", token.getKind());
      if (token.getKind().equals(
          DelegationTokenIdentifier.HDFS_DELEGATION_KIND)) {
        // strip HDFS delegation tokens: the keytab login supersedes them
        log.info("HDFS delegation token {}. Removing...", token);
        iter.remove();
      }
    }
  }
+
  /**
   * Set up and start the agent web application: an HTTPS ops endpoint and an
   * HTTP(S) status endpoint, both wrapped in an {@code AgentService} so they
   * are stopped with the AM. Also records the endpoint URLs/ports in the
   * application information map.
   * @param appInformation application information map to update
   * @param serviceConf service configuration
   * @param webAppApi web app API instance to bind to
   * @throws IOException on network/port problems
   * @throws SliderException if no port can be allocated
   */
  private void startAgentWebApp(MapOperations appInformation,
      Configuration serviceConf, WebAppApiImpl webAppApi) throws IOException, SliderException {
    // dump the AM classpath at debug level to help diagnose class conflicts
    URL[] urls = ((URLClassLoader) AgentWebApp.class.getClassLoader() ).getURLs();
    StringBuilder sb = new StringBuilder("AM classpath:");
    for (URL url : urls) {
      sb.append("\n").append(url.toString());
    }
    LOG_YARN.debug(sb.append("\n").toString());
    // (re-)apply the AM filter settings; this only sets configuration keys,
    // so a repeat invocation is harmless
    initAMFilterOptions(serviceConf);


    // Start up the agent web app and track the URL for it
    MapOperations appMasterConfig = getInstanceDefinition()
        .getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM);
    // two ports are requested: a secured one for ops and a second for status
    AgentWebApp agentWebApp = AgentWebApp.$for(AgentWebApp.BASE_PATH,
        webAppApi,
        RestPaths.AGENT_WS_CONTEXT)
        .withComponentConfig(appMasterConfig)
        .withPort(getPortToRequest())
        .withSecuredPort(getPortToRequest())
        .start();
    agentOpsUrl =
        "https://" + appMasterHostname + ":" + agentWebApp.getSecuredPort();
    agentStatusUrl =
        "https://" + appMasterHostname + ":" + agentWebApp.getPort();
    AgentService agentService =
        new AgentService("slider-agent", agentWebApp);

    // init+start manually, then register for managed shutdown
    agentService.init(serviceConf);
    agentService.start();
    addService(agentService);

    // publish the endpoints for status reporting
    appInformation.put(StatusKeys.INFO_AM_AGENT_OPS_URL, agentOpsUrl + "/");
    appInformation.put(StatusKeys.INFO_AM_AGENT_STATUS_URL, agentStatusUrl + "/");
    appInformation.set(StatusKeys.INFO_AM_AGENT_STATUS_PORT,
        agentWebApp.getPort());
    appInformation.set(StatusKeys.INFO_AM_AGENT_OPS_PORT,
        agentWebApp.getSecuredPort());
  }
+
+ /**
+ * Set up the AM filter
+ * @param serviceConf configuration to patch
+ */
+ private void initAMFilterOptions(Configuration serviceConf) {
+ // IP filtering
+ String amFilterName = AM_FILTER_NAME;
+
+ // This is here until YARN supports proxy & redirect operations
+ // on verbs other than GET, and is only supported for testing
+ if (X_DEV_INSECURE_REQUIRED && serviceConf.getBoolean(X_DEV_INSECURE_WS,
+ X_DEV_INSECURE_DEFAULT)) {
+ log.warn("Insecure filter enabled: REST operations are unauthenticated");
+ amFilterName = InsecureAmFilterInitializer.NAME;
+ }
+
+ serviceConf.set(HADOOP_HTTP_FILTER_INITIALIZERS, amFilterName);
+ }
+
  /**
   * This registers the service instance and its external values in the YARN
   * registry: builds the provider registry view, constructs the AM's
   * service record (IPC endpoint plus provider-supplied endpoints and
   * attributes), registers it, and on a first attempt purges any stale
   * child entries left by a previous instance.
   * @param instanceName name of this instance
   * @param appId application ID
   * @throws IOException on registry or URL failures
   */
  public void registerServiceInstance(String instanceName,
      ApplicationId appId) throws IOException {


    // the registry is running, so register services
    URL amWebURI = new URL(appMasterProxiedUrl);
    URL agentOpsURI = new URL(agentOpsUrl);
    URL agentStatusURI = new URL(agentStatusUrl);

    //Give the provider restricted access to the state, registry
    setupInitialRegistryPaths();
    yarnRegistryOperations = new YarnRegistryViewForProviders(
        registryOperations,
        service_user_name,
        SliderKeys.APP_TYPE,
        instanceName,
        appAttemptID);
    providerService.bindToYarnRegistry(yarnRegistryOperations);
    sliderAMProvider.bindToYarnRegistry(yarnRegistryOperations);

    // Yarn registry: record lives for the lifetime of the application
    ServiceRecord serviceRecord = new ServiceRecord();
    serviceRecord.set(YarnRegistryAttributes.YARN_ID, appId.toString());
    serviceRecord.set(YarnRegistryAttributes.YARN_PERSISTENCE,
        PersistencePolicies.APPLICATION);
    serviceRecord.description = "Slider Application Master";

    // publish the AM's own RPC endpoint
    serviceRecord.addExternalEndpoint(
        RegistryTypeUtils.ipcEndpoint(
            CustomRegistryConstants.AM_IPC_PROTOCOL,
            rpcServiceAddress));

    // internal services
    sliderAMProvider.applyInitialRegistryDefinitions(amWebURI,
        agentOpsURI,
        agentStatusURI,
        serviceRecord);

    // provider service dynamic definitions.
    providerService.applyInitialRegistryDefinitions(amWebURI,
        agentOpsURI,
        agentStatusURI,
        serviceRecord);

    // set any provided attributes from the AM component configuration
    setProvidedServiceRecordAttributes(
        getInstanceDefinition().getAppConfOperations().getComponent(
            SliderKeys.COMPONENT_AM), serviceRecord);

    // register the service's entry
    log.info("Service Record \n{}", serviceRecord);
    yarnRegistryOperations.registerSelf(serviceRecord, true);
    log.info("Registered service under {}; absolute path {}",
        yarnRegistryOperations.getSelfRegistrationPath(),
        yarnRegistryOperations.getAbsoluteSelfRegistrationPath());

    boolean isFirstAttempt = 1 == appAttemptID.getAttemptId();
    // delete the children in case there are any and this is an AM startup.
    // just to make sure everything underneath is purged
    // (later attempts keep children so restarted containers stay registered)
    if (isFirstAttempt) {
      yarnRegistryOperations.deleteChildren(
          yarnRegistryOperations.getSelfRegistrationPath(),
          true);
    }
  }
+
+ /**
+ * TODO: purge this once RM is doing the work
+ * @throws IOException
+ */
+ protected void setupInitialRegistryPaths() throws IOException {
+ if (registryOperations instanceof RMRegistryOperationsService) {
+ RMRegistryOperationsService rmRegOperations =
+ (RMRegistryOperationsService) registryOperations;
+ rmRegOperations.initUserRegistryAsync(service_user_name);
+ }
+ }
+
+ /**
+ * Handler for {@link RegisterComponentInstance action}
+ * Register/re-register an ephemeral container that is already in the app state
+ * @param id the component
+ * @param description component description
+ * @param type component type
+ * @return true if the component is registered
+ */
+ public boolean registerComponent(ContainerId id, String description,
+ String type) throws IOException {
+ RoleInstance instance = appState.getOwnedContainer(id);
+ if (instance == null) {
+ return false;
+ }
+ // this is where component registrations go
+ log.info("Registering component {}", id);
+ String cid = RegistryPathUtils.encodeYarnID(id.toString());
+ ServiceRecord container = new ServiceRecord();
+ container.set(YarnRegistryAttributes.YARN_ID, cid);
+ container.description = description;
+ container.set(YarnRegistryAttributes.YARN_PERSISTENCE,
+ PersistencePolicies.CONTAINER);
+ MapOperations compOps = getInstanceDefinition().getAppConfOperations().
+ getComponent(type);
+ setProvidedServiceRecordAttributes(compOps, container);
+ try {
+ yarnRegistryOperations.putComponent(cid, container);
+ } catch (IOException e) {
+ log.warn("Failed to register container {}/{}: {}",
+ id, description, e, e);
+ return false;
+ }
+ return true;
+ }
+
+ protected void setProvidedServiceRecordAttributes(MapOperations ops,
+ ServiceRecord record) {
+ String prefix = RoleKeys.SERVICE_RECORD_ATTRIBUTE_PREFIX;
+ for (Map.Entry<String, String> entry : ops.entrySet()) {
+ if (entry.getKey().startsWith(
+ prefix)) {
+ String key = entry.getKey().substring(
+ prefix.length() + 1);
+ record.set(key, entry.getValue().trim());
+ }
+ }
+ }
+
+ /**
+ * Handler for {@link UnregisterComponentInstance}
+ *
+ * unregister a component. At the time this message is received,
+ * the component may not have been registered
+ * @param id the component
+ */
+ public void unregisterComponent(ContainerId id) {
+ log.info("Unregistering component {}", id);
+ if (yarnRegistryOperations == null) {
+ log.warn("Processing unregister component event before initialization " +
+ "completed; init flag ={}", initCompleted);
+ return;
+ }
+ String cid = RegistryPathUtils.encodeYarnID(id.toString());
+ try {
+ yarnRegistryOperations.deleteComponent(cid);
+ } catch (IOException e) {
+ log.warn("Failed to delete container {} : {}", id, e, e);
+ }
+ }
+
+ /**
+ * looks for a specific case where a token file is provided as an environment
+ * variable, yet the file is not there.
+ *
+ * This surfaced (once) in HBase, where its HDFS library was looking for this,
+ * and somehow the token was missing. This is a check in the AM so that
+ * if the problem re-occurs, the AM can fail with a more meaningful message.
+ *
+ */
+ private void checkAndWarnForAuthTokenProblems() {
+ String fileLocation =
+ System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+ if (fileLocation != null) {
+ File tokenFile = new File(fileLocation);
+ if (!tokenFile.exists()) {
+ log.warn("Token file {} specified in {} not found", tokenFile,
+ UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+ }
+ }
+ }
+
+ /**
+ * Build the configuration directory passed in or of the target FS
+ * @return the file
+ */
+ public File getLocalConfDir() {
+ File confdir =
+ new File(SliderKeys.PROPAGATED_CONF_DIR_NAME).getAbsoluteFile();
+ return confdir;
+ }
+
  /**
   * Get the path to the DFS configuration that is defined in the cluster
   * specification (the internal generated-conf-path option).
   * @return the generated configuration dir, or null if the option is unset
   */
  public String getGeneratedConfDir() {
    return getGlobalInternalOptions().get(
        InternalKeys.INTERNAL_GENERATED_CONF_PATH);
  }
+
+ /**
+ * Get the global internal options for the AM
+ * @return a map to access the internals
+ */
+ public MapOperations getGlobalInternalOptions() {
+ return getInstanceDefinition()
+ .getInternalOperations().
+ getGlobalOptions();
+ }
+
  /**
   * Get the filesystem of this cluster. Note that a new
   * {@code SliderFileSystem} instance is created on every call.
   * @return the FS of the config
   * @throws IOException on filesystem construction problems
   */
  public SliderFileSystem getClusterFS() throws IOException {
    return new SliderFileSystem(getConfig());
  }
+
  /**
   * Get the AM log.
   * @return the static logger shared by the AM
   */
  public static Logger getLog() {
    return log;
  }
+
  /**
   * Get the application state.
   * @return the application state
   */
  public AppState getAppState() {
    return appState;
  }
+
+ /**
+ * Block until it is signalled that the AM is done
+ */
+ private void waitForAMCompletionSignal() {
+ AMExecutionStateLock.lock();
+ try {
+ if (!amCompletionFlag.get()) {
+ log.debug("blocking until signalled to terminate");
+ isAMCompleted.awaitUninterruptibly();
+ }
+ } finally {
+ AMExecutionStateLock.unlock();
+ }
+ }
+
  /**
   * Signal that the AM is complete .. queues it in a separate thread.
   * The stop request is not executed inline: it is scheduled through the
   * action queues so the actual shutdown happens asynchronously.
   *
   * @param stopActionRequest request containing shutdown details
   */
  public synchronized void signalAMComplete(ActionStopSlider stopActionRequest) {
    // this is a queued action: schedule it through the queues
    schedule(stopActionRequest);
  }
+
  /**
   * Signal that the AM is complete: records the stop request and wakes up
   * the thread blocked in the AM completion wait. Only the first stop
   * request is acted on; later ones are ignored.
   *
   * @param stopActionRequest request containing shutdown details
   */
  public synchronized void onAMStop(ActionStopSlider stopActionRequest) {

    AMExecutionStateLock.lock();
    try {
      // compareAndSet ensures exactly one stop request wins the race
      if (amCompletionFlag.compareAndSet(false, true)) {
        // first stop request received
        this.stopAction = stopActionRequest;
        isAMCompleted.signal();
      }
    } finally {
      AMExecutionStateLock.unlock();
    }
  }
+
+
+ /**
+ * trigger the YARN cluster termination process
+ * @return the exit code
+ * @throws Exception if the stop action contained an Exception which implements
+ * ExitCodeProvider
+ */
+ private synchronized int finish() throws Exception {
+ Preconditions.checkNotNull(stopAction, "null stop action");
+ FinalApplicationStatus appStatus;
+ log.info("Triggering shutdown of the AM: {}", stopAction);
+
+ String appMessage = stopAction.getMessage();
+ //stop the daemon & grab its exit code
+ int exitCode = stopAction.getExitCode();
+ Exception exception = stopAction.getEx();
+
+ appStatus = stopAction.getFinalApplicationStatus();
+ if (!spawnedProcessExitedBeforeShutdownTriggered) {
+ //stopped the forked process but don't worry about its exit code
+ int forkedExitCode = stopForkedProcess();
+ log.debug("Stopped forked process: exit code={}", forkedExitCode);
+ }
+
+ // make sure the AM is actually registered. If not, there's no point
+ // trying to unregister it
+ if (amRegistrationData == null) {
+ log.info("Application attempt not yet registered; skipping unregistration");
+ if (exception != null) {
+ throw exception;
+ }
+ return exitCode;
+ }
+
+ //stop any launches in progress
+ launchService.stop();
+
+ //now release all containers
+ releaseAllContainers();
+
+ // When the application completes, it should send a finish application
+ // signal to the RM
+ log.info("Application completed. Signalling finish to RM");
+
+ try {
+ log.info("Unregistering AM status={} message={}", appStatus, appMessage);
+ asyncRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
+ } catch (InvalidApplicationMasterRequestException e) {
+ log.info("Application not found in YARN application list;" +
+ " it may have been terminated/YARN shutdown in progress: {}", e, e);
+ } catch (YarnException | IOException e) {
+ log.info("Failed to unregister application: " + e, e);
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ return exitCode;
+ }
+
+ /**
+ * Get diagnostics info about containers
+ */
+ private String getContainerDiagnosticInfo() {
+
+ return appState.getContainerDiagnosticInfo();
+ }
+
+ public Object getProxy(Class protocol, InetSocketAddress addr) {
+ return yarnRPC.getProxy(protocol, addr, getConfig());
+ }
+
  /**
   * Start the slider RPC server: verifies IPC ACL configuration, deploys the
   * IPC service implementation, wraps it in the protobuf relay, and starts
   * a protobuf RPC server on a scanned port bound to all interfaces.
   * @param instanceDefinition cluster specification (currently unused here;
   * the port comes from the already-built port scanner)
   * @throws IOException on server startup problems
   * @throws SliderException on configuration or port allocation problems
   */
  private void startSliderRPCServer(AggregateConf instanceDefinition)
      throws IOException, SliderException {
    // fail fast if authorization is enabled without ACLs
    verifyIPCAccess();

    sliderIPCService = new SliderIPCService(
        this,
        certificateManager,
        stateForProviders,
        actionQueues,
        metricsAndMonitoring,
        contentCache);

    deployChildService(sliderIPCService);
    // protobuf relay translates wire-format calls onto the IPC service
    SliderClusterProtocolPBImpl protobufRelay =
        new SliderClusterProtocolPBImpl(sliderIPCService);
    BlockingService blockingService = SliderClusterAPI.SliderClusterProtocolPB
        .newReflectiveBlockingService(
            protobufRelay);

    // bind to all interfaces on a port from the scanner's allowed range
    int port = getPortToRequest();
    InetSocketAddress rpcAddress = new InetSocketAddress("0.0.0.0", port);
    rpcService =
        new WorkflowRpcService("SliderRPC",
            RpcBinder.createProtobufServer(rpcAddress, getConfig(),
                secretManager,
                NUM_RPC_HANDLERS,
                blockingService,
                null));
    deployChildService(rpcService);
  }
+
+ /**
+ * verify that if the cluster is authed, the ACLs are set.
+ * @throws BadConfigException if Authorization is set without any ACL
+ */
+ private void verifyIPCAccess() throws BadConfigException {
+ boolean authorization = getConfig().getBoolean(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+ false);
+ String acls = getConfig().get(KEY_PROTOCOL_ACL);
+ if (authorization && SliderUtils.isUnset(acls)) {
+ throw new BadConfigException("Application has IPC authorization enabled in " +
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION +
+ " but no ACLs in " + KEY_PROTOCOL_ACL);
+ }
+ }
+
+
+/* =================================================================== */
+/* AMRMClientAsync callbacks */
+/* =================================================================== */
+
  /**
   * Callback event when a container is allocated.
   *
   * The app state is updated with the allocation, and builds up a list
   * of assignments and RM operations. The assignments are
   * handed off into the pool of service launchers to asynchronously schedule
   * container launch operations.
   *
   * The operations are run in sequence; they are expected to be 0 or more
   * release operations (to handle over-allocations)
   *
   * @param allocatedContainers list of containers that are now ready to be
   * given work.
   */
  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
  @Override //AMRMClientAsync
  public void onContainersAllocated(List<Container> allocatedContainers) {
    LOG_YARN.info("onContainersAllocated({})", allocatedContainers.size());
    List<ContainerAssignment> assignments = new ArrayList<>();
    List<AbstractRMOperation> operations = new ArrayList<>();

    //app state makes all the decisions; this callback only executes them
    appState.onContainersAllocated(allocatedContainers, assignments, operations);

    //for each assignment: instantiate that role
    for (ContainerAssignment assignment : assignments) {
      try {
        launchService.launchRole(assignment, getInstanceDefinition(),
            buildContainerCredentials());
      } catch (IOException e) {
        // Can be caused by failure to renew credentials with the remote
        // service. If so, don't launch the application. Container is retained,
        // though YARN will take it away after a timeout.
        // deliberately non-fatal: remaining assignments are still attempted
        log.error("Failed to build credentials to launch container: {}", e, e);

      }
    }

    //for all the operations, exec them
    execute(operations);
    log.info("Diagnostics: {}", getContainerDiagnosticInfo());
  }
+
  /**
   * Callback event when containers complete: feeds each completion into the
   * app state, notifies the provider for known nodes, queues component
   * unregistration, and finally triggers a node review so replacement
   * containers can be requested.
   * @param completedContainers containers reported complete by the RM
   */
  @Override //AMRMClientAsync
  public synchronized void onContainersCompleted(List<ContainerStatus> completedContainers) {
    LOG_YARN.info("onContainersCompleted([{}]", completedContainers.size());
    for (ContainerStatus status : completedContainers) {
      ContainerId containerId = status.getContainerId();
      LOG_YARN.info("Container Completion for" +
          " containerID={}," +
          " state={}," +
          " exitStatus={}," +
          " diagnostics={}",
          containerId, status.getState(),
          status.getExitStatus(),
          status.getDiagnostics());

      // non complete containers should not be here
      // (assert is only active when the JVM runs with -ea)
      assert (status.getState() == ContainerState.COMPLETE);
      AppState.NodeCompletionResult result = appState.onCompletedNode(status);
      if (result.containerFailed) {
        RoleInstance ri = result.roleInstance;
        log.error("Role instance {} failed ", ri);
      }

      // known nodes trigger notifications; unknown nodes (e.g. released
      // or never-assigned containers) are ignored
      if(!result.unknownNode) {
        getProviderService().notifyContainerCompleted(containerId);
        queue(new UnregisterComponentInstance(containerId, 0,
            TimeUnit.MILLISECONDS));
      }
    }

    // ask the app state to review requests/releases given the new capacity
    reviewRequestAndReleaseNodes("onContainersCompleted");
  }
+
+ /**
+ * Signal that containers are being upgraded. Containers specified with
+ * --containers option and all containers of all roles specified with
+ * --components option are merged and upgraded.
+ *
+ * @param upgradeContainersRequest
+ * request containing upgrade details
+ */
+ public synchronized void onUpgradeContainers(
+ ActionUpgradeContainers upgradeContainersRequest) throws IOException,
+ SliderException {
+ LOG_YARN.info("onUpgradeContainers({})",
+ upgradeContainersRequest.getMessage());
+ Set<String> containers = upgradeContainersRequest.getContainers() == null ? new HashSet<String>()
+ : upgradeContainersRequest.getContainers();
+ LOG_YARN.info(" Container list provided (total {}) : {}",
+ containers.size(), containers);
+ Set<String> components = upgradeContainersRequest.getComponents() == null ? new HashSet<String>()
+ : upgradeContainersRequest.getComponents();
+ LOG_YARN.info(" Component list provided (total {}) : {}",
+ components.size(), components);
+ // If components are specified as well, then grab all the containers of
+ // each of the components (roles)
+ if (CollectionUtils.isNotEmpty(components)) {
+ Map<ContainerId, RoleInstance> liveContainers = appState.getLiveContainers();
+ if (CollectionUtils.isNotEmpty(liveContainers.keySet())) {
+ Map<String, Set<String>> roleContainerMap = prepareRoleContainerMap(liveContainers);
+ for (String component : components) {
+ Set<String> roleContainers = roleContainerMap.get(component);
+ if (roleContainers != null) {
+ containers.addAll(roleContainers);
+ }
+ }
+ }
+ }
+ LOG_YARN.info("Final list of containers to be upgraded (total {}) : {}",
+ containers.size(), containers);
+ if (providerService instanceof AgentProviderService) {
+ AgentProviderService agentProviderService = (AgentProviderService) providerService;
+ agentProviderService.setInUpgradeMode(true);
+ agentProviderService.addUpgradeContainers(containers);
+ }
+ }
+
+ // create a reverse map of roles -> set of all live containers
+ private Map<String, Set<String>> prepareRoleContainerMap(
+ Map<ContainerId, RoleInstance> liveContainers) {
+ // liveContainers is ensured to be not empty
+ Map<String, Set<String>> roleContainerMap = new HashMap<>();
+ for (Map.Entry<ContainerId, RoleInstance> liveContainer : liveContainers
+ .entrySet()) {
+ RoleInstance role = liveContainer.getValue();
+ if (roleContainerMap.containsKey(role.role)) {
+ roleContainerMap.get(role.role).add(liveContainer.getKey().toString());
+ } else {
+ Set<String> containers = new HashSet<String>();
+ containers.add(liveContainer.getKey().toString());
+ roleContainerMap.put(role.role, containers);
+ }
+ }
+ return roleContainerMap;
+ }
+
+ /**
+ * Implementation of cluster flexing.
+ * It should be the only way that anything -even the AM itself on startup-
+ * asks for nodes.
+ * @param resources the resource tree
+ * @throws SliderException slider problems, including invalid configs
+ * @throws IOException IO problems
+ */
+ public void flexCluster(ConfTree resources)
+ throws IOException, SliderException {
+
+ AggregateConf newConf =
+ new AggregateConf(appState.getInstanceDefinitionSnapshot());
+ newConf.setResources(resources);
+ // verify the new definition is valid
+ sliderAMProvider.validateInstanceDefinition(newConf);
+ providerService.validateInstanceDefinition(newConf);
+
+ appState.updateResourceDefinitions(resources);
+
+ // reset the scheduled windows...the values
+ // may have changed
+ appState.resetFailureCounts();
+
+ // ask for more containers if needed
+ reviewRequestAndReleaseNodes("flexCluster");
+ }
+
+ /**
+ * Schedule the failure window
+ * @param resources the resource tree
+ * @throws BadConfigException if the window is out of range
+ */
+ private void scheduleFailureWindowResets(ConfTree resources) throws
+ BadConfigException {
+ ResetFailureWindow reset = new ResetFailureWindow();
+ ConfTreeOperations ops = new ConfTreeOperations(resources);
+ MapOperations globals = ops.getGlobalOptions();
+ long seconds = globals.getTimeRange(ResourceKeys.CONTAINER_FAILURE_WINDOW,
+ ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_DAYS,
+ ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_HOURS,
+ ResourceKeys.DEFAULT_CONTAINER_FAILURE_WINDOW_MINUTES, 0);
+ if (seconds > 0) {
+ log.info(
+ "Scheduling the failure window reset interval to every {} seconds",
+ seconds);
+ RenewingAction<ResetFailureWindow> renew = new RenewingAction<>(
+ reset, seconds, seconds, TimeUnit.SECONDS, 0);
+ actionQueues.renewing("failures", renew);
+ } else {
+ log.info("Failure window reset interval is not set");
+ }
+ }
+
+ /**
+ * Schedule the escalation action
+ * @param internal
+ * @throws BadConfigException
+ */
+ private void scheduleEscalation(ConfTree internal) throws BadConfigException {
+ EscalateOutstandingRequests escalate = new EscalateOutstandingRequests();
+ ConfTreeOperations ops = new ConfTreeOperations(internal);
+ int seconds = ops.getGlobalOptions().getOptionInt(InternalKeys.ESCALATION_CHECK_INTERVAL,
+ InternalKeys.DEFAULT_ESCALATION_CHECK_INTERVAL);
+ RenewingAction<EscalateOutstandingRequests> renew = new RenewingAction<>(
+ escalate, seconds, seconds, TimeUnit.SECONDS, 0);
+ actionQueues.renewing("escalation", renew);
+ }
+
+ /**
+ * Look at where the current node state is -and whether it should be changed
+ * @param reason reason for operation
+ */
+ private synchronized void reviewRequestAndReleaseNodes(String reason) {
+ log.debug("reviewRequestAndReleaseNodes({})", reason);
+ queue(new ReviewAndFlexApplicationSize(reason, 0, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Handle the event requesting a review ... look at the queue and decide
+ * whether to act or not
+ * @param action action triggering the event. It may be put
+ * back into the queue
+ * @throws SliderInternalStateException
+ */
+ public void handleReviewAndFlexApplicationSize(ReviewAndFlexApplicationSize action)
+ throws SliderInternalStateException {
+
+ if ( actionQueues.hasQueuedActionWithAttribute(
+ AsyncAction.ATTR_REVIEWS_APP_SIZE | AsyncAction.ATTR_HALTS_APP)) {
+ // this operation isn't needed at all -existing duplicate or shutdown due
+ return;
+ }
+ // if there is an action which changes cluster size, wait
+ if (actionQueues.hasQueuedActionWithAttribute(
+ AsyncAction.ATTR_CHANGES_APP_SIZE)) {
+ // place the action at the back of the queue
+ actionQueues.put(action);
+ }
+
+ executeNodeReview(action.name);
+ }
+
+ /**
+ * Look at where the current node state is -and whether it should be changed
+ */
+ public synchronized void executeNodeReview(String reason)
+ throws SliderInternalStateException {
+
+ log.debug("in executeNodeReview({})", reason);
+ if (amCompletionFlag.get()) {
+ log.info("Ignoring node review operation: shutdown in progress");
+ }
+ try {
+ List<AbstractRMOperation> allOperations = appState.reviewRequestAndReleaseNodes();
+ // tell the provider
+ providerRMOperationHandler.execute(allOperations);
+ //now apply the operations
+ execute(allOperations);
+ } catch (TriggerClusterTeardownException e) {
+ //App state has decided that it is time to exit
+ log.error("Cluster teardown triggered {}", e, e);
+ queue(new ActionStopSlider(e));
+ }
+ }
+
  /**
   * Escalate operation as triggered by external timer.
   * <p>
   * Gets the list of new operations off the app state, then executes them
   * against both the provider's RM handler and the real RM handler.
   */
  public void escalateOutstandingRequests() {
    List<AbstractRMOperation> operations = appState.escalateOutstandingRequests();
    providerRMOperationHandler.execute(operations);
    execute(operations);
  }
+
+
+ /**
+ * Shutdown operation: release all containers
+ */
+ private void releaseAllContainers() {
+ if (providerService instanceof AgentProviderService) {
+ log.info("Setting stopInitiated flag to true");
+ AgentProviderService agentProviderService = (AgentProviderService) providerService;
+ agentProviderService.setAppStopInitiated(true);
+ }
+ // Add the sleep here (before releasing containers) so that applications get
+ // time to perform graceful shutdown
+ try {
+ long timeout = getContainerReleaseTimeout();
+ if (timeout > 0) {
+ Thread.sleep(timeout);
+ }
+ } catch (InterruptedException e) {
+ log.info("Sleep for container release interrupted");
+ } finally {
+ List<AbstractRMOperation> operations = appState.releaseAllContainers();
+ providerRMOperationHandler.execute(operations);
+ // now apply the operations
+ execute(operations);
+ }
+ }
+
+ private long getContainerReleaseTimeout() {
+ // Get container release timeout in millis or 0 if the property is not set.
+ // If non-zero then add the agent heartbeat delay time, since it can take up
+ // to that much time for agents to receive the stop command.
+ int timeout = getInstanceDefinition().getAppConfOperations()
+ .getGlobalOptions()
+ .getOptionInt(SliderKeys.APP_CONTAINER_RELEASE_TIMEOUT, 0);
+ if (timeout > 0) {
+ timeout += SliderKeys.APP_CONTAINER_HEARTBEAT_INTERVAL_SEC;
+ }
+ // convert to millis
+ long timeoutInMillis = timeout * 1000l;
+ log.info("Container release timeout in millis = {}", timeoutInMillis);
+ return timeoutInMillis;
+ }
+
+ /**
+ * RM wants to shut down the AM
+ */
+ @Override //AMRMClientAsync
+ public void onShutdownRequest() {
+ LOG_YARN.info("Shutdown Request received");
+ signalAMComplete(new ActionStopSlider("stop",
+ EXIT_SUCCESS,
+ FinalApplicationStatus.SUCCEEDED,
+ "Shutdown requested from RM"));
+ }
+
+ /**
+ * Monitored nodes have been changed
+ * @param updatedNodes list of updated nodes
+ */
+ @Override //AMRMClientAsync
+ public void onNodesUpdated(List<NodeReport> updatedNodes) {
+ LOG_YARN.info("onNodesUpdated({})", updatedNodes.size());
+ log.info("Updated nodes {}", updatedNodes);
+ // Check if any nodes are lost or revived and update state accordingly
+
+ AppState.NodeUpdatedOutcome outcome = appState.onNodesUpdated(updatedNodes);
+ if (!outcome.operations.isEmpty()) {
+ execute(outcome.operations);
+ }
+ // trigger a review if the cluster changed
+ if (outcome.clusterChanged) {
+ reviewRequestAndReleaseNodes("nodes updated");
+ }
+ }
+
+ /**
+ * heartbeat operation; return the ratio of requested
+ * to actual
+ * @return progress
+ */
+ @Override //AMRMClientAsync
+ public float getProgress() {
+ return appState.getApplicationProgressPercentage();
+ }
+
+ @Override //AMRMClientAsync
+ public void onError(Throwable e) {
+ //callback says it's time to finish
+ LOG_YARN.error("AMRMClientAsync.onError() received {}", e, e);
+ signalAMComplete(new ActionStopSlider("stop",
+ EXIT_EXCEPTION_THROWN,
+ FinalApplicationStatus.FAILED,
+ "AMRMClientAsync.onError() received " + e));
+ }
+
+/* =================================================================== */
+/* RMOperationHandlerActions */
+/* =================================================================== */
+
+
  /**
   * Execute a list of RM operations by delegating to the active
   * RM operation handler.
   * @param operations operations to run
   */
  @Override
  public void execute(List<AbstractRMOperation> operations) {
    rmOperationHandler.execute(operations);
  }
+
  /**
   * Release an assigned container, delegating to the RM operation handler.
   * @param containerId container to release
   */
  @Override
  public void releaseAssignedContainer(ContainerId containerId) {
    rmOperationHandler.releaseAssignedContainer(containerId);
  }
+
  /**
   * Add a container request, delegating to the RM operation handler.
   * @param req the container request
   */
  @Override
  public void addContainerRequest(AMRMClient.ContainerRequest req) {
    rmOperationHandler.addContainerRequest(req);
  }
+
  /**
   * Cancel up to {@code count} container requests at the given priorities,
   * delegating to the RM operation handler.
   * @param priority1 first priority to cancel at
   * @param priority2 second priority to cancel at
   * @param count maximum number of requests to cancel
   * @return the number of requests actually cancelled
   */
  @Override
  public int cancelContainerRequests(Priority priority1,
      Priority priority2,
      int count) {
    return rmOperationHandler.cancelContainerRequests(priority1, priority2, count);
  }
+
  /**
   * Cancel a single container request, delegating to the RM operation handler.
   * @param request the request to cancel
   */
  @Override
  public void cancelSingleRequest(AMRMClient.ContainerRequest request) {
    rmOperationHandler.cancelSingleRequest(request);
  }
+
+/* =================================================================== */
+/* END */
+/* =================================================================== */
+
  /**
   * Launch the provider service.
   * If the provider spawned a process, this AM registers as a listener for
   * its termination before starting the service; otherwise the service is
   * started without registration and the "started" event is sent directly.
   *
   * @param instanceDefinition definition of the service
   * @param confDir directory of config data
   * @throws IOException IO problems
   * @throws SliderException slider-level failures
   */
  protected synchronized void launchProviderService(AggregateConf instanceDefinition,
      File confDir)
      throws IOException, SliderException {
    Map<String, String> env = new HashMap<>();
    // exec returns true when a child process was actually spawned
    boolean execStarted = providerService.exec(instanceDefinition, confDir, env,
        this);
    if (execStarted) {
      // listen for the child's termination (see stateChanged) before start
      providerService.registerServiceListener(this);
      providerService.start();
    } else {
      // didn't start, so don't register
      providerService.start();
      // and send the started event ourselves
      eventCallbackEvent(null);
    }
  }
+
+ /* =================================================================== */
+ /* EventCallback from the child or ourselves directly */
+ /* =================================================================== */
+
+ @Override // ProviderCompleted
+ public void eventCallbackEvent(Object parameter) {
+ // signalled that the child process is up.
+ appState.noteAMLive();
+ // now ask for the cluster nodes
+ try {
+ flexCluster(getInstanceDefinition().getResources());
+ } catch (Exception e) {
+ // cluster flex failure: log
+ log.error("Failed to flex cluster nodes: {}", e, e);
+ // then what? exit
+ queue(new ActionStopSlider(e));
+ }
+ }
+
+ /**
+ * report container loss. If this isn't already known about, react
+ *
+ * @param containerId id of the container which has failed
+ * @throws SliderException
+ */
+ public synchronized void providerLostContainer(
+ ContainerId containerId)
+ throws SliderException {
+ log.info("containerLostContactWithProvider: container {} lost",
+ containerId);
+ RoleInstance activeContainer = appState.getOwnedContainer(containerId);
+ if (activeContainer != null) {
+ execute(appState.releaseContainer(containerId));
+ // ask for more containers if needed
+ log.info("Container released; triggering review");
+ reviewRequestAndReleaseNodes("Loss of container");
+ } else {
+ log.info("Container not in active set - ignoring");
+ }
+ }
+
+ /* =================================================================== */
+ /* ServiceStateChangeListener */
+ /* =================================================================== */
+
  /**
   * Received on listening service termination.
   * If the stopped service is the provider and its process exited with a
   * nonzero code before AM shutdown was triggered, the AM is signalled to
   * stop with a failed status; otherwise the exit is logged and ignored.
   * Other services are delegated to the superclass.
   * @param service the service that has changed.
   */
  @Override //ServiceStateChangeListener
  public void stateChanged(Service service) {
    if (service == providerService && service.isInState(STATE.STOPPED)) {
      //its the current master process in play
      int exitCode = providerService.getExitCode();
      // NOTE(review): the "mapped" code is currently identical to the raw
      // exit code; presumably a mapping step is planned here - confirm.
      int mappedProcessExitCode = exitCode;

      // a nonzero exit before the AM flagged completion is a failure
      boolean shouldTriggerFailure = !amCompletionFlag.get()
          && (mappedProcessExitCode != 0);

      if (shouldTriggerFailure) {
        String reason =
            "Spawned process failed with raw " + exitCode + " mapped to " +
            mappedProcessExitCode;
        ActionStopSlider stop = new ActionStopSlider("stop",
            mappedProcessExitCode,
            FinalApplicationStatus.FAILED,
            reason);
        //this wasn't expected: the process finished early
        spawnedProcessExitedBeforeShutdownTriggered = true;
        log.info(
            "Process has exited with exit code {} mapped to {} -triggering termination",
            exitCode,
            mappedProcessExitCode);

        //tell the AM the cluster is complete
        signalAMComplete(stop);
      } else {
        //we don't care
        log.info(
            "Process has exited with exit code {} mapped to {} -ignoring",
            exitCode,
            mappedProcessExitCode);
      }
    } else {
      // not the provider service: delegate to the superclass handler
      super.stateChanged(service);
    }
  }
+
+ /**
+ * stop forked process if it the running process var is not null
+ * @return the process exit code
+ */
+ protected synchronized Integer stopForkedProcess() {
+ providerService.stop();
+ return providerService.getExitCode();
+ }
+
  /**
   * Async start container request.
   * @param container container
   * @param ctx context
   * @param instance node details
   * @throws IOException IO problems
   */
  public void startContainer(Container container,
      ContainerLaunchContext ctx,
      RoleInstance instance) throws IOException {
    // record the submission before the async start is issued, so the
    // onContainerStarted() callback can resolve it via the app state
    appState.containerStartSubmitted(container, instance);

    nmClientAsync.startContainerAsync(container, ctx);
  }
+
  /**
   * Build the credentials needed for containers. This will include
   * getting new delegation tokens for HDFS if the AM is running
   * with a keytab.
   * @return a buffer of credentials
   * @throws IOException problems acquiring the delegation tokens
   */
  private Credentials buildContainerCredentials() throws IOException {
    // start from a copy of the AM's own container credentials
    Credentials credentials = new Credentials(containerCredentials);
    if (securityConfiguration.isKeytabProvided()) {
      // the keytab lets the AM renew, so hand out self-renewable FS tokens
      CredentialUtils.addSelfRenewableFSDelegationTokens(
          getClusterFS().getFileSystem(),
          credentials);
    }
    return credentials;
  }
+
  /**
   * NM callback: a container has stopped.
   * Only logged here: the RM-side completion events (onContainersCompleted)
   * are the source of container halt details that are reacted to.
   * @param containerId container that stopped
   */
  @Override // NMClientAsync.CallbackHandler
  public void onContainerStopped(ContainerId containerId) {
    // do nothing but log: container events from the AM
    // are the source of container halt details to react to
    log.info("onContainerStopped {} ", containerId);
  }
+
+ @Override // NMClientAsync.CallbackHandler
+ public void onContainerStarted(ContainerId containerId,
+ Map<String, ByteBuffer> allServiceResponse) {
+ LOG_YARN.info("Started Container {} ", containerId);
+ RoleInstance cinfo = appState.onNodeManagerContainerStarted(containerId);
+ if (cinfo != null) {
+ LOG_YARN.info("Deployed instance of role {} onto {}",
+ cinfo.role, containerId);
+ //trigger an async container status
+ nmClientAsync.getContainerStatusAsync(containerId,
+ cinfo.container.getNodeId());
+ // push out a registration
+ queue(new RegisterComponentInstance(containerId, cinfo.role, cinfo.group,
+ 0, TimeUnit.MILLISECONDS));
+
+ } else {
+ //this is a hypothetical path not seen. We react by warning
+ log.error("Noti
<TRUNCATED>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org