You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by go...@apache.org on 2016/08/25 20:19:56 UTC
[27/46] incubator-slider git commit: SLIDER-1165 Create
yarn-native-services branch on Slider corresponding to the
yarn-native-services branch on Hadoop
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
deleted file mode 100644
index 7594d51..0000000
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ /dev/null
@@ -1,3320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.providers.agent;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
-import org.apache.hadoop.registry.client.types.Endpoint;
-import org.apache.hadoop.registry.client.types.ProtocolTypes;
-import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
-import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.slider.api.ClusterDescription;
-import org.apache.slider.api.ClusterNode;
-import org.apache.slider.api.InternalKeys;
-import org.apache.slider.api.OptionKeys;
-import org.apache.slider.api.ResourceKeys;
-import org.apache.slider.api.StatusKeys;
-import org.apache.slider.common.SliderExitCodes;
-import org.apache.slider.common.SliderKeys;
-import org.apache.slider.common.SliderXmlConfKeys;
-import org.apache.slider.common.tools.SliderFileSystem;
-import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.core.conf.AggregateConf;
-import org.apache.slider.core.conf.ConfTreeOperations;
-import org.apache.slider.core.conf.MapOperations;
-import org.apache.slider.core.exceptions.BadCommandArgumentsException;
-import org.apache.slider.core.exceptions.BadConfigException;
-import org.apache.slider.core.exceptions.NoSuchNodeException;
-import org.apache.slider.core.exceptions.SliderException;
-import org.apache.slider.core.launch.CommandLineBuilder;
-import org.apache.slider.core.launch.ContainerLauncher;
-import org.apache.slider.core.registry.docstore.ConfigFormat;
-import org.apache.slider.core.registry.docstore.ConfigUtils;
-import org.apache.slider.core.registry.docstore.ExportEntry;
-import org.apache.slider.core.registry.docstore.PublishedConfiguration;
-import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
-import org.apache.slider.core.registry.docstore.PublishedExports;
-import org.apache.slider.core.registry.info.CustomRegistryConstants;
-import org.apache.slider.providers.AbstractProviderService;
-import org.apache.slider.providers.MonitorDetail;
-import org.apache.slider.providers.ProviderCore;
-import org.apache.slider.providers.ProviderRole;
-import org.apache.slider.providers.ProviderUtils;
-import org.apache.slider.providers.agent.application.metadata.AbstractComponent;
-import org.apache.slider.providers.agent.application.metadata.Application;
-import org.apache.slider.providers.agent.application.metadata.CommandOrder;
-import org.apache.slider.providers.agent.application.metadata.CommandScript;
-import org.apache.slider.providers.agent.application.metadata.Component;
-import org.apache.slider.providers.agent.application.metadata.ComponentCommand;
-import org.apache.slider.providers.agent.application.metadata.ComponentExport;
-import org.apache.slider.providers.agent.application.metadata.ComponentsInAddonPackage;
-import org.apache.slider.providers.agent.application.metadata.ConfigFile;
-import org.apache.slider.providers.agent.application.metadata.DefaultConfig;
-import org.apache.slider.providers.agent.application.metadata.DockerContainer;
-import org.apache.slider.providers.agent.application.metadata.Export;
-import org.apache.slider.providers.agent.application.metadata.ExportGroup;
-import org.apache.slider.providers.agent.application.metadata.Metainfo;
-import org.apache.slider.providers.agent.application.metadata.OSPackage;
-import org.apache.slider.providers.agent.application.metadata.OSSpecific;
-import org.apache.slider.providers.agent.application.metadata.Package;
-import org.apache.slider.providers.agent.application.metadata.PropertyInfo;
-import org.apache.slider.server.appmaster.actions.ProviderReportedContainerLoss;
-import org.apache.slider.server.appmaster.actions.RegisterComponentInstance;
-import org.apache.slider.server.appmaster.state.ContainerPriority;
-import org.apache.slider.server.appmaster.state.RoleInstance;
-import org.apache.slider.server.appmaster.state.StateAccessForProviders;
-import org.apache.slider.server.appmaster.web.rest.agent.AgentCommandType;
-import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
-import org.apache.slider.server.appmaster.web.rest.agent.CommandReport;
-import org.apache.slider.server.appmaster.web.rest.agent.ComponentStatus;
-import org.apache.slider.server.appmaster.web.rest.agent.ExecutionCommand;
-import org.apache.slider.server.appmaster.web.rest.agent.HeartBeat;
-import org.apache.slider.server.appmaster.web.rest.agent.HeartBeatResponse;
-import org.apache.slider.server.appmaster.web.rest.agent.Register;
-import org.apache.slider.server.appmaster.web.rest.agent.RegistrationResponse;
-import org.apache.slider.server.appmaster.web.rest.agent.RegistrationStatus;
-import org.apache.slider.server.appmaster.web.rest.agent.StatusCommand;
-import org.apache.slider.server.services.security.CertificateManager;
-import org.apache.slider.server.services.security.SecurityStore;
-import org.apache.slider.server.services.security.StoresGenerator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Scanner;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Pattern;
-
-import static org.apache.slider.api.RoleKeys.ROLE_PREFIX;
-import static org.apache.slider.server.appmaster.web.rest.RestPaths.SLIDER_PATH_AGENTS;
-
-/**
- * This class implements the server-side logic for application deployment through Slider application package
- */
-public class AgentProviderService extends AbstractProviderService implements
- ProviderCore,
- AgentKeys,
- SliderKeys, AgentRestOperations {
-
-
- protected static final Logger log =
- LoggerFactory.getLogger(AgentProviderService.class);
- private static final ProviderUtils providerUtils = new ProviderUtils(log);
- private static final String LABEL_MAKER = "___";
- private static final String CONTAINER_ID = "container_id";
- private static final String GLOBAL_CONFIG_TAG = "global";
- private static final String LOG_FOLDERS_TAG = "LogFolders";
- private static final String HOST_FOLDER_FORMAT = "%s:%s";
- private static final String CONTAINER_LOGS_TAG = "container_log_dirs";
- private static final String CONTAINER_PWDS_TAG = "container_work_dirs";
- private static final String COMPONENT_TAG = "component";
- private static final String APPLICATION_TAG = "application";
- private static final String COMPONENT_DATA_TAG = "ComponentInstanceData";
- private static final String SHARED_PORT_TAG = "SHARED";
- private static final String PER_CONTAINER_TAG = "{PER_CONTAINER}";
- private static final int MAX_LOG_ENTRIES = 40;
- private static final int DEFAULT_HEARTBEAT_MONITOR_INTERVAL = 60 * 1000;
-
- private final Object syncLock = new Object();
- private final ComponentTagProvider tags = new ComponentTagProvider();
- private int heartbeatMonitorInterval = 0;
- private AgentClientProvider clientProvider;
- private AtomicInteger taskId = new AtomicInteger(0);
- private volatile Map<String, MetainfoHolder> metaInfoMap = new HashMap<>();
- private SliderFileSystem fileSystem = null;
- private Map<String, DefaultConfig> defaultConfigs = null;
- private ComponentCommandOrder commandOrder = new ComponentCommandOrder();
- private HeartbeatMonitor monitor;
- private Boolean canAnyMasterPublish = null;
- private AgentLaunchParameter agentLaunchParameter = null;
- private String clusterName = null;
- private boolean isInUpgradeMode;
- private Set<String> upgradeContainers = new HashSet<String>();
- private boolean appStopInitiated;
-
- private final Map<String, ComponentInstanceState> componentStatuses =
- new ConcurrentHashMap<String, ComponentInstanceState>();
- private final Map<String, Map<String, String>> componentInstanceData =
- new ConcurrentHashMap<String, Map<String, String>>();
- private final Map<String, Map<String, List<ExportEntry>>> exportGroups =
- new ConcurrentHashMap<String, Map<String, List<ExportEntry>>>();
- private final Map<String, Map<String, String>> allocatedPorts =
- new ConcurrentHashMap<String, Map<String, String>>();
- private final Map<String, Metainfo> packageMetainfo =
- new ConcurrentHashMap<String, Metainfo>();
-
- private final Map<String, ExportEntry> logFolderExports =
- Collections.synchronizedMap(new LinkedHashMap<String, ExportEntry>(MAX_LOG_ENTRIES, 0.75f, false) {
- protected boolean removeEldestEntry(Map.Entry eldest) {
- return size() > MAX_LOG_ENTRIES;
- }
- });
- private final Map<String, ExportEntry> workFolderExports =
- Collections.synchronizedMap(new LinkedHashMap<String, ExportEntry>(MAX_LOG_ENTRIES, 0.75f, false) {
- protected boolean removeEldestEntry(Map.Entry eldest) {
- return size() > MAX_LOG_ENTRIES;
- }
- });
- private final Map<String, Set<String>> containerExportsMap =
- new HashMap<String, Set<String>>();
-
- private static class MetainfoHolder {
- Metainfo metaInfo;
- private Map<String, DefaultConfig> defaultConfigs = null;
-
- public MetainfoHolder(Metainfo metaInfo,
- Map<String, DefaultConfig> defaultConfigs) {
- this.metaInfo = metaInfo;
- this.defaultConfigs = defaultConfigs;
- }
- }
-
- /**
- * Create an instance of AgentProviderService
- */
- public AgentProviderService() {
- super("AgentProviderService");
- setAgentRestOperations(this);
- setHeartbeatMonitorInterval(DEFAULT_HEARTBEAT_MONITOR_INTERVAL);
- }
-
- @Override
- public String getHumanName() {
- return "Slider Agent";
- }
-
- @Override
- public List<ProviderRole> getRoles() {
- return AgentRoles.getRoles();
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf);
- clientProvider = new AgentClientProvider(conf);
- }
-
- @Override
- public Configuration loadProviderConfigurationInformation(File confDir) throws
- BadCommandArgumentsException,
- IOException {
- return new Configuration(false);
- }
-
- @Override
- public void validateInstanceDefinition(AggregateConf instanceDefinition)
- throws
- SliderException {
- clientProvider.validateInstanceDefinition(instanceDefinition, null);
-
- ConfTreeOperations resources =
- instanceDefinition.getResourceOperations();
-
- Set<String> names = resources.getComponentNames();
- names.remove(SliderKeys.COMPONENT_AM);
- for (String name : names) {
- Component componentDef = getApplicationComponent(name);
- if (componentDef == null) {
- // component member is validated elsewhere, so we don't need to throw
- // an exception here
- continue;
- }
-
- MapOperations componentConfig = resources.getMandatoryComponent(name);
- int count =
- componentConfig.getMandatoryOptionInt(ResourceKeys.COMPONENT_INSTANCES);
- int definedMinCount = componentDef.getMinInstanceCountInt();
- int definedMaxCount = componentDef.getMaxInstanceCountInt();
- if (count < definedMinCount || count > definedMaxCount) {
- throw new BadConfigException("Component %s, %s value %d out of range. "
- + "Expected minimum is %d and maximum is %d",
- name,
- ResourceKeys.COMPONENT_INSTANCES,
- count,
- definedMinCount,
- definedMaxCount);
- }
- }
- }
-
- // Reads the metainfo.xml in the application package and loads it
- private void buildMetainfo(AggregateConf instanceDefinition,
- SliderFileSystem fileSystem,
- String roleGroup)
- throws IOException, SliderException {
- String mapKey = instanceDefinition.getAppConfOperations()
- .getComponentOpt(roleGroup, ROLE_PREFIX, DEFAULT_METAINFO_MAP_KEY);
- String appDef = SliderUtils.getApplicationDefinitionPath(
- instanceDefinition.getAppConfOperations(), roleGroup);
- MapOperations component = null;
- if (roleGroup != null) {
- component = instanceDefinition.getAppConfOperations().getComponent(roleGroup);
- }
-
- MetainfoHolder metaInfoHolder = metaInfoMap.get(mapKey);
- if (metaInfoHolder == null) {
- synchronized (syncLock) {
- if (this.fileSystem == null) {
- this.fileSystem = fileSystem;
- }
- metaInfoHolder = metaInfoMap.get(mapKey);
- if (metaInfoHolder == null) {
- readAndSetHeartbeatMonitoringInterval(instanceDefinition);
- initializeAgentDebugCommands(instanceDefinition);
-
- Metainfo metaInfo = getApplicationMetainfo(fileSystem, appDef, false);
- log.info("Master package metainfo: {}", metaInfo.toString());
- if (metaInfo == null || metaInfo.getApplication() == null) {
- log.error("metainfo.xml is unavailable or malformed at {}.", appDef);
- throw new SliderException(
- "metainfo.xml is required in app package.");
- }
- List<CommandOrder> commandOrders = metaInfo.getApplication()
- .getCommandOrders();
- if (!DEFAULT_METAINFO_MAP_KEY.equals(mapKey)) {
- for (Component comp : metaInfo.getApplication().getComponents()) {
- comp.setName(mapKey + comp.getName());
- log.info("Modifying external metainfo component name to {}",
- comp.getName());
- }
- for (CommandOrder co : commandOrders) {
- log.info("Adding prefix {} to command order {}",
- mapKey, co);
- co.setCommand(mapKey + co.getCommand());
- co.setRequires(mapKey + co.getRequires());
- }
- }
- log.debug("Merging command orders {} for {}", commandOrders,
- roleGroup);
- commandOrder.mergeCommandOrders(commandOrders,
- instanceDefinition.getResourceOperations());
- Map<String, DefaultConfig> defaultConfigs =
- initializeDefaultConfigs(fileSystem, appDef, metaInfo);
- metaInfoMap.put(mapKey, new MetainfoHolder(metaInfo, defaultConfigs));
- monitor = new HeartbeatMonitor(this, getHeartbeatMonitorInterval());
- monitor.start();
-
- // build a map from component to metainfo
- String addonAppDefString = instanceDefinition.getAppConfOperations()
- .getGlobalOptions().getOption(AgentKeys.ADDONS, null);
- if (component != null) {
- addonAppDefString = component.getOption(AgentKeys.ADDONS, addonAppDefString);
- }
- log.debug("All addon appdefs: {}", addonAppDefString);
- if (addonAppDefString != null) {
- Scanner scanner = new Scanner(addonAppDefString).useDelimiter(",");
- while (scanner.hasNext()) {
- String addonAppDef = scanner.next();
- String addonAppDefPath = instanceDefinition
- .getAppConfOperations().getGlobalOptions().get(addonAppDef);
- if (component != null) {
- addonAppDefPath = component.getOption(addonAppDef, addonAppDefPath);
- }
- log.debug("Addon package {} is stored at: {}", addonAppDef
- + addonAppDefPath);
- Metainfo addonMetaInfo = getApplicationMetainfo(fileSystem,
- addonAppDefPath, true);
- addonMetaInfo.validate();
- packageMetainfo.put(addonMetaInfo.getApplicationPackage()
- .getName(), addonMetaInfo);
- }
- log.info("Metainfo map for master and addon: {}",
- packageMetainfo.toString());
- }
- }
- }
- }
- }
-
- @Override
- public void initializeApplicationConfiguration(
- AggregateConf instanceDefinition, SliderFileSystem fileSystem,
- String roleGroup)
- throws IOException, SliderException {
- buildMetainfo(instanceDefinition, fileSystem, roleGroup);
- }
-
- @Override
- public void buildContainerLaunchContext(ContainerLauncher launcher,
- AggregateConf instanceDefinition,
- Container container,
- ProviderRole providerRole,
- SliderFileSystem fileSystem,
- Path generatedConfPath,
- MapOperations resourceComponent,
- MapOperations appComponent,
- Path containerTmpDirPath) throws
- IOException,
- SliderException {
-
- String roleName = providerRole.name;
- String roleGroup = providerRole.group;
- String appDef = SliderUtils.getApplicationDefinitionPath(instanceDefinition
- .getAppConfOperations(), roleGroup);
-
- initializeApplicationConfiguration(instanceDefinition, fileSystem, roleGroup);
-
- log.info("Build launch context for Agent");
- log.debug(instanceDefinition.toString());
-
- //if we are launching docker based app on yarn, then we need to pass docker image
- if (isYarnDockerContainer(roleGroup)) {
- launcher.setYarnDockerMode(true);
- launcher.setDockerImage(getConfigFromMetaInfo(roleGroup, "image"));
- launcher.setRunPrivilegedContainer(getConfigFromMetaInfo(roleGroup, "runPriviledgedContainer"));
- launcher
- .setYarnContainerMountPoints(getConfigFromMetaInfoWithAppConfigOverriding(
- roleGroup, "yarn.container.mount.points"));
- }
-
- // Set the environment
- launcher.putEnv(SliderUtils.buildEnvMap(appComponent,
- getStandardTokenMap(getAmState().getAppConfSnapshot(), roleName, roleGroup)));
-
- String workDir = ApplicationConstants.Environment.PWD.$();
- launcher.setEnv("AGENT_WORK_ROOT", workDir);
- log.info("AGENT_WORK_ROOT set to {}", workDir);
- String logDir = ApplicationConstants.LOG_DIR_EXPANSION_VAR;
- launcher.setEnv("AGENT_LOG_ROOT", logDir);
- log.info("AGENT_LOG_ROOT set to {}", logDir);
- if (System.getenv(HADOOP_USER_NAME) != null) {
- launcher.setEnv(HADOOP_USER_NAME, System.getenv(HADOOP_USER_NAME));
- }
- // for 2-Way SSL
- launcher.setEnv(SLIDER_PASSPHRASE, instanceDefinition.getPassphrase());
- //add english env
- launcher.setEnv("LANG", "en_US.UTF-8");
- launcher.setEnv("LC_ALL", "en_US.UTF-8");
- launcher.setEnv("LANGUAGE", "en_US.UTF-8");
-
- //local resources
-
- // TODO: Should agent need to support App Home
- String scriptPath = new File(AgentKeys.AGENT_MAIN_SCRIPT_ROOT, AgentKeys.AGENT_MAIN_SCRIPT).getPath();
- String appHome = instanceDefinition.getAppConfOperations().
- getGlobalOptions().get(AgentKeys.PACKAGE_PATH);
- if (SliderUtils.isSet(appHome)) {
- scriptPath = new File(appHome, AgentKeys.AGENT_MAIN_SCRIPT).getPath();
- }
-
- // set PYTHONPATH
- List<String> pythonPaths = new ArrayList<String>();
- pythonPaths.add(AgentKeys.AGENT_MAIN_SCRIPT_ROOT);
- pythonPaths.add(AgentKeys.AGENT_JINJA2_ROOT);
- String pythonPath = StringUtils.join(File.pathSeparator, pythonPaths);
- launcher.setEnv(PYTHONPATH, pythonPath);
- log.info("PYTHONPATH set to {}", pythonPath);
-
- Path agentImagePath = null;
- String agentImage = instanceDefinition.getInternalOperations().
- get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
- if (SliderUtils.isUnset(agentImage)) {
- agentImagePath =
- new Path(new Path(new Path(instanceDefinition.getInternalOperations().get(InternalKeys.INTERNAL_TMP_DIR),
- container.getId().getApplicationAttemptId().getApplicationId().toString()),
- AgentKeys.PROVIDER_AGENT),
- SliderKeys.AGENT_TAR);
- } else {
- agentImagePath = new Path(agentImage);
- }
-
- if (fileSystem.getFileSystem().exists(agentImagePath)) {
- LocalResource agentImageRes = fileSystem.createAmResource(agentImagePath, LocalResourceType.ARCHIVE);
- launcher.addLocalResource(AgentKeys.AGENT_INSTALL_DIR, agentImageRes);
- } else {
- String msg =
- String.format("Required agent image slider-agent.tar.gz is unavailable at %s", agentImagePath.toString());
- MapOperations compOps = appComponent;
- boolean relaxVerificationForTest = compOps != null ? Boolean.valueOf(compOps.
- getOptionBool(AgentKeys.TEST_RELAX_VERIFICATION, false)) : false;
- log.error(msg);
-
- if (!relaxVerificationForTest) {
- throw new SliderException(SliderExitCodes.EXIT_DEPLOYMENT_FAILED, msg);
- }
- }
-
- log.info("Using {} for agent.", scriptPath);
- LocalResource appDefRes = fileSystem.createAmResource(
- fileSystem.getFileSystem().resolvePath(new Path(appDef)),
- LocalResourceType.ARCHIVE);
- launcher.addLocalResource(AgentKeys.APP_DEFINITION_DIR, appDefRes);
-
- for (Package pkg : getMetaInfo(roleGroup).getApplication().getPackages()) {
- Path pkgPath = fileSystem.buildResourcePath(pkg.getName());
- if (!fileSystem.isFile(pkgPath)) {
- pkgPath = fileSystem.buildResourcePath(getClusterName(),
- pkg.getName());
- }
- if (!fileSystem.isFile(pkgPath)) {
- throw new IOException("Package doesn't exist as a resource: " +
- pkg.getName());
- }
- log.info("Adding resource {}", pkg.getName());
- LocalResourceType type = LocalResourceType.FILE;
- if ("archive".equals(pkg.getType())) {
- type = LocalResourceType.ARCHIVE;
- }
- LocalResource packageResource = fileSystem.createAmResource(
- pkgPath, type);
- launcher.addLocalResource(AgentKeys.APP_PACKAGES_DIR, packageResource);
- }
-
- String agentConf = instanceDefinition.getAppConfOperations().
- getGlobalOptions().getOption(AgentKeys.AGENT_CONF, "");
- if (SliderUtils.isSet(agentConf)) {
- LocalResource agentConfRes = fileSystem.createAmResource(fileSystem
- .getFileSystem().resolvePath(new Path(agentConf)),
- LocalResourceType.FILE);
- launcher.addLocalResource(AgentKeys.AGENT_CONFIG_FILE, agentConfRes);
- }
-
- String agentVer = instanceDefinition.getAppConfOperations().
- getGlobalOptions().getOption(AgentKeys.AGENT_VERSION, null);
- if (agentVer != null) {
- LocalResource agentVerRes = fileSystem.createAmResource(
- fileSystem.getFileSystem().resolvePath(new Path(agentVer)),
- LocalResourceType.FILE);
- launcher.addLocalResource(AgentKeys.AGENT_VERSION_FILE, agentVerRes);
- }
-
- if (SliderUtils.isHadoopClusterSecure(getConfig())) {
- localizeServiceKeytabs(launcher, instanceDefinition, fileSystem);
- }
-
- MapOperations amComponent = instanceDefinition.
- getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM);
- boolean twoWayEnabled = amComponent != null ? Boolean.valueOf(amComponent.
- getOptionBool(AgentKeys.KEY_AGENT_TWO_WAY_SSL_ENABLED, false)) : false;
- if (twoWayEnabled) {
- localizeContainerSSLResources(launcher, container, fileSystem);
- }
-
- MapOperations compOps = appComponent;
- if (areStoresRequested(compOps)) {
- localizeContainerSecurityStores(launcher, container, roleName, fileSystem,
- instanceDefinition, compOps);
- }
-
- //add the configuration resources
- launcher.addLocalResources(fileSystem.submitDirectory(
- generatedConfPath,
- SliderKeys.PROPAGATED_CONF_DIR_NAME));
-
- if (appComponent.getOptionBool(AgentKeys.AM_CONFIG_GENERATION, false)) {
- // build and localize configuration files
- Map<String, Map<String, String>> configurations =
- buildCommandConfigurations(instanceDefinition.getAppConfOperations(),
- container.getId().toString(), roleName, roleGroup);
- localizeConfigFiles(launcher, roleName, roleGroup, getMetaInfo(roleGroup),
- configurations, launcher.getEnv(), fileSystem);
- }
-
- String label = getContainerLabel(container, roleName, roleGroup);
- CommandLineBuilder operation = new CommandLineBuilder();
-
- String pythonExec = instanceDefinition.getAppConfOperations()
- .getGlobalOptions().getOption(SliderXmlConfKeys.PYTHON_EXECUTABLE_PATH,
- AgentKeys.PYTHON_EXE);
-
- operation.add(pythonExec);
-
- operation.add(scriptPath);
- operation.add(ARG_LABEL, label);
- operation.add(ARG_ZOOKEEPER_QUORUM);
- operation.add(getClusterOptionPropertyValue(OptionKeys.ZOOKEEPER_QUORUM));
- operation.add(ARG_ZOOKEEPER_REGISTRY_PATH);
- operation.add(getZkRegistryPath());
-
- String debugCmd = agentLaunchParameter.getNextLaunchParameter(roleGroup);
- if (SliderUtils.isSet(debugCmd)) {
- operation.add(ARG_DEBUG);
- operation.add(debugCmd);
- }
-
- operation.add("> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/"
- + AgentKeys.AGENT_OUT_FILE + " 2>&1");
-
- launcher.addCommand(operation.build());
-
- // localize addon package
- String addonAppDefString = instanceDefinition.getAppConfOperations()
- .getGlobalOptions().getOption(AgentKeys.ADDONS, null);
- log.debug("All addon appdefs: {}", addonAppDefString);
- if (addonAppDefString != null) {
- Scanner scanner = new Scanner(addonAppDefString).useDelimiter(",");
- while (scanner.hasNext()) {
- String addonAppDef = scanner.next();
- String addonAppDefPath = instanceDefinition
- .getAppConfOperations().getGlobalOptions().get(addonAppDef);
- log.debug("Addon package {} is stored at: {}", addonAppDef, addonAppDefPath);
- LocalResource addonPkgRes = fileSystem.createAmResource(
- fileSystem.getFileSystem().resolvePath(new Path(addonAppDefPath)),
- LocalResourceType.ARCHIVE);
- launcher.addLocalResource(AgentKeys.ADDON_DEFINITION_DIR + "/" + addonAppDef, addonPkgRes);
- }
- log.debug("Metainfo map for master and addon: {}",
- packageMetainfo.toString());
- }
-
- // Additional files to localize in addition to the application def
- String appResourcesString = instanceDefinition.getAppConfOperations()
- .getGlobalOptions().getOption(AgentKeys.APP_RESOURCES, null);
- log.info("Configuration value for extra resources to localize: {}", appResourcesString);
- if (null != appResourcesString) {
- try (Scanner scanner = new Scanner(appResourcesString).useDelimiter(",")) {
- while (scanner.hasNext()) {
- String resource = scanner.next();
- Path resourcePath = new Path(resource);
- LocalResource extraResource = fileSystem.createAmResource(
- fileSystem.getFileSystem().resolvePath(resourcePath),
- LocalResourceType.FILE);
- String destination = AgentKeys.APP_RESOURCES_DIR + "/" + resourcePath.getName();
- log.info("Localizing {} to {}", resourcePath, destination);
- // TODO Can we try harder to avoid collisions?
- launcher.addLocalResource(destination, extraResource);
- }
- }
- }
-
- // initialize addon pkg states for all componentInstanceStatus
- Map<String, State> pkgStatuses = new TreeMap<>();
- for (Metainfo appPkg : packageMetainfo.values()) {
- // check each component of that addon to see if they apply to this
- // component 'role'
- for (ComponentsInAddonPackage comp : appPkg.getApplicationPackage()
- .getComponents()) {
- log.debug("Current component: {} component in metainfo: {}", roleName,
- comp.getName());
- if (comp.getName().equals(roleGroup)
- || comp.getName().equals(AgentKeys.ADDON_FOR_ALL_COMPONENTS)) {
- pkgStatuses.put(appPkg.getApplicationPackage().getName(), State.INIT);
- }
- }
- }
- log.debug("For component: {} pkg status map: {}", roleName,
- pkgStatuses.toString());
-
- // initialize the component instance state
- getComponentStatuses().put(label,
- new ComponentInstanceState(
- roleGroup,
- container.getId(),
- getClusterInfoPropertyValue(OptionKeys.APPLICATION_NAME),
- pkgStatuses));
- }
-
- private void localizeContainerSecurityStores(ContainerLauncher launcher,
- Container container,
- String role,
- SliderFileSystem fileSystem,
- AggregateConf instanceDefinition,
- MapOperations compOps)
- throws SliderException, IOException {
- // substitute CLUSTER_NAME into credentials
- Map<String,List<String>> newcred = new HashMap<>();
- for (Entry<String,List<String>> entry : instanceDefinition.getAppConf().credentials.entrySet()) {
- List<String> resultList = new ArrayList<>();
- for (String v : entry.getValue()) {
- resultList.add(v.replaceAll(Pattern.quote("${CLUSTER_NAME}"),
- clusterName).replaceAll(Pattern.quote("${CLUSTER}"),
- clusterName));
- }
- newcred.put(entry.getKey().replaceAll(Pattern.quote("${CLUSTER_NAME}"),
- clusterName).replaceAll(Pattern.quote("${CLUSTER}"),
- clusterName),
- resultList);
- }
- instanceDefinition.getAppConf().credentials = newcred;
-
- // generate and localize security stores
- SecurityStore[] stores = generateSecurityStores(container, role,
- instanceDefinition, compOps);
- for (SecurityStore store : stores) {
- LocalResource keystoreResource = fileSystem.createAmResource(
- uploadSecurityResource(store.getFile(), fileSystem), LocalResourceType.FILE);
- launcher.addLocalResource(String.format("secstores/%s-%s.p12",
- store.getType(), role),
- keystoreResource);
- }
- }
-
- private SecurityStore[] generateSecurityStores(Container container,
- String role,
- AggregateConf instanceDefinition,
- MapOperations compOps)
- throws SliderException, IOException {
- return StoresGenerator.generateSecurityStores(container.getNodeId().getHost(),
- container.getId().toString(), role,
- instanceDefinition, compOps);
- }
-
- private boolean areStoresRequested(MapOperations compOps) {
- return compOps != null ? compOps.
- getOptionBool(SliderKeys.COMP_STORES_REQUIRED_KEY, false) : false;
- }
-
- private void localizeContainerSSLResources(ContainerLauncher launcher,
- Container container,
- SliderFileSystem fileSystem)
- throws SliderException {
- try {
- // localize server cert
- Path certsDir = fileSystem.buildClusterSecurityDirPath(getClusterName());
- LocalResource certResource = fileSystem.createAmResource(
- new Path(certsDir, SliderKeys.CRT_FILE_NAME),
- LocalResourceType.FILE);
- launcher.addLocalResource(AgentKeys.CERT_FILE_LOCALIZATION_PATH,
- certResource);
-
- // generate and localize agent cert
- CertificateManager certMgr = new CertificateManager();
- String hostname = container.getNodeId().getHost();
- String containerId = container.getId().toString();
- certMgr.generateContainerCertificate(hostname, containerId);
- LocalResource agentCertResource = fileSystem.createAmResource(
- uploadSecurityResource(
- CertificateManager.getAgentCertficateFilePath(containerId),
- fileSystem), LocalResourceType.FILE);
- // still using hostname as file name on the agent side, but the files
- // do end up under the specific container's file space
- launcher.addLocalResource(AgentKeys.INFRA_RUN_SECURITY_DIR + hostname +
- ".crt", agentCertResource);
- LocalResource agentKeyResource = fileSystem.createAmResource(
- uploadSecurityResource(
- CertificateManager.getAgentKeyFilePath(containerId), fileSystem),
- LocalResourceType.FILE);
- launcher.addLocalResource(AgentKeys.INFRA_RUN_SECURITY_DIR + hostname +
- ".key", agentKeyResource);
-
- } catch (Exception e) {
- throw new SliderException(SliderExitCodes.EXIT_DEPLOYMENT_FAILED, e,
- "Unable to localize certificates. Two-way SSL cannot be enabled");
- }
- }
-
- private Path uploadSecurityResource(File resource, SliderFileSystem fileSystem)
- throws IOException {
- Path certsDir = fileSystem.buildClusterSecurityDirPath(getClusterName());
- return uploadResource(resource, fileSystem, certsDir);
- }
-
- private Path uploadResource(File resource, SliderFileSystem fileSystem,
- String roleName) throws IOException {
- Path dir;
- if (roleName == null) {
- dir = fileSystem.buildClusterResourcePath(getClusterName());
- } else {
- dir = fileSystem.buildClusterResourcePath(getClusterName(), roleName);
- }
- return uploadResource(resource, fileSystem, dir);
- }
-
- private static synchronized Path uploadResource(File resource,
- SliderFileSystem fileSystem, Path parentDir) throws IOException {
- if (!fileSystem.getFileSystem().exists(parentDir)) {
- fileSystem.getFileSystem().mkdirs(parentDir,
- new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
- }
- Path destPath = new Path(parentDir, resource.getName());
- if (!fileSystem.getFileSystem().exists(destPath)) {
- FSDataOutputStream os = null;
- try {
- os = fileSystem.getFileSystem().create(destPath);
- byte[] contents = FileUtils.readFileToByteArray(resource);
- os.write(contents, 0, contents.length);
- os.flush();
- } finally {
- IOUtils.closeStream(os);
- }
- log.info("Uploaded {} to localization path {}", resource, destPath);
- } else {
- log.info("Resource {} already existed at localization path {}", resource,
- destPath);
- }
-
- while (!fileSystem.getFileSystem().exists(destPath)) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- // ignore
- }
- }
-
- fileSystem.getFileSystem().setPermission(destPath,
- new FsPermission(FsAction.READ, FsAction.NONE, FsAction.NONE));
-
- return destPath;
- }
-
- private void localizeServiceKeytabs(ContainerLauncher launcher,
- AggregateConf instanceDefinition,
- SliderFileSystem fileSystem)
- throws IOException {
- String keytabPathOnHost = instanceDefinition.getAppConfOperations()
- .getComponent(SliderKeys.COMPONENT_AM).get(
- SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH);
- if (SliderUtils.isUnset(keytabPathOnHost)) {
- String amKeytabName = instanceDefinition.getAppConfOperations()
- .getComponent(SliderKeys.COMPONENT_AM).get(
- SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME);
- String keytabDir = instanceDefinition.getAppConfOperations()
- .getComponent(SliderKeys.COMPONENT_AM).get(
- SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR);
- // we need to localize the keytab files in the directory
- Path keytabDirPath = fileSystem.buildKeytabPath(keytabDir, null,
- getClusterName());
- boolean serviceKeytabsDeployed = false;
- if (fileSystem.getFileSystem().exists(keytabDirPath)) {
- FileStatus[] keytabs = fileSystem.getFileSystem().listStatus(keytabDirPath);
- LocalResource keytabRes;
- for (FileStatus keytab : keytabs) {
- if (!amKeytabName.equals(keytab.getPath().getName())
- && keytab.getPath().getName().endsWith(".keytab")) {
- serviceKeytabsDeployed = true;
- log.info("Localizing keytab {}", keytab.getPath().getName());
- keytabRes = fileSystem.createAmResource(keytab.getPath(),
- LocalResourceType.FILE);
- launcher.addLocalResource(SliderKeys.KEYTAB_DIR + "/" +
- keytab.getPath().getName(),
- keytabRes);
- }
- }
- }
- if (!serviceKeytabsDeployed) {
- log.warn("No service keytabs for the application have been localized. "
- + "If the application requires keytabs for secure operation, "
- + "please ensure that the required keytabs have been uploaded "
- + "to the folder {}", keytabDirPath);
- }
- }
- }
-
- private void createConfigFile(SliderFileSystem fileSystem, File file,
- ConfigFile configFile, Map<String, String> config)
- throws IOException {
- ConfigFormat configFormat = ConfigFormat.resolve(configFile.getType());
- log.info("Writing {} file {}", configFormat, file);
-
- ConfigUtils.prepConfigForTemplateOutputter(configFormat, config,
- fileSystem, getClusterName(), file.getName());
- PublishedConfiguration publishedConfiguration =
- new PublishedConfiguration(configFile.getDictionaryName(),
- config.entrySet());
- PublishedConfigurationOutputter configurationOutputter =
- PublishedConfigurationOutputter.createOutputter(configFormat,
- publishedConfiguration);
- configurationOutputter.save(file);
- }
-
- @VisibleForTesting
- protected void localizeConfigFiles(ContainerLauncher launcher,
- String roleName, String roleGroup,
- Metainfo metainfo,
- Map<String, Map<String, String>> configs,
- MapOperations env,
- SliderFileSystem fileSystem)
- throws IOException {
- for (ConfigFile configFile : metainfo.getComponentConfigFiles(roleGroup)) {
- Map<String, String> config = ConfigUtils.replacePropsInConfig(
- configs.get(configFile.getDictionaryName()), env.options);
- String fileName = ConfigUtils.replaceProps(config,
- configFile.getFileName());
- File localFile = new File(SliderKeys.RESOURCE_DIR);
- if (!localFile.exists()) {
- localFile.mkdir();
- }
- localFile = new File(localFile, new File(fileName).getName());
-
- String folder = null;
- if ("true".equals(config.get(PER_COMPONENT))) {
- folder = roleName;
- } else if ("true".equals(config.get(PER_GROUP))) {
- folder = roleGroup;
- }
-
- log.info("Localizing {} configs to config file {} (destination {}) " +
- "based on {} configs", config.size(), localFile, fileName,
- configFile.getDictionaryName());
- createConfigFile(fileSystem, localFile, configFile, config);
- Path destPath = uploadResource(localFile, fileSystem, folder);
- LocalResource configResource = fileSystem.createAmResource(destPath,
- LocalResourceType.FILE);
-
- File destFile = new File(fileName);
- if (destFile.isAbsolute()) {
- launcher.addLocalResource(
- SliderKeys.RESOURCE_DIR + "/" + destFile.getName(),
- configResource, fileName);
- } else {
- launcher.addLocalResource(AgentKeys.APP_CONF_DIR + "/" + fileName,
- configResource);
- }
- }
- }
-
- /**
- * build the zookeeper registry path.
- *
- * @return the path the service registered at
- * @throws NullPointerException if the service has not yet registered
- */
- private String getZkRegistryPath() {
- Preconditions.checkNotNull(yarnRegistry, "Yarn registry not bound");
- String path = yarnRegistry.getAbsoluteSelfRegistrationPath();
- Preconditions.checkNotNull(path, "Service record path not defined");
- return path;
- }
-
- @Override
- public void rebuildContainerDetails(List<Container> liveContainers,
- String applicationId, Map<Integer, ProviderRole> providerRoleMap) {
- for (Container container : liveContainers) {
- // get the role name and label
- ProviderRole role = providerRoleMap.get(ContainerPriority
- .extractRole(container));
- if (role != null) {
- String roleName = role.name;
- String roleGroup = role.group;
- String label = getContainerLabel(container, roleName, roleGroup);
- log.info("Rebuilding in-memory: container {} in role {} in cluster {}",
- container.getId(), roleName, applicationId);
- getComponentStatuses().put(label,
- new ComponentInstanceState(roleGroup, container.getId(),
- applicationId));
- } else {
- log.warn("Role not found for container {} in cluster {}",
- container.getId(), applicationId);
- }
- }
- }
-
- @Override
- public boolean isSupportedRole(String role) {
- return true;
- }
-
- /**
- * Handle registration calls from the agents
- *
- * @param registration registration entry
- *
- * @return response
- */
- @Override
- public RegistrationResponse handleRegistration(Register registration) {
- log.info("Handling registration: {}", registration);
- RegistrationResponse response = new RegistrationResponse();
- String label = registration.getLabel();
- String pkg = registration.getPkg();
- State agentState = registration.getActualState();
- String appVersion = registration.getAppVersion();
-
- log.info("label: {} pkg: {}", label, pkg);
-
- if (getComponentStatuses().containsKey(label)) {
- response.setResponseStatus(RegistrationStatus.OK);
- ComponentInstanceState componentStatus = getComponentStatuses().get(label);
- componentStatus.heartbeat(System.currentTimeMillis());
- updateComponentStatusWithAgentState(componentStatus, agentState);
-
- String roleName = getRoleName(label);
- String roleGroup = getRoleGroup(label);
- String containerId = getContainerId(label);
-
- if (SliderUtils.isSet(registration.getTags())) {
- tags.recordAssignedTag(roleName, containerId, registration.getTags());
- } else {
- response.setTags(tags.getTag(roleName, containerId));
- }
-
- String hostFqdn = registration.getPublicHostname();
- Map<String, String> ports = registration.getAllocatedPorts();
- if (ports != null && !ports.isEmpty()) {
- processAllocatedPorts(hostFqdn, roleName, roleGroup, containerId, ports);
- }
-
- Map<String, String> folders = registration.getLogFolders();
- if (folders != null && !folders.isEmpty()) {
- publishFolderPaths(folders, containerId, roleName, hostFqdn);
- }
-
- // Set app version if empty. It gets unset during upgrade - why?
- checkAndSetContainerAppVersion(containerId, appVersion);
- } else {
- response.setResponseStatus(RegistrationStatus.FAILED);
- response.setLog("Label not recognized.");
- log.warn("Received registration request from unknown label {}", label);
- }
- log.info("Registration response: {}", response);
- return response;
- }
-
- // Checks if app version is empty. Sets it to the version as reported by the
- // container during registration phase.
- private void checkAndSetContainerAppVersion(String containerId,
- String appVersion) {
- StateAccessForProviders amState = getAmState();
- try {
- RoleInstance role = amState.getOwnedContainer(containerId);
- if (role != null) {
- String currentAppVersion = role.appVersion;
- log.debug("Container = {}, app version current = {} new = {}",
- containerId, currentAppVersion, appVersion);
- if (currentAppVersion == null
- || currentAppVersion.equals(APP_VERSION_UNKNOWN)) {
- amState.getOwnedContainer(containerId).appVersion = appVersion;
- }
- }
- } catch (NoSuchNodeException e) {
- // ignore - there is nothing to do if we don't find a container
- log.warn("Owned container {} not found - {}", containerId, e);
- }
- }
-
- /**
- * Handle heartbeat response from agents
- *
- * @param heartBeat incoming heartbeat from Agent
- *
- * @return response to send back
- */
- @Override
- public HeartBeatResponse handleHeartBeat(HeartBeat heartBeat) {
- log.debug("Handling heartbeat: {}", heartBeat);
- HeartBeatResponse response = new HeartBeatResponse();
- long id = heartBeat.getResponseId();
- response.setResponseId(id + 1L);
-
- String label = heartBeat.getHostname();
- String pkg = heartBeat.getPackage();
-
- log.debug("package received: " + pkg);
-
- String roleName = getRoleName(label);
- String roleGroup = getRoleGroup(label);
- String containerId = getContainerId(label);
- boolean doUpgrade = false;
- if (isInUpgradeMode && upgradeContainers.contains(containerId)) {
- doUpgrade = true;
- }
-
- StateAccessForProviders accessor = getAmState();
- CommandScript cmdScript = getScriptPathForMasterPackage(roleGroup);
- List<ComponentCommand> commands = getApplicationComponent(roleGroup).getCommands();
-
- if (!isDockerContainer(roleGroup) && !isYarnDockerContainer(roleGroup)
- && (cmdScript == null || cmdScript.getScript() == null)
- && commands.size() == 0) {
- log.error(
- "role.script is unavailable for {}. Commands will not be sent.",
- roleName);
- return response;
- }
-
- String scriptPath = null;
- long timeout = 600L;
- if (cmdScript != null) {
- scriptPath = cmdScript.getScript();
- timeout = cmdScript.getTimeout();
- }
-
- if (timeout == 0L) {
- timeout = 600L;
- }
-
- if (!getComponentStatuses().containsKey(label)) {
- // container is completed but still heart-beating, send terminate signal
- log.info(
- "Sending terminate signal to completed container (still heartbeating): {}",
- label);
- response.setTerminateAgent(true);
- return response;
- }
-
- List<ComponentStatus> statuses = heartBeat.getComponentStatus();
- if (statuses != null && !statuses.isEmpty()) {
- log.info("status from agent: " + statuses.toString());
- try {
- for(ComponentStatus status : statuses){
- RoleInstance role = null;
- if(status.getIp() != null && !status.getIp().isEmpty()){
- role = amState.getOwnedContainer(containerId);
- role.ip = status.getIp();
- }
- if(status.getHostname() != null && !status.getHostname().isEmpty()){
- role = amState.getOwnedContainer(containerId);
- role.hostname = status.getHostname();
- }
- if (role != null) {
- // create an updated service record (including hostname and ip) and publish...
- ServiceRecord record = new ServiceRecord();
- record.set(YarnRegistryAttributes.YARN_ID, containerId);
- record.description = roleName;
- record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
- PersistencePolicies.CONTAINER);
- // TODO: switch record attributes to use constants from YarnRegistryAttributes
- // when it's been updated.
- if (role.ip != null) {
- record.set("yarn:ip", role.ip);
- }
- if (role.hostname != null) {
- record.set("yarn:hostname", role.hostname);
- }
- yarnRegistry.putComponent(
- RegistryPathUtils.encodeYarnID(containerId), record);
-
- }
- }
-
-
- } catch (NoSuchNodeException e) {
- // ignore - there is nothing to do if we don't find a container
- log.warn("Owned container {} not found - {}", containerId, e);
- } catch (IOException e) {
- log.warn("Error updating container {} service record in registry",
- containerId, e);
- }
- }
-
- Boolean isMaster = isMaster(roleGroup);
- ComponentInstanceState componentStatus = getComponentStatuses().get(label);
- componentStatus.heartbeat(System.currentTimeMillis());
- if (doUpgrade) {
- switch (componentStatus.getState()) {
- case STARTED:
- componentStatus.setTargetState(State.UPGRADED);
- break;
- case UPGRADED:
- componentStatus.setTargetState(State.STOPPED);
- break;
- case STOPPED:
- componentStatus.setTargetState(State.TERMINATING);
- break;
- default:
- break;
- }
- log.info("Current state = {} target state {}",
- componentStatus.getState(), componentStatus.getTargetState());
- }
-
- if (appStopInitiated && !componentStatus.isStopInitiated()) {
- log.info("Stop initiated for label {}", label);
- componentStatus.setTargetState(State.STOPPED);
- componentStatus.setStopInitiated(true);
- }
-
- publishConfigAndExportGroups(heartBeat, componentStatus, roleGroup);
- CommandResult result = null;
- List<CommandReport> reports = heartBeat.getReports();
- if (SliderUtils.isNotEmpty(reports)) {
- CommandReport report = reports.get(0);
- Map<String, String> ports = report.getAllocatedPorts();
- if (SliderUtils.isNotEmpty(ports)) {
- processAllocatedPorts(heartBeat.getFqdn(), roleName, roleGroup, containerId, ports);
- }
- result = CommandResult.getCommandResult(report.getStatus());
- Command command = Command.getCommand(report.getRoleCommand());
- componentStatus.applyCommandResult(result, command, pkg);
- log.info("Component operation. Status: {}; new container state: {};"
- + " new component state: {}", result,
- componentStatus.getContainerState(), componentStatus.getState());
-
- if (command == Command.INSTALL && SliderUtils.isNotEmpty(report.getFolders())) {
- publishFolderPaths(report.getFolders(), containerId, roleName, heartBeat.getFqdn());
- }
- }
-
- int waitForCount = accessor.getInstanceDefinitionSnapshot().
- getAppConfOperations().getComponentOptInt(roleGroup, AgentKeys.WAIT_HEARTBEAT, 0);
-
- if (id < waitForCount) {
- log.info("Waiting until heartbeat count {}. Current val: {}", waitForCount, id);
- getComponentStatuses().put(label, componentStatus);
- return response;
- }
-
- Command command = componentStatus.getNextCommand(doUpgrade);
- try {
- if (Command.NOP != command) {
- log.debug("For comp {} pkg {} issuing {}", roleName,
- componentStatus.getNextPkgToInstall(), command.toString());
- if (command == Command.INSTALL) {
- log.info("Installing {} on {}.", roleName, containerId);
- if (isDockerContainer(roleGroup) || isYarnDockerContainer(roleGroup)){
- addInstallDockerCommand(roleName, roleGroup, containerId,
- response, null, timeout);
- } else if (scriptPath != null) {
- addInstallCommand(roleName, roleGroup, containerId, response,
- scriptPath, null, timeout, null);
- } else {
- // commands
- ComponentCommand installCmd = null;
- for (ComponentCommand compCmd : commands) {
- if (compCmd.getName().equals("INSTALL")) {
- installCmd = compCmd;
- }
- }
- addInstallCommand(roleName, roleGroup, containerId, response, null,
- installCmd, timeout, null);
- }
- componentStatus.commandIssued(command);
- } else if (command == Command.INSTALL_ADDON) {
- String nextPkgToInstall = componentStatus.getNextPkgToInstall();
- // retrieve scriptPath or command of that package for the component
- for (ComponentsInAddonPackage comp : packageMetainfo
- .get(nextPkgToInstall).getApplicationPackage().getComponents()) {
- // given nextPkgToInstall and roleName is determined, the if below
- // should only execute once per heartbeat
- log.debug("Addon component: {} pkg: {} script: {}", comp.getName(),
- nextPkgToInstall, comp.getCommandScript().getScript());
- if (comp.getName().equals(roleGroup)
- || comp.getName().equals(AgentKeys.ADDON_FOR_ALL_COMPONENTS)) {
- scriptPath = comp.getCommandScript().getScript();
- if (scriptPath != null) {
- addInstallCommand(roleName, roleGroup, containerId, response,
- scriptPath, null, timeout, nextPkgToInstall);
- } else {
- ComponentCommand installCmd = null;
- for (ComponentCommand compCmd : comp.getCommands()) {
- if (compCmd.getName().equals("INSTALL")) {
- installCmd = compCmd;
- }
- }
- addInstallCommand(roleName, roleGroup, containerId, response,
- null, installCmd, timeout, nextPkgToInstall);
- }
- }
- }
- componentStatus.commandIssued(command);
- } else if (command == Command.START) {
- // check against dependencies
- boolean canExecute = commandOrder.canExecute(roleGroup, command, getComponentStatuses().values());
- if (canExecute) {
- log.info("Starting {} on {}.", roleName, containerId);
- if (isDockerContainer(roleGroup) || isYarnDockerContainer(roleGroup)){
- addStartDockerCommand(roleName, roleGroup, containerId,
- response, null, timeout, false);
- } else if (scriptPath != null) {
- addStartCommand(roleName,
- roleGroup,
- containerId,
- response,
- scriptPath,
- null,
- null,
- timeout,
- isMarkedAutoRestart(roleGroup));
- } else {
- ComponentCommand startCmd = null;
- for (ComponentCommand compCmd : commands) {
- if (compCmd.getName().equals("START")) {
- startCmd = compCmd;
- }
- }
- ComponentCommand stopCmd = null;
- for (ComponentCommand compCmd : commands) {
- if (compCmd.getName().equals("STOP")) {
- stopCmd = compCmd;
- }
- }
- addStartCommand(roleName, roleGroup, containerId, response, null,
- startCmd, stopCmd, timeout, false);
- }
- componentStatus.commandIssued(command);
- } else {
- log.info("Start of {} on {} delayed as dependencies have not started.", roleName, containerId);
- }
- } else if (command == Command.UPGRADE) {
- addUpgradeCommand(roleName, roleGroup, containerId, response,
- scriptPath, timeout);
- componentStatus.commandIssued(command, true);
- } else if (command == Command.STOP) {
- log.info("Stop command being sent to container with id {}",
- containerId);
- addStopCommand(roleName, roleGroup, containerId, response, scriptPath,
- timeout, doUpgrade);
- componentStatus.commandIssued(command);
- } else if (command == Command.TERMINATE) {
- log.info("A formal terminate command is being sent to container {}"
- + " in state {}", label, componentStatus.getState());
- response.setTerminateAgent(true);
- }
- }
-
- // if there is no outstanding command then retrieve config
- if (isMaster && componentStatus.getState() == State.STARTED
- && command == Command.NOP) {
- if (!componentStatus.getConfigReported()) {
- log.info("Requesting applied config for {} on {}.", roleName, containerId);
- if (isDockerContainer(roleGroup) || isYarnDockerContainer(roleGroup)){
- addGetConfigDockerCommand(roleName, roleGroup, containerId, response);
- } else {
- addGetConfigCommand(roleName, roleGroup, containerId, response);
- }
- }
- }
-
- // if restart is required then signal
- response.setRestartEnabled(false);
- if (componentStatus.getState() == State.STARTED
- && command == Command.NOP && isMarkedAutoRestart(roleGroup)) {
- response.setRestartEnabled(true);
- }
-
- //If INSTALL_FAILED and no INSTALL is scheduled let the agent fail
- if (componentStatus.getState() == State.INSTALL_FAILED
- && command == Command.NOP) {
- log.warn("Sending terminate signal to container that failed installation: {}", label);
- response.setTerminateAgent(true);
- }
-
- } catch (SliderException e) {
- log.warn("Component instance failed operation.", e);
- componentStatus.applyCommandResult(CommandResult.FAILED, command, null);
- }
-
- log.debug("Heartbeat response: " + response);
- return response;
- }
-
- private boolean isDockerContainer(String roleGroup) {
- String type = getApplicationComponent(roleGroup).getType();
- if (SliderUtils.isSet(type)) {
- return type.toLowerCase().equals(SliderUtils.DOCKER) || type.toLowerCase().equals(SliderUtils.DOCKER_YARN);
- }
- return false;
- }
-
- private boolean isYarnDockerContainer(String roleGroup) {
- String type = getApplicationComponent(roleGroup).getType();
- if (SliderUtils.isSet(type)) {
- return type.toLowerCase().equals(SliderUtils.DOCKER_YARN);
- }
- return false;
- }
-
- protected void processAllocatedPorts(String fqdn,
- String roleName,
- String roleGroup,
- String containerId,
- Map<String, String> ports) {
- RoleInstance instance;
- try {
- instance = getAmState().getOwnedContainer(containerId);
- } catch (NoSuchNodeException e) {
- log.warn("Failed to locate instance of container {}", containerId, e);
- instance = null;
- }
- for (Map.Entry<String, String> port : ports.entrySet()) {
- String portname = port.getKey();
- String portNo = port.getValue();
- log.info("Recording allocated port for {} as {}", portname, portNo);
-
- // add the allocated ports to the global list as well as per container list
- // per container allocation will over-write each other in the global
- this.getAllocatedPorts().put(portname, portNo);
- this.getAllocatedPorts(containerId).put(portname, portNo);
- if (instance != null) {
- try {
- // if the returned value is not a single port number then there are no
- // meaningful way for Slider to use it during export
- // No need to error out as it may not be the responsibility of the component
- // to allocate port or the component may need an array of ports
- instance.registerPortEndpoint(Integer.valueOf(portNo), portname);
- } catch (NumberFormatException e) {
- log.warn("Failed to parse {}", portNo, e);
- }
- }
- }
-
- processAndPublishComponentSpecificData(ports, containerId, fqdn, roleGroup);
- processAndPublishComponentSpecificExports(ports, containerId, fqdn, roleName, roleGroup);
-
- // and update registration entries
- if (instance != null) {
- queueAccess.put(new RegisterComponentInstance(instance.getId(),
- roleName, roleGroup, 0, TimeUnit.MILLISECONDS));
- }
- }
-
- private void updateComponentStatusWithAgentState(
- ComponentInstanceState componentStatus, State agentState) {
- if (agentState != null) {
- componentStatus.setState(agentState);
- }
- }
-
- @Override
- public Map<String, MonitorDetail> buildMonitorDetails(ClusterDescription clusterDesc) {
- Map<String, MonitorDetail> details = super.buildMonitorDetails(clusterDesc);
- buildRoleHostDetails(details);
- return details;
- }
-
- @Override
- public void applyInitialRegistryDefinitions(URL amWebURI,
- URL agentOpsURI,
- URL agentStatusURI,
- ServiceRecord serviceRecord)
- throws IOException {
- super.applyInitialRegistryDefinitions(amWebURI,
- agentOpsURI,
- agentStatusURI,
- serviceRecord);
-
- try {
- URL restURL = new URL(agentOpsURI, SLIDER_PATH_AGENTS);
- URL agentStatusURL = new URL(agentStatusURI, SLIDER_PATH_AGENTS);
-
- serviceRecord.addInternalEndpoint(
- new Endpoint(CustomRegistryConstants.AGENT_SECURE_REST_API,
- ProtocolTypes.PROTOCOL_REST,
- restURL.toURI()));
- serviceRecord.addInternalEndpoint(
- new Endpoint(CustomRegistryConstants.AGENT_ONEWAY_REST_API,
- ProtocolTypes.PROTOCOL_REST,
- agentStatusURL.toURI()));
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
-
- // identify client component
- Component client = null;
- for (Component component : getMetaInfo(null).getApplication()
- .getComponents()) {
- if (component.getCategory().equals("CLIENT")) {
- client = component;
- break;
- }
- }
- if (client == null) {
- log.info("No client component specified, not publishing client configs");
- return;
- }
-
- // register AM-generated client configs
- ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
- MapOperations clientOperations = appConf.getOrAddComponent(client.getName());
- appConf.resolve();
- if (!clientOperations.getOptionBool(AgentKeys.AM_CONFIG_GENERATION,
- false)) {
- log.info("AM config generation is false, not publishing client configs");
- return;
- }
-
- // build and localize configuration files
- Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
- Map<String, String> tokens = null;
- try {
- tokens = getStandardTokenMap(appConf, client.getName(), client.getName());
- } catch (SliderException e) {
- throw new IOException(e);
- }
-
- for (ConfigFile configFile : getMetaInfo(null)
- .getComponentConfigFiles(client.getName())) {
- addNamedConfiguration(configFile.getDictionaryName(),
- appConf.getGlobalOptions().options, configurations, tokens, null,
- client.getName(), client.getName());
- if (appConf.getComponent(client.getName()) != null) {
- addNamedConfiguration(configFile.getDictionaryName(),
- appConf.getComponent(client.getName()).options, configurations,
- tokens, null, client.getName(), client.getName());
- }
- }
-
- //do a final replacement of re-used configs
- dereferenceAllConfigs(configurations);
-
- for (ConfigFile configFile : getMetaInfo(null)
- .getComponentConfigFiles(client.getName())) {
- ConfigFormat configFormat = ConfigFormat.resolve(configFile.getType());
-
- Map<String, String> config = configurations.get(configFile.getDictionaryName());
- ConfigUtils.prepConfigForTemplateOutputter(configFormat, config,
- fileSystem, getClusterName(),
- new File(configFile.getFileName()).getName());
- PublishedConfiguration publishedConfiguration =
- new PublishedConfiguration(configFile.getDictionaryName(),
- config.entrySet());
- getAmState().getPublishedSliderConfigurations().put(
- configFile.getDictionaryName(), publishedConfiguration);
- log.info("Publishing AM configuration {}", configFile.getDictionaryName());
- }
- }
-
- @Override
- public void notifyContainerCompleted(ContainerId containerId) {
- // containers get allocated and free'ed without being assigned to any
- // component - so many of the data structures may not be initialized
- if (containerId != null) {
- String containerIdStr = containerId.toString();
- if (getComponentInstanceData().containsKey(containerIdStr)) {
- getComponentInstanceData().remove(containerIdStr);
- log.info("Removing container specific data for {}", containerIdStr);
- publishComponentInstanceData();
- }
-
- if (this.allocatedPorts.containsKey(containerIdStr)) {
- Map<String, String> portsByContainerId = getAllocatedPorts(containerIdStr);
- this.allocatedPorts.remove(containerIdStr);
- // free up the allocations from global as well
- // if multiple containers allocate global ports then last one
- // wins and similarly first one removes it - its not supported anyway
- for(String portName : portsByContainerId.keySet()) {
- getAllocatedPorts().remove(portName);
- }
-
- }
-
- String componentName = null;
- synchronized (this.componentStatuses) {
- for (String label : getComponentStatuses().keySet()) {
- if (label.startsWith(containerIdStr)) {
- componentName = getRoleName(label);
- log.info("Removing component status for label {}", label);
- getComponentStatuses().remove(label);
- }
- }
- }
-
- tags.releaseTag(componentName, containerIdStr);
-
- synchronized (this.containerExportsMap) {
- Set<String> containerExportSets = containerExportsMap.get(containerIdStr);
- if (containerExportSets != null) {
- for (String containerExportStr : containerExportSets) {
- String[] parts = containerExportStr.split(":");
- Map<String, List<ExportEntry>> exportGroup = getCurrentExports(parts[0]);
- List<ExportEntry> exports = exportGroup.get(parts[1]);
- List<ExportEntry> exportToRemove = new ArrayList<ExportEntry>();
- for (ExportEntry export : exports) {
- if (containerIdStr.equals(export.getContainerId())) {
- exportToRemove.add(export);
- }
- }
- exports.removeAll(exportToRemove);
- }
- log.info("Removing container exports for {}", containerIdStr);
- containerExportsMap.remove(containerIdStr);
- }
- }
- }
- }
-
- /**
- * Reads and sets the heartbeat monitoring interval. If bad value is provided then log it and set to default.
- *
- * @param instanceDefinition
- */
- private void readAndSetHeartbeatMonitoringInterval(AggregateConf instanceDefinition) {
- String hbMonitorInterval = instanceDefinition.getAppConfOperations().
- getGlobalOptions().getOption(AgentKeys.HEARTBEAT_MONITOR_INTERVAL,
- Integer.toString(DEFAULT_HEARTBEAT_MONITOR_INTERVAL));
- try {
- setHeartbeatMonitorInterval(Integer.parseInt(hbMonitorInterval));
- } catch (NumberFormatException e) {
- log.warn(
- "Bad value {} for {}. Defaulting to ",
- hbMonitorInterval,
- HEARTBEAT_MONITOR_INTERVAL,
- DEFAULT_HEARTBEAT_MONITOR_INTERVAL);
- }
- }
-
- /**
- * Reads and sets the heartbeat monitoring interval. If bad value is provided then log it and set to default.
- *
- * @param instanceDefinition
- */
- private void initializeAgentDebugCommands(AggregateConf instanceDefinition) {
- String launchParameterStr = instanceDefinition.getAppConfOperations().
- getGlobalOptions().getOption(AgentKeys.AGENT_INSTANCE_DEBUG_DATA, "");
- agentLaunchParameter = new AgentLaunchParameter(launchParameterStr);
- }
-
- @VisibleForTesting
- protected Map<String, ExportEntry> getLogFolderExports() {
- return logFolderExports;
- }
-
- @VisibleForTesting
- protected Map<String, ExportEntry> getWorkFolderExports() {
- return workFolderExports;
- }
-
- @VisibleForTesting
- protected Metainfo getMetaInfo(String roleGroup) {
- String mapKey = DEFAULT_METAINFO_MAP_KEY;
- if (roleGroup != null) {
- ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
- mapKey = appConf.getComponentOpt(roleGroup, ROLE_PREFIX,
- DEFAULT_METAINFO_MAP_KEY);
- }
- MetainfoHolder mh = this.metaInfoMap.get(mapKey);
- if (mh == null) {
- return null;
- }
- return mh.metaInfo;
- }
-
- @VisibleForTesting
- protected Map<String, ComponentInstanceState> getComponentStatuses() {
- return componentStatuses;
- }
-
- @VisibleForTesting
- protected Metainfo getApplicationMetainfo(SliderFileSystem fileSystem,
- String appDef, boolean addonPackage) throws IOException,
- BadConfigException {
- return AgentUtils.getApplicationMetainfo(fileSystem, appDef, addonPackage);
- }
-
- @VisibleForTesting
- protected Metainfo getApplicationMetainfo(SliderFileSystem fileSystem,
- String appDef) throws IOException, BadConfigException {
- return getApplicationMetainfo(fileSystem, appDef, false);
- }
-
- @VisibleForTesting
- protected void setHeartbeatMonitorInterval(int heartbeatMonitorInterval) {
- this.heartbeatMonitorInterval = heartbeatMonitorInterval;
- }
-
- public void setInUpgradeMode(boolean inUpgradeMode) {
- this.isInUpgradeMode = inUpgradeMode;
- }
-
- public void addUpgradeContainers(Set<String> upgradeContainers) {
- this.upgradeContainers.addAll(upgradeContainers);
- }
-
- public void setAppStopInitiated(boolean appStopInitiated) {
- this.appStopInitiated = appStopInitiated;
- }
-
- /**
- * Read all default configs
- *
- * @param fileSystem fs
- * @param appDef app default path
- * @param metainfo metadata
- *
- * @return configuration maps
- *
- * @throws IOException
- */
- protected Map<String, DefaultConfig> initializeDefaultConfigs(SliderFileSystem fileSystem,
- String appDef, Metainfo metainfo) throws IOException {
- Map<String, DefaultConfig> defaultConfigMap = new HashMap<>();
- if (SliderUtils.isNotEmpty(metainfo.getApplication().getConfigFiles())) {
- for (ConfigFile configFile : metainfo.getApplication().getConfigFiles()) {
- DefaultConfig config = null;
- try {
- config = AgentUtils.getDefaultConfig(fileSystem, appDef, configFile.getDictionaryName() + ".xml");
- } catch (IOException e) {
- log.warn("Default config file not found. Only the config as input during create will be applied for {}",
- configFile.getDictionaryName());
- }
- if (config != null) {
- defaultConfigMap.put(configFile.getDictionaryName(), config);
- }
- }
- }
-
- return defaultConfigMap;
- }
-
- protected Map<String, DefaultConfig> getDefaultConfigs(String roleGroup) {
- ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
- String mapKey = appConf.getComponentOpt(roleGroup, ROLE_PREFIX,
- DEFAULT_METAINFO_MAP_KEY);
- return metaInfoMap.get(mapKey).defaultConfigs;
- }
-
- private int getHeartbeatMonitorInterval() {
- return this.heartbeatMonitorInterval;
- }
-
- private String getClusterName() {
- if (SliderUtils.isUnset(clusterName)) {
- clusterName = getAmState().getInternalsSnapshot().get(OptionKeys.APPLICATION_NAME);
- }
- return clusterName;
- }
-
- /**
- * Publish a named property bag that may contain name-value pairs for app configurations such as hbase-site
- *
- * @param name
- * @param description
- * @param entries
- */
- protected void publishApplicationInstanceData(String name, String description,
- Iterable<Map.Entry<String, String>> entries) {
- PublishedConfiguration pubconf = new PublishedConfiguration();
- pubconf.description = description;
- pubconf.putValues(entries);
- log.info("publishing {}", pubconf);
- getAmState().getPublishedSliderConfigurations().put(name, pubconf);
- }
-
- /**
- * Get a list of all hosts for all role/container per role
- *
- * @return the map of role->node
- */
- protected Map<String, Map<String, ClusterNode>> getRoleClusterNodeMapping() {
- return amState.getRoleClusterNodeMapping();
- }
-
- private String getContainerLabel(Container container, String role, String group) {
- if (role.equals(group)) {
- return container.getId().toString() + LABEL_MAKER + role;
- } else {
- return container.getId().toString() + LABEL_MAKER + role + LABEL_MAKER +
- group;
- }
- }
-
- protected String getClusterInfoPropertyValue(String name) {
- StateAccessForProviders accessor = getAmState();
- assert accessor.isApplicationLive();
- ClusterDescription description = accessor.getClusterStatus();
- return description.getInfo(name);
- }
-
- protected String getClusterOptionPropertyValue(String name)
- throws BadConfigException {
- StateAccessForProviders accessor = getAmState();
- assert accessor.isApplicationLive();
- ClusterDescription description = accessor.getClusterStatus();
- return description.getMandatoryOption(name);
- }
-
- /**
- * Lost heartbeat from the container - release it and ask for a replacement (async operation)
- *
- * @param label
- * @param containerId
- */
- protected void lostContainer(
- String label,
- ContainerId containerId) {
- getComponentStatuses().remove(label);
- getQueueAccess().put(new ProviderReportedContainerLoss(containerId));
- }
-
- /**
- * Build the provider status, can be empty
- *
- * @return the provider status - map of entries to add to the info section
- */
- public Map<String, String> buildProviderStatus() {
- Map<String, String> stats = new HashMap<String, String>();
- return stats;
- }
-
-
- /**
- * Format the folder locations and publish in the registry service
- *
- * @param folders
- * @param containerId
- * @param hostFqdn
- * @param componentName
- */
- protected void publishFolderPaths(
- Map<String, String> folders, String containerId, String componentName, String hostFqdn) {
- Date now = new Date();
- for (Map.Entry<String, String> entry : folders.entrySet()) {
- ExportEntry exportEntry = new ExportEntry();
- exportEntry.setValue(String.format(HOST_FOLDER_FORMAT, hostFqdn, entry.getValue()));
- exportEntry.setContainerId(containerId);
- exportEntry.setLevel(COMPONENT_TAG);
- exportEntry.setTag(componentName);
- exportEntry.setUpdatedTime(now.toString());
- if (entry.getKey().equals("AGENT_LOG_ROOT")) {
- synchronized (logFolderExports) {
- getLogFolderExports().put(containerId, exportEntry);
- }
- } else {
- synchronized (workFolderExports) {
- getWorkFolderExports().put(containerId, exportEntry);
- }
- }
- log.info("Updating log and pwd folders for container {}", containerId);
- }
-
- PublishedExports exports = new PublishedExports(CONTAINER_LOGS_TAG);
- exports.setUpdated(now.getTime());
- synchronized (logFolderExports) {
- updateExportsFromList(exports, getLogFolderExports());
- }
- getAmState().getPublishedExportsSet().put(CONTAINER_LOGS_TAG, exports);
-
- exports = new PublishedExports(CONTAINER_PWDS_TAG);
- exports.setUpdated(now.getTime());
- synchronized (workFolderExports) {
- updateExportsFromList(exports, getWorkFolderExports());
- }
- getAmState().getPublishedExportsSet().put(CONTAINER_PWDS_TAG, exports);
- }
-
- /**
- * Update the export data from the map
- * @param exports
- * @param folderExports
- */
- private void updateExportsFromList(PublishedExports exports, Map<String, ExportEntry> folderExports) {
- Map<String, List<ExportEntry>> perComponentList = new HashMap<String, List<ExportEntry>>();
- for(Map.Entry<String, ExportEntry> logEntry : folderExports.entrySet())
- {
- String componentName = logEntry.getValue().getTag();
- if (!perComponentList.containsKey(componentName)) {
- perComponentList.put(componentName, new ArrayList<ExportEntry>());
- }
- perComponentList.get(componentName).add(logEntry.getValue());
- }
- exports.putValues(perComponentList.entrySet());
- }
-
-
- /**
- * Process return status for component instances
- *
- * @param heartBeat
- * @param componentStatus
- */
- protected void publishConfigAndExportGroups(HeartBeat heartBeat,
- ComponentInstanceState componentStatus, String componentGroup) {
- List<ComponentStatus> statuses = heartBeat.getComponentStatus();
- if (statuses != null && !statuses.isEmpty()) {
- log.info("Processing {} status reports.", statuses.size());
- for (ComponentStatus status : statuses) {
- log.info("Status report: {}", status.toString());
-
- if (status.getConfigs() != null) {
- Application application = getMetaInfo(componentGroup).getApplication();
-
- if ((!canAnyMasterPublishConfig(componentGroup) || canPublishConfig(componentGroup)) &&
- !getAmState().getAppConfSnapshot().getComponentOptBool(
- componentGroup, AgentKeys.AM_CONFIG_GENERATION, false)) {
- // If no Master can explicitly publish then publish if its a master
- // Otherwise, wait till the master that can publish is ready
-
- Set<String> exportedConfigs = new HashSet();
- String exportedConfigsStr = application.getExportedConfigs();
- boolean exportedAllConfigs = exportedConfigsStr == null || exportedConfigsStr.isEmpty();
- if (!exportedAllConfigs) {
- for (String exportedConfig : exportedConfigsStr.split(",")) {
- if (exportedConfig.trim().length() > 0) {
- exportedConfigs.add(exportedConfig.trim());
- }
- }
- }
-
- for (String key : status.getConfigs().keySet()) {
- if ((!exportedAllConfigs && exportedConfigs.contains(key)) ||
- exportedAllConfigs) {
- Map<String, String> configs = status.getConfigs().get(key);
- publishApplicationInstanceData(key, key, configs.entrySet());
- }
- }
- }
-
- List<ExportGroup> appExportGroups = application.getExportGroups();
- boolean hasExportGroups = SliderUtils.isNotEmpty(appExportGroups);
-
- Set<String> appExports = new HashSet();
- String appExportsStr = getApplicationComponent(componentGroup).getAppExports();
- if (SliderUtils.isSet(appExportsStr)) {
- for (String appExport : appExportsStr.split(",")) {
- if (!appExport.trim().isEmpty()) {
- appExports.add(appExport.trim());
- }
- }
- }
-
- if (hasExportGroups && !appExports.isEmpty()) {
- String configKeyFormat = "${site.%s.%s}";
- String hostKeyFormat = "${%s_HOST}";
-
- // publish export groups if any
- Map<String, String> replaceTokens = new HashMap<String, String>();
- for (Map.Entry<String, Map<String, ClusterNode>> entry : getRoleClusterNodeMapping().entrySet()) {
- String hostName = getHostsList(entry.getValue().values(), true).iterator().next();
- replaceTokens.put(String.format(hostKeyFormat, entry.getKey().toUpperCase(Locale.ENGLISH)), hostName);
- }
-
- for (String key : status.getConfigs().keySet()) {
- Map<String, String> configs = status.getConfigs().get(key);
- for (String configKey : configs.keySet()) {
- String lookupKey = String.format(configKeyFormat, key, configKey);
- replaceTokens.put(lookupKey, configs.get(configKey));
- }
- }
-
- Set<String> modifiedGroups = new HashSet<String>();
- for (ExportGroup exportGroup : appExportGroups) {
- List<Export> exports = exportGroup.getExports();
- if (SliderUtils.isNotEmpty(exports)) {
- String exportGroupName = exportGroup.getName();
- ConcurrentHashMap<String, List<ExportEntry>> map =
- (ConcurrentHashMap<String, List<ExportEntry>>)getCurrentExports(exportGroupName);
- for (Export export : exports) {
- if (canBeExported(exportGroupName, export.getName(), appExports)) {
- String value = export.getValue();
- // replace host names
- for (String token : replaceTokens.keySet()) {
- if (value.contains(token)) {
- value = value.replace(token, replaceTokens.get(token));
- }
- }
- ExportEntry entry = new ExportEntry();
- entry.setLevel(APPLICATION_TAG);
- entry.setValue(value);
- entry.setUpdatedTime(new Date().toString());
- // over-write, app exports are singletons
- map.put(export.getName(), new ArrayList(Arrays.asList(entry)));
- log.info("Preparing to publish. Key {} and Value {}", export.getName(), value);
- }
- }
- modifiedGroups.add(exportGroupName);
- }
- }
- publishModifiedExportGroups(modifiedGroups);
- }
-
- log.info("Received and processed config for {}", heartBeat.getHostname());
- componentStatus.setConfigReported(true);
-
- }
- }
- }
- }
-
- private boolean canBeExported(String exportGroupName, String name, Set<String> appExports) {
- return appExports.contains(String.format("%s-%s", exportGroupName, name));
- }
-
- protected Map<String, List<ExportEntry>> getCurrentExports(String groupName) {
- if (!this.exportGroups.containsKey(groupName)) {
- synchronized (this.exportGroups) {
- if (!this.exportGroups.containsKey(groupName)) {
- this.exportGroups.put(groupName, new ConcurrentHashMap<String, List<ExportEntry>>());
- }
- }
- }
-
- return this.exportGroups.get(groupName);
- }
-
- private void publishModifiedExportGroups(Set<String> modifiedGroups) {
- for (String groupName : modifiedGroups) {
- Map<String, List<ExportEntry>> entries = this.exportGroups.get(groupName);
-
- // Publish in old format for the time being
- Map<String, String> simpleEntries = new HashMap<String, String>();
- for (Map.Entry<String, List<ExportEntry>> entry : entries.entrySet()) {
- List<ExportEntry> exports = entry.getValue();
- if (SliderUtils.isNotEmpty(exports)) {
- // there is no support for multiple exports per name - so extract only the first one
- simpleEntries.put(entry.getKey(), entry.getValue().get(0).getValue());
- }
- }
- if (!getAmState().getAppConfSnapshot().getComponentOptBool(
- groupName, AgentKeys.AM_CONFIG_GENERATION, false)) {
- publishApplicationInstanceData(groupName, groupName,
- simpleEntries.entrySet());
- }
-
- PublishedExports exports = new PublishedExports(groupName);
- exports.setUpdated(new Date().getTime());
- exports.putValues(entries.entrySet());
- getAmState().getPublishedExportsSet().put(groupName, exports);
- }
- }
-
- /** Publish component instance specific data if the component demands it */
- protected void processAndPublishComponentSpecificData(Map<String, String> ports,
- String containerId,
- String hostFqdn,
- String componentGroup) {
- String portVarFormat = "${site.%s}";
- String hostNamePattern = "${THIS_HOST}";
- Map<String, String> toPublish = new HashMap<String, String>();
-
- Application application = getMetaInfo(componentGroup).getApplication();
- for (Component component : application.getComponents()) {
- if (component.getName().equals(componentGroup)) {
- if (component.getComponentExports().size() > 0) {
-
- for (ComponentExport export : component.getComponentExports()) {
- String templateToExport = export.getValue();
- for (String portName : ports.keySet()) {
- boolean publishData = false;
- String portValPattern = String.format(portVarFormat, portName);
- if (templateToExport.contains(portValPattern)) {
- templateToExport = templateToExport.replace(portValPattern, ports.get(portName));
- publishData = true;
- }
- if (templateToExport.contains(hostNamePattern)) {
- templateToExport = templateToExport.replace(hostNamePattern, hostFqdn);
- publishData = true;
- }
- if (publishData) {
- toPublish.put(export.getName(), templateToExport);
- log.info("Publishing {} for name {} and container {}",
- templateToExport, export.getName(), containerId);
- }
- }
- }
- }
- }
- }
-
- if (toPublish.size() > 0) {
- Map<String, String> perContainerData
<TRUNCATED>