You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ja...@apache.org on 2016/12/29 00:10:34 UTC
ambari git commit: AMBARI-19275. Single API to download all client
configs. (jaimin)
Repository: ambari
Updated Branches:
refs/heads/trunk 6add93ea4 -> 6db03934b
AMBARI-19275. Single API to download all client configs. (jaimin)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6db03934
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6db03934
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6db03934
Branch: refs/heads/trunk
Commit: 6db03934bddb971de258a73150145f928e52c5cb
Parents: 6add93e
Author: Jaimin Jetly <ja...@hortonworks.com>
Authored: Wed Dec 28 16:09:40 2016 -0800
Committer: Jaimin Jetly <ja...@hortonworks.com>
Committed: Wed Dec 28 16:10:28 2016 -0800
----------------------------------------------------------------------
ambari-server/pom.xml | 5 +
.../server/api/services/ComponentService.java | 27 +-
.../api/services/HostComponentService.java | 25 +-
.../server/configuration/Configuration.java | 19 +
.../internal/ClientConfigResourceProvider.java | 850 +++++++++++++------
.../api/services/ComponentServiceTest.java | 4 +-
.../api/services/HostComponentServiceTest.java | 4 +-
.../ClientConfigResourceProviderTest.java | 36 +-
8 files changed, 684 insertions(+), 286 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/6db03934/ambari-server/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-server/pom.xml b/ambari-server/pom.xml
index 102cb71..90c6b61 100644
--- a/ambari-server/pom.xml
+++ b/ambari-server/pom.xml
@@ -1039,6 +1039,11 @@
<artifactId>commons-csv</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.5</version>
+ </dependency>
+ <dependency>
<groupId>uk.com.robust-it</groupId>
<artifactId>cloning</artifactId>
<version>1.9.2</version>
http://git-wip-us.apache.org/repos/asf/ambari/blob/6db03934/ambari-server/src/main/java/org/apache/ambari/server/api/services/ComponentService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/ComponentService.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/ComponentService.java
index ded2596..1725b11 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/ComponentService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/ComponentService.java
@@ -41,6 +41,8 @@ import javax.ws.rs.core.UriInfo;
import org.apache.ambari.server.api.resources.ResourceInstance;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
/**
* Service responsible for components resource requests.
@@ -99,7 +101,12 @@ public class ComponentService extends BaseService {
*/
@GET
@Produces("text/plain")
- public Response getComponents(String body, @Context HttpHeaders headers, @Context UriInfo ui) {
+ public Response getComponents(String body, @Context HttpHeaders headers, @Context UriInfo ui,
+ @QueryParam("format") String format) {
+
+ if (format != null && format.equals("client_config_tar")) {
+ return createClientConfigResource(body, headers, ui, null);
+ }
return handleRequest(headers, body, ui, Request.Type.GET,
createComponentResource(m_clusterName, m_serviceName, null));
}
@@ -227,7 +234,20 @@ public class ComponentService extends BaseService {
mapIds.put(Resource.Type.Cluster, m_clusterName);
mapIds.put(Resource.Type.Service, m_serviceName);
mapIds.put(Resource.Type.Component, componentName);
+ String filePrefixName;
+
+ if (StringUtils.isEmpty(componentName)) {
+ if (StringUtils.isEmpty(m_serviceName)) {
+ filePrefixName = m_clusterName + "(" + Resource.InternalType.Cluster.toString().toUpperCase()+")";
+ } else {
+ filePrefixName = m_serviceName + "(" + Resource.InternalType.Service.toString().toUpperCase()+")";
+ }
+ } else {
+ filePrefixName = componentName;
+ }
+ Validate.notNull(filePrefixName, "compressed config file name should not be null");
+ String fileName = filePrefixName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION;
Response response = handleRequest(headers, body, ui, Request.Type.GET,
createResource(Resource.Type.ClientConfig, mapIds));
@@ -240,7 +260,7 @@ public class ComponentService extends BaseService {
Response.ResponseBuilder rb = Response.status(Response.Status.OK);
Configuration configs = new Configuration();
String tmpDir = configs.getProperty(Configuration.SERVER_TMP_DIR.getKey());
- File file = new File(tmpDir + File.separator + componentName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION);
+ File file = new File(tmpDir,fileName);
InputStream resultInputStream = null;
try {
resultInputStream = new FileInputStream(file);
@@ -249,8 +269,7 @@ public class ComponentService extends BaseService {
}
String contentType = Configuration.DEF_ARCHIVE_CONTENT_TYPE;
- String outputFileName = componentName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION;
- rb.header("Content-Disposition", "attachment; filename=\"" + outputFileName + "\"");
+ rb.header("Content-Disposition", "attachment; filename=\"" + fileName + "\"");
rb.entity(resultInputStream);
return rb.type(contentType).build();
http://git-wip-us.apache.org/repos/asf/ambari/blob/6db03934/ambari-server/src/main/java/org/apache/ambari/server/api/services/HostComponentService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/HostComponentService.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/HostComponentService.java
index bc9bb30..cdd1761 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/HostComponentService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/HostComponentService.java
@@ -41,6 +41,8 @@ import javax.ws.rs.core.UriInfo;
import org.apache.ambari.server.api.resources.ResourceInstance;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
/**
* Service responsible for host_components resource requests.
@@ -107,7 +109,10 @@ public class HostComponentService extends BaseService {
*/
@GET
@Produces("text/plain")
- public Response getHostComponents(String body, @Context HttpHeaders headers, @Context UriInfo ui) {
+ public Response getHostComponents(String body, @Context HttpHeaders headers, @Context UriInfo ui, @QueryParam("format") String format) {
+ if (format != null && format.equals("client_config_tar")) {
+ return createClientConfigResource(body, headers, ui, null);
+ }
return handleRequest(headers, body, ui, Request.Type.GET,
createHostComponentResource(m_clusterName, m_hostName, null));
}
@@ -277,10 +282,21 @@ public class HostComponentService extends BaseService {
return response;
}
+ String filePrefixName;
+
+ if (StringUtils.isEmpty(hostComponentName)) {
+ filePrefixName = m_hostName + "(" + Resource.InternalType.Host.toString().toUpperCase()+")";
+ } else {
+ filePrefixName = hostComponentName;
+ }
+
+ Validate.notNull(filePrefixName, "compressed config file name should not be null");
+ String fileName = filePrefixName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION;
+
Response.ResponseBuilder rb = Response.status(Response.Status.OK);
Configuration configs = new Configuration();
String tmpDir = configs.getProperty(Configuration.SERVER_TMP_DIR.getKey());
- File file = new File(tmpDir+File.separator+hostComponentName+"-configs.tar.gz");
+ File file = new File(tmpDir,fileName);
InputStream resultInputStream = null;
try {
resultInputStream = new FileInputStream(file);
@@ -288,9 +304,8 @@ public class HostComponentService extends BaseService {
e.printStackTrace();
}
- String contentType = "application/x-ustar";
- String outputFileName = hostComponentName + "-configs.tar.gz";
- rb.header("Content-Disposition", "attachment; filename=\"" + outputFileName + "\"");
+ String contentType = Configuration.DEF_ARCHIVE_CONTENT_TYPE;
+ rb.header("Content-Disposition", "attachment; filename=\"" + fileName + "\"");
rb.entity(resultInputStream);
return rb.type(contentType).build();
http://git-wip-us.apache.org/repos/asf/ambari/blob/6db03934/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index d2f7934..7f047c8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -1762,6 +1762,14 @@ public class Configuration {
public static final ConfigurationProperty<Integer> EXTERNAL_SCRIPT_TIMEOUT = new ConfigurationProperty<>(
"server.script.timeout", 5000);
+ /**
+ * The time, in {@link TimeUnit#MILLISECONDS}, until an external script is killed.
+ * n threads will execute n/2 scripts. one extra thread is needed to gather error/output stream of external script
+ */
+ @Markdown(description = "The number of threads that should be allocated to run external script.")
+ public static final ConfigurationProperty<Integer> THREAD_POOL_SIZE_FOR_EXTERNAL_SCRIPT = new ConfigurationProperty<>(
+ "server.script.threads", 4);
+
public static final String DEF_ARCHIVE_EXTENSION;
public static final String DEF_ARCHIVE_CONTENT_TYPE;
@@ -2773,6 +2781,7 @@ public class Configuration {
configsMap.put(LOG4JMONITOR_DELAY.getKey(), getProperty(LOG4JMONITOR_DELAY));
configsMap.put(REQUEST_LOG_RETAINDAYS.getKey(), getProperty(REQUEST_LOG_RETAINDAYS));
configsMap.put(EXTERNAL_SCRIPT_TIMEOUT.getKey(), getProperty(EXTERNAL_SCRIPT_TIMEOUT));
+ configsMap.put(THREAD_POOL_SIZE_FOR_EXTERNAL_SCRIPT.getKey(), getProperty(THREAD_POOL_SIZE_FOR_EXTERNAL_SCRIPT));
configsMap.put(SHARED_RESOURCES_DIR.getKey(), getProperty(SHARED_RESOURCES_DIR));
configsMap.put(KDC_PORT.getKey(), getProperty(KDC_PORT));
configsMap.put(AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT.getKey(), getProperty(AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT));
@@ -4430,6 +4439,16 @@ public class Configuration {
return Integer.parseInt(getProperty(EXTERNAL_SCRIPT_TIMEOUT));
}
+ //THREAD_POOL_FOR_EXTERNAL_SCRIPT
+
+ /**
+ * Get the threadpool size for external script execution
+ * @return {Integer}
+ */
+ public Integer getExternalScriptThreadPoolSize() {
+ return Integer.parseInt(getProperty(THREAD_POOL_SIZE_FOR_EXTERNAL_SCRIPT));
+ }
+
public boolean getParallelStageExecution() {
return Boolean.parseBoolean(configsMap.get(PARALLEL_STAGE_EXECUTION.getKey()));
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6db03934/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java
index 020a454..8a35c98 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProvider.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ambari.server.controller.internal;
import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.AGENT_STACK_RETRY_COUNT;
@@ -36,9 +37,13 @@ import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.STACK_NAM
import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.STACK_VERSION;
import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.USER_LIST;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -53,6 +58,9 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ambari.server.AmbariException;
@@ -85,6 +93,13 @@ import org.apache.ambari.server.state.ServiceOsSpecific;
import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.utils.SecretReference;
import org.apache.ambari.server.utils.StageUtils;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,7 +121,6 @@ public class ClientConfigResourceProvider extends AbstractControllerResourceProv
protected static final String COMPONENT_COMPONENT_NAME_PROPERTY_ID = "ServiceComponentInfo/component_name";
protected static final String HOST_COMPONENT_HOST_NAME_PROPERTY_ID =
PropertyHelper.getPropertyId("HostRoles", "host_name");
- private static final int SCRIPT_TIMEOUT = 1500;
private final Gson gson;
@@ -173,75 +187,116 @@ public class ClientConfigResourceProvider extends AbstractControllerResourceProv
throw new SystemException("Failed to get components ", e);
}
+ Map<String,ServiceComponentHostResponse> componentMap = new HashMap<>();
+
+ // reduce set of sch responses to one sch response for every service component
+ for (ServiceComponentHostResponse resp: responses) {
+ String componentName = resp.getComponentName();
+ if (!componentMap.containsKey(componentName)) {
+ componentMap.put(resp.getComponentName(),resp);
+ }
+ }
+
+ ServiceComponentHostRequest schRequest = requests.iterator().next();
+ String requestComponentName = schRequest.getComponentName();
+ String requestServiceName = schRequest.getServiceName();
+ String requestHostName = schRequest.getHostname();
+
+ Map<String,List<ServiceComponentHostResponse>> serviceToComponentMap = new HashMap<String,List<ServiceComponentHostResponse>>();
+
+ // sch response for the service components that have configFiles defined in the stack definition of the service
+ List <ServiceComponentHostResponse> schWithConfigFiles = new ArrayList<>();
+
Configuration configs = new Configuration();
Map<String, String> configMap = configs.getConfigsMap();
String TMP_PATH = configMap.get(Configuration.SERVER_TMP_DIR.getKey());
String pythonCmd = configMap.get(Configuration.AMBARI_PYTHON_WRAP.getKey());
- AmbariManagementController managementController = getManagementController();
- ConfigHelper configHelper = managementController.getConfigHelper();
- Cluster cluster = null;
- Clusters clusters = managementController.getClusters();
- try {
- cluster = clusters.getCluster(responses.iterator().next().getClusterName());
+ List<String> pythonCompressFilesCmds = new ArrayList<>();
- StackId stackId = cluster.getCurrentStackVersion();
- String serviceName = responses.iterator().next().getServiceName();
- String componentName = responses.iterator().next().getComponentName();
- String hostName = responses.iterator().next().getHostname();
- ComponentInfo componentInfo = null;
- String packageFolder = null;
+ for (ServiceComponentHostResponse response : componentMap.values()){
- componentInfo = managementController.getAmbariMetaInfo().
- getComponent(stackId.getStackName(), stackId.getStackVersion(), serviceName, componentName);
- packageFolder = managementController.getAmbariMetaInfo().
- getService(stackId.getStackName(), stackId.getStackVersion(), serviceName).getServicePackageFolder();
+ AmbariManagementController managementController = getManagementController();
+ ConfigHelper configHelper = managementController.getConfigHelper();
+ Cluster cluster = null;
+ Clusters clusters = managementController.getClusters();
+ try {
+ cluster = clusters.getCluster(response.getClusterName());
+
+ StackId stackId = cluster.getCurrentStackVersion();
+ String serviceName = response.getServiceName();
+ String componentName = response.getComponentName();
+ String hostName = response.getHostname();
+ ComponentInfo componentInfo = null;
+ String packageFolder = null;
+
+ componentInfo = managementController.getAmbariMetaInfo().
+ getComponent(stackId.getStackName(), stackId.getStackVersion(), serviceName, componentName);
+ packageFolder = managementController.getAmbariMetaInfo().
+ getService(stackId.getStackName(), stackId.getStackVersion(), serviceName).getServicePackageFolder();
+
+ String commandScript = componentInfo.getCommandScript().getScript();
+ List<ClientConfigFileDefinition> clientConfigFiles = componentInfo.getClientConfigFiles();
+
+ if (clientConfigFiles == null) {
+ if (componentMap.size() == 1) {
+ throw new SystemException("No configuration files defined for the component " + componentInfo.getName());
+ } else {
+ LOG.debug(String.format("No configuration files defined for the component %s",componentInfo.getName()));
+ continue;
+ }
+ }
- String commandScript = componentInfo.getCommandScript().getScript();
- List<ClientConfigFileDefinition> clientConfigFiles = componentInfo.getClientConfigFiles();
+ // service component hosts that have configFiles defined in the stack definition of the service
+ schWithConfigFiles.add(response);
- if (clientConfigFiles == null) {
- throw new SystemException("No configuration files defined for the component " + componentInfo.getName());
- }
+ if (serviceToComponentMap.containsKey(response.getServiceName())) {
+ List <ServiceComponentHostResponse> schResponseList = serviceToComponentMap.get(serviceName);
+ schResponseList.add(response);
+ } else {
+ List <ServiceComponentHostResponse> schResponseList = new ArrayList<>();
+ schResponseList.add(response);
+ serviceToComponentMap.put(serviceName,schResponseList);
+ }
- String resourceDirPath = configs.getResourceDirPath();
- String packageFolderAbsolute = resourceDirPath + File.separator + packageFolder;
+ String resourceDirPath = configs.getResourceDirPath();
+ String packageFolderAbsolute = resourceDirPath + File.separator + packageFolder;
- String commandScriptAbsolute = packageFolderAbsolute + File.separator + commandScript;
+ String commandScriptAbsolute = packageFolderAbsolute + File.separator + commandScript;
- Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
- Map<String, Long> configVersions = new TreeMap<String, Long>();
- Map<String, Map<PropertyType, Set<String>>> configPropertiesTypes = new TreeMap<>();
- Map<String, Map<String, Map<String, String>>> configurationAttributes = new TreeMap<String, Map<String, Map<String, String>>>();
+ Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
+ Map<String, Long> configVersions = new TreeMap<String, Long>();
+ Map<String, Map<PropertyType, Set<String>>> configPropertiesTypes = new TreeMap<>();
+ Map<String, Map<String, Map<String, String>>> configurationAttributes = new TreeMap<String, Map<String, Map<String, String>>>();
- Map<String, DesiredConfig> desiredClusterConfigs = cluster.getDesiredConfigs();
+ Map<String, DesiredConfig> desiredClusterConfigs = cluster.getDesiredConfigs();
- //Get configurations and configuration attributes
- for (Map.Entry<String, DesiredConfig> desiredConfigEntry : desiredClusterConfigs.entrySet()) {
+ //Get configurations and configuration attributes
+ for (Map.Entry<String, DesiredConfig> desiredConfigEntry : desiredClusterConfigs.entrySet()) {
- String configType = desiredConfigEntry.getKey();
- DesiredConfig desiredConfig = desiredConfigEntry.getValue();
- Config clusterConfig = cluster.getConfig(configType, desiredConfig.getTag());
+ String configType = desiredConfigEntry.getKey();
+ DesiredConfig desiredConfig = desiredConfigEntry.getValue();
+ Config clusterConfig = cluster.getConfig(configType, desiredConfig.getTag());
- if (clusterConfig != null) {
- Map<String, String> props = new HashMap<String, String>(clusterConfig.getProperties());
+ if (clusterConfig != null) {
+ Map<String, String> props = new HashMap<String, String>(clusterConfig.getProperties());
- // Apply global properties for this host from all config groups
- Map<String, Map<String, String>> allConfigTags = null;
- allConfigTags = configHelper
- .getEffectiveDesiredTags(cluster, hostName);
+ // Apply global properties for this host from all config groups
+ Map<String, Map<String, String>> allConfigTags = null;
+ allConfigTags = configHelper
+ .getEffectiveDesiredTags(cluster, schRequest.getHostname());
- Map<String, Map<String, String>> configTags = new HashMap<String,
- Map<String, String>>();
+ Map<String, Map<String, String>> configTags = new HashMap<String,
+ Map<String, String>>();
- for (Map.Entry<String, Map<String, String>> entry : allConfigTags.entrySet()) {
- if (entry.getKey().equals(clusterConfig.getType())) {
- configTags.put(clusterConfig.getType(), entry.getValue());
+ for (Map.Entry<String, Map<String, String>> entry : allConfigTags.entrySet()) {
+ if (entry.getKey().equals(clusterConfig.getType())) {
+ configTags.put(clusterConfig.getType(), entry.getValue());
+ }
}
- }
- Map<String, Map<String, String>> properties = configHelper
- .getEffectiveConfigProperties(cluster, configTags);
+ Map<String, Map<String, String>> properties = configHelper
+ .getEffectiveConfigProperties(cluster, configTags);
if (!properties.isEmpty()) {
for (Map<String, String> propertyMap : properties.values()) {
@@ -249,178 +304,198 @@ public class ClientConfigResourceProvider extends AbstractControllerResourceProv
}
}
- configurations.put(clusterConfig.getType(), props);
- configVersions.put(clusterConfig.getType(), clusterConfig.getVersion());
- configPropertiesTypes.put(clusterConfig.getType(), clusterConfig.getPropertiesTypes());
+ configurations.put(clusterConfig.getType(), props);
+ configVersions.put(clusterConfig.getType(), clusterConfig.getVersion());
+ configPropertiesTypes.put(clusterConfig.getType(), clusterConfig.getPropertiesTypes());
- Map<String, Map<String, String>> attrs = new TreeMap<String, Map<String, String>>();
- configHelper.cloneAttributesMap(clusterConfig.getPropertiesAttributes(), attrs);
+ Map<String, Map<String, String>> attrs = new TreeMap<String, Map<String, String>>();
+ configHelper.cloneAttributesMap(clusterConfig.getPropertiesAttributes(), attrs);
- Map<String, Map<String, Map<String, String>>> attributes = configHelper
- .getEffectiveConfigAttributes(cluster, configTags);
- for (Map<String, Map<String, String>> attributesMap : attributes.values()) {
- configHelper.cloneAttributesMap(attributesMap, attrs);
+ Map<String, Map<String, Map<String, String>>> attributes = configHelper
+ .getEffectiveConfigAttributes(cluster, configTags);
+ for (Map<String, Map<String, String>> attributesMap : attributes.values()) {
+ configHelper.cloneAttributesMap(attributesMap, attrs);
+ }
+ configurationAttributes.put(clusterConfig.getType(), attrs);
}
- configurationAttributes.put(clusterConfig.getType(), attrs);
}
- }
- ConfigHelper.processHiddenAttribute(configurations, configurationAttributes, componentName, true);
+ ConfigHelper.processHiddenAttribute(configurations, configurationAttributes, componentName, true);
- for(Map.Entry<String, Map<String, Map<String, String>>> configurationAttributesEntry : configurationAttributes.entrySet()){
- Map<String, Map<String, String>> attrs = configurationAttributesEntry.getValue();
- // remove internal attributes like "hidden"
- attrs.remove("hidden");
- }
+ for (Map.Entry<String, Map<String, Map<String, String>>> configurationAttributesEntry : configurationAttributes.entrySet()) {
+ Map<String, Map<String, String>> attrs = configurationAttributesEntry.getValue();
+ // remove internal attributes like "hidden"
+ attrs.remove("hidden");
+ }
- // replace passwords on password references
- for(Map.Entry<String, Map<String, String>> configEntry: configurations.entrySet()) {
- String configType = configEntry.getKey();
- Map<String, String> configProperties = configEntry.getValue();
- Long configVersion = configVersions.get(configType);
- Map<PropertyType, Set<String>> propertiesTypes = configPropertiesTypes.get(configType);
- SecretReference.replacePasswordsWithReferences(propertiesTypes, configProperties, configType, configVersion);
- }
+ // replace passwords on password references
+ for (Map.Entry<String, Map<String, String>> configEntry : configurations.entrySet()) {
+ String configType = configEntry.getKey();
+ Map<String, String> configProperties = configEntry.getValue();
+ Long configVersion = configVersions.get(configType);
+ Map<PropertyType, Set<String>> propertiesTypes = configPropertiesTypes.get(configType);
+ SecretReference.replacePasswordsWithReferences(propertiesTypes, configProperties, configType, configVersion);
+ }
- Map<String, Set<String>> clusterHostInfo = null;
- ServiceInfo serviceInfo = null;
- String osFamily = null;
- clusterHostInfo = StageUtils.getClusterHostInfo(cluster);
- serviceInfo = managementController.getAmbariMetaInfo().getService(stackId.getStackName(),
- stackId.getStackVersion(), serviceName);
- try {
- clusterHostInfo = StageUtils.substituteHostIndexes(clusterHostInfo);
- } catch (AmbariException e) {
- // Before moving substituteHostIndexes to StageUtils, a SystemException was thrown in the
- // event an index could not be mapped to a host. After the move, this was changed to an
- // AmbariException for consistency in the StageUtils class. To keep this method consistent
- // with how it behaved in the past, if an AmbariException is thrown, it is caught and
- // translated to a SystemException.
- throw new SystemException(e.getMessage(), e);
- }
- osFamily = clusters.getHost(hostName).getOsFamily();
-
- TreeMap<String, String> hostLevelParams = new TreeMap<String, String>();
- hostLevelParams.put(JDK_LOCATION, managementController.getJdkResourceUrl());
- hostLevelParams.put(JAVA_HOME, managementController.getJavaHome());
- hostLevelParams.put(JAVA_VERSION, String.valueOf(configs.getJavaVersion()));
- hostLevelParams.put(JDK_NAME, managementController.getJDKName());
- hostLevelParams.put(JCE_NAME, managementController.getJCEName());
- hostLevelParams.put(STACK_NAME, stackId.getStackName());
- hostLevelParams.put(STACK_VERSION, stackId.getStackVersion());
- hostLevelParams.put(DB_NAME, managementController.getServerDB());
- hostLevelParams.put(MYSQL_JDBC_URL, managementController.getMysqljdbcUrl());
- hostLevelParams.put(ORACLE_JDBC_URL, managementController.getOjdbcUrl());
- hostLevelParams.put(HOST_SYS_PREPPED, configs.areHostsSysPrepped());
- hostLevelParams.putAll(managementController.getRcaParameters());
- hostLevelParams.put(AGENT_STACK_RETRY_ON_UNAVAILABILITY, configs.isAgentStackRetryOnInstallEnabled());
- hostLevelParams.put(AGENT_STACK_RETRY_COUNT, configs.getAgentStackRetryOnInstallCount());
-
- // Write down os specific info for the service
- ServiceOsSpecific anyOs = null;
- if (serviceInfo.getOsSpecifics().containsKey(AmbariMetaInfo.ANY_OS)) {
- anyOs = serviceInfo.getOsSpecifics().get(AmbariMetaInfo.ANY_OS);
- }
+ Map<String, Set<String>> clusterHostInfo = null;
+ ServiceInfo serviceInfo = null;
+ String osFamily = null;
+ clusterHostInfo = StageUtils.getClusterHostInfo(cluster);
+ serviceInfo = managementController.getAmbariMetaInfo().getService(stackId.getStackName(),
+ stackId.getStackVersion(), serviceName);
+ try {
+ clusterHostInfo = StageUtils.substituteHostIndexes(clusterHostInfo);
+ } catch (AmbariException e) {
+ // Before moving substituteHostIndexes to StageUtils, a SystemException was thrown in the
+ // event an index could not be mapped to a host. After the move, this was changed to an
+ // AmbariException for consistency in the StageUtils class. To keep this method consistent
+ // with how it behaved in the past, if an AmbariException is thrown, it is caught and
+ // translated to a SystemException.
+ throw new SystemException(e.getMessage(), e);
+ }
+ osFamily = clusters.getHost(hostName).getOsFamily();
+
+ TreeMap<String, String> hostLevelParams = new TreeMap<String, String>();
+ hostLevelParams.put(JDK_LOCATION, managementController.getJdkResourceUrl());
+ hostLevelParams.put(JAVA_HOME, managementController.getJavaHome());
+ hostLevelParams.put(JAVA_VERSION, String.valueOf(configs.getJavaVersion()));
+ hostLevelParams.put(JDK_NAME, managementController.getJDKName());
+ hostLevelParams.put(JCE_NAME, managementController.getJCEName());
+ hostLevelParams.put(STACK_NAME, stackId.getStackName());
+ hostLevelParams.put(STACK_VERSION, stackId.getStackVersion());
+ hostLevelParams.put(DB_NAME, managementController.getServerDB());
+ hostLevelParams.put(MYSQL_JDBC_URL, managementController.getMysqljdbcUrl());
+ hostLevelParams.put(ORACLE_JDBC_URL, managementController.getOjdbcUrl());
+ hostLevelParams.put(HOST_SYS_PREPPED, configs.areHostsSysPrepped());
+ hostLevelParams.putAll(managementController.getRcaParameters());
+ hostLevelParams.put(AGENT_STACK_RETRY_ON_UNAVAILABILITY, configs.isAgentStackRetryOnInstallEnabled());
+ hostLevelParams.put(AGENT_STACK_RETRY_COUNT, configs.getAgentStackRetryOnInstallCount());
+
+ // Write down os specific info for the service
+ ServiceOsSpecific anyOs = null;
+ if (serviceInfo.getOsSpecifics().containsKey(AmbariMetaInfo.ANY_OS)) {
+ anyOs = serviceInfo.getOsSpecifics().get(AmbariMetaInfo.ANY_OS);
+ }
- ServiceOsSpecific hostOs = populateServicePackagesInfo(serviceInfo, hostLevelParams, osFamily);
+ ServiceOsSpecific hostOs = populateServicePackagesInfo(serviceInfo, hostLevelParams, osFamily);
- // Build package list that is relevant for host
- List<ServiceOsSpecific.Package> packages =
- new ArrayList<ServiceOsSpecific.Package>();
- if (anyOs != null) {
- packages.addAll(anyOs.getPackages());
- }
+ // Build package list that is relevant for host
+ List<ServiceOsSpecific.Package> packages =
+ new ArrayList<ServiceOsSpecific.Package>();
+ if (anyOs != null) {
+ packages.addAll(anyOs.getPackages());
+ }
- if (hostOs != null) {
- packages.addAll(hostOs.getPackages());
- }
- String packageList = gson.toJson(packages);
- hostLevelParams.put(PACKAGE_LIST, packageList);
-
- Set<String> userSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.USER, cluster, desiredClusterConfigs);
- String userList = gson.toJson(userSet);
- hostLevelParams.put(USER_LIST, userList);
-
- Set<String> groupSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.GROUP, cluster, desiredClusterConfigs);
- String groupList = gson.toJson(groupSet);
- hostLevelParams.put(GROUP_LIST, groupList);
-
- Set<String> notManagedHdfsPathSet = configHelper.getPropertyValuesWithPropertyType(stackId,PropertyType.NOT_MANAGED_HDFS_PATH, cluster, desiredClusterConfigs);
- String notManagedHdfsPathList = gson.toJson(notManagedHdfsPathSet);
- hostLevelParams.put(NOT_MANAGED_HDFS_PATH_LIST, notManagedHdfsPathList);
-
- String jsonConfigurations = null;
- Map<String, Object> commandParams = new HashMap<String, Object>();
- List<Map<String, String>> xmlConfigs = new LinkedList<Map<String, String>>();
- List<Map<String, String>> envConfigs = new LinkedList<Map<String, String>>();
- List<Map<String, String>> propertiesConfigs = new LinkedList<Map<String, String>>();
-
- //Fill file-dictionary configs from metainfo
- for (ClientConfigFileDefinition clientConfigFile : clientConfigFiles) {
- Map<String, String> fileDict = new HashMap<String, String>();
- fileDict.put(clientConfigFile.getFileName(), clientConfigFile.getDictionaryName());
- if (clientConfigFile.getType().equals("xml")) {
- xmlConfigs.add(fileDict);
- } else if (clientConfigFile.getType().equals("env")) {
- envConfigs.add(fileDict);
- } else if (clientConfigFile.getType().equals("properties")) {
- propertiesConfigs.add(fileDict);
+ if (hostOs != null) {
+ packages.addAll(hostOs.getPackages());
+ }
+ String packageList = gson.toJson(packages);
+ hostLevelParams.put(PACKAGE_LIST, packageList);
+
+ Set<String> userSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.USER, cluster, desiredClusterConfigs);
+ String userList = gson.toJson(userSet);
+ hostLevelParams.put(USER_LIST, userList);
+
+ Set<String> groupSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.GROUP, cluster, desiredClusterConfigs);
+ String groupList = gson.toJson(groupSet);
+ hostLevelParams.put(GROUP_LIST, groupList);
+
+ Set<String> notManagedHdfsPathSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.NOT_MANAGED_HDFS_PATH, cluster, desiredClusterConfigs);
+ String notManagedHdfsPathList = gson.toJson(notManagedHdfsPathSet);
+ hostLevelParams.put(NOT_MANAGED_HDFS_PATH_LIST, notManagedHdfsPathList);
+
+ String jsonConfigurations = null;
+ Map<String, Object> commandParams = new HashMap<String, Object>();
+ List<Map<String, String>> xmlConfigs = new LinkedList<Map<String, String>>();
+ List<Map<String, String>> envConfigs = new LinkedList<Map<String, String>>();
+ List<Map<String, String>> propertiesConfigs = new LinkedList<Map<String, String>>();
+
+ //Fill file-dictionary configs from metainfo
+ for (ClientConfigFileDefinition clientConfigFile : clientConfigFiles) {
+ Map<String, String> fileDict = new HashMap<String, String>();
+ fileDict.put(clientConfigFile.getFileName(), clientConfigFile.getDictionaryName());
+ if (clientConfigFile.getType().equals("xml")) {
+ xmlConfigs.add(fileDict);
+ } else if (clientConfigFile.getType().equals("env")) {
+ envConfigs.add(fileDict);
+ } else if (clientConfigFile.getType().equals("properties")) {
+ propertiesConfigs.add(fileDict);
+ }
}
- }
- commandParams.put("xml_configs_list", xmlConfigs);
- commandParams.put("env_configs_list", envConfigs);
- commandParams.put("properties_configs_list", propertiesConfigs);
- commandParams.put("output_file", componentName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION);
-
- Map<String, Object> jsonContent = new TreeMap<String, Object>();
- jsonContent.put("configurations", configurations);
- jsonContent.put("configuration_attributes", configurationAttributes);
- jsonContent.put("commandParams", commandParams);
- jsonContent.put("clusterHostInfo", clusterHostInfo);
- jsonContent.put("hostLevelParams", hostLevelParams);
- jsonContent.put("hostname", hostName);
- jsonContent.put("clusterName", cluster.getClusterName());
- jsonConfigurations = gson.toJson(jsonContent);
-
- File jsonFileName = new File(TMP_PATH + File.separator + componentName + "-configuration.json");
- File tmpDirectory = new File(jsonFileName.getParent());
- if (!tmpDirectory.exists()) {
+ commandParams.put("xml_configs_list", xmlConfigs);
+ commandParams.put("env_configs_list", envConfigs);
+ commandParams.put("properties_configs_list", propertiesConfigs);
+ commandParams.put("output_file", componentName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION);
+
+ Map<String, Object> jsonContent = new TreeMap<String, Object>();
+ jsonContent.put("configurations", configurations);
+ jsonContent.put("configuration_attributes", configurationAttributes);
+ jsonContent.put("commandParams", commandParams);
+ jsonContent.put("clusterHostInfo", clusterHostInfo);
+ jsonContent.put("hostLevelParams", hostLevelParams);
+ jsonContent.put("hostname", hostName);
+ jsonContent.put("clusterName", cluster.getClusterName());
+ jsonConfigurations = gson.toJson(jsonContent);
+
+ File jsonFileName = new File(TMP_PATH + File.separator + componentName + "-configuration.json");
+ File tmpDirectory = new File(jsonFileName.getParent());
+ if (!tmpDirectory.exists()) {
+ try {
+ tmpDirectory.mkdirs();
+ tmpDirectory.setWritable(true, true);
+ tmpDirectory.setReadable(true, true);
+ } catch (SecurityException se) {
+ throw new SystemException("Failed to get temporary directory to store configurations", se);
+ }
+ }
+ PrintWriter printWriter = null;
try {
- tmpDirectory.mkdirs();
- tmpDirectory.setWritable(true, true);
- tmpDirectory.setReadable(true, true);
- } catch (SecurityException se) {
- throw new SystemException("Failed to get temporary directory to store configurations", se);
+ printWriter = new PrintWriter(jsonFileName.getAbsolutePath());
+ printWriter.print(jsonConfigurations);
+ printWriter.close();
+ } catch (FileNotFoundException e) {
+ throw new SystemException("Failed to write configurations to json file ", e);
}
- }
- PrintWriter printWriter = null;
- try {
- printWriter = new PrintWriter(jsonFileName.getAbsolutePath());
- printWriter.print(jsonConfigurations);
- printWriter.close();
- } catch (FileNotFoundException e) {
- throw new SystemException("Failed to write configurations to json file ", e);
- }
- String cmd = pythonCmd + " " + commandScriptAbsolute + " generate_configs " + jsonFileName.getAbsolutePath() + " " +
- packageFolderAbsolute + " " + TMP_PATH + File.separator + "structured-out.json" + " INFO " + TMP_PATH;
+ String cmd = pythonCmd + " " + commandScriptAbsolute + " generate_configs " + jsonFileName.getAbsolutePath() + " " +
+ packageFolderAbsolute + " " + TMP_PATH + File.separator + "structured-out.json" + " INFO " + TMP_PATH;
- try {
- executeCommand(cmd, configs.getExternalScriptTimeout());
- } catch (TimeoutException e) {
- LOG.error("Generate client configs script was killed due to timeout ", e);
- throw new SystemException("Generate client configs script was killed due to timeout ", e);
- } catch (InterruptedException | IOException e) {
- LOG.error("Failed to run generate client configs script for a component " + componentName, e);
- throw new SystemException("Failed to run generate client configs script for a component " + componentName, e);
- } catch (ExecutionException e) {
- LOG.error(e.getMessage(),e);
- throw new SystemException(e.getMessage() + " " + e.getCause());
+ pythonCompressFilesCmds.add(cmd);
+
+ } catch (AmbariException e) {
+ throw new SystemException("Controller error ", e);
}
+ }
+
+ if (schWithConfigFiles.isEmpty()) {
+ throw new SystemException("No configuration files defined for any component" );
+ }
- } catch (AmbariException e) {
- throw new SystemException("Controller error ", e);
+ Integer threadPoolSize = configs.getExternalScriptThreadPoolSize();
+ ExecutorService processExecutor = Executors.newFixedThreadPool(threadPoolSize);
+
+ // put all threads that starts process to compress each component config files in the executor
+ List<CommandLineThreadWrapper> pythonCmdThreads = executeCommands(processExecutor, pythonCompressFilesCmds);
+
+ // wait for all threads to finish
+ Integer timeout = configs.getExternalScriptTimeout();
+ waitForAllThreadsToJoin(processExecutor, pythonCmdThreads, timeout);
+
+ if (StringUtils.isEmpty(requestComponentName)) {
+ TarUtils tarUtils;
+ String fileName;
+ List <ServiceComponentHostResponse> schToTarConfigFiles = schWithConfigFiles;
+ if (StringUtils.isNotEmpty(requestHostName)) {
+ fileName = requestHostName + "(" + Resource.InternalType.Host.toString().toUpperCase()+")";
+ } else if (StringUtils.isNotEmpty(requestServiceName)) {
+ fileName = requestServiceName + "(" + Resource.InternalType.Service.toString().toUpperCase()+")";
+ schToTarConfigFiles = serviceToComponentMap.get(requestServiceName);
+ } else {
+ fileName = schRequest.getClusterName() + "(" + Resource.InternalType.Cluster.toString().toUpperCase()+")";
+ }
+ tarUtils = new TarUtils(TMP_PATH, fileName, schToTarConfigFiles);
+ tarUtils.tarConfigFiles();
}
Resource resource = new ResourceImpl(Resource.Type.ClientConfig);
@@ -428,89 +503,214 @@ public class ClientConfigResourceProvider extends AbstractControllerResourceProv
return resources;
}
- @Override
- public RequestStatus updateResources(final Request request, Predicate predicate)
- throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException {
-
- throw new SystemException("The request is not supported");
- }
-
- @Override
- public RequestStatus deleteResources(Request request, Predicate predicate)
- throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException {
-
- throw new SystemException("The request is not supported");
- }
-
-
- // ----- AbstractResourceProvider ------------------------------------------
-
- @Override
- protected Set<String> getPKPropertyIds() {
- return pkPropertyIds;
- }
-
-
- // ----- utility methods ---------------------------------------------------
-
/**
- * Get a component request object from a map of property values.
- *
- * @param properties the predicate
- * @return the component request object
+ * Execute all external script commands
+ * @param processExecutor {@link ExecutorService} executes the process when threads are available in the pool
+ * @param commandLines List of {String} commands that starts the python process to compress config files
+ * @return {@link CommandLineThreadWrapper}
+ * @throws SystemException
*/
-
- private ServiceComponentHostRequest getRequest(Map<String, Object> properties) {
- return new ServiceComponentHostRequest(
- (String) properties.get(COMPONENT_CLUSTER_NAME_PROPERTY_ID),
- (String) properties.get(COMPONENT_SERVICE_NAME_PROPERTY_ID),
- (String) properties.get(COMPONENT_COMPONENT_NAME_PROPERTY_ID),
- (String) properties.get(HOST_COMPONENT_HOST_NAME_PROPERTY_ID),
- null);
+ private List<CommandLineThreadWrapper> executeCommands(final ExecutorService processExecutor, List<String> commandLines)
+ throws SystemException {
+ List <CommandLineThreadWrapper> commandLineThreadWrappers = new ArrayList<>();
+ try {
+ for (String commandLine : commandLines) {
+ CommandLineThreadWrapper commandLineThreadWrapper = executeCommand(processExecutor,commandLine);
+ commandLineThreadWrappers.add(commandLineThreadWrapper);
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to run generate client configs script for components");
+ processExecutor.shutdownNow();
+ throw new SystemException("Failed to run generate client configs script for components");
+ }
+ return commandLineThreadWrappers;
}
- private int executeCommand(final String commandLine,
- final long timeout)
- throws IOException, InterruptedException, TimeoutException, ExecutionException {
+ /**
+ * Execute external script command
+ * @param processExecutor {@link ExecutorService} executes the process when threads are available in the pool
+ * @param commandLine {String} command that starts the python process to compress config files
+ * @return {@link CommandLineThreadWrapper}
+ * @throws IOException
+ */
+ private CommandLineThreadWrapper executeCommand(final ExecutorService processExecutor, final String commandLine)
+ throws IOException {
ProcessBuilder builder = new ProcessBuilder(Arrays.asList(commandLine.split("\\s+")));
builder.redirectErrorStream(true);
Process process = builder.start();
CommandLineThread commandLineThread = new CommandLineThread(process);
LogStreamReader logStream = new LogStreamReader(process.getInputStream());
Thread logStreamThread = new Thread(logStream, "LogStreamReader");
- logStreamThread.start();
- commandLineThread.start();
+ // log collecting thread should be always put first in the executor
+ processExecutor.execute(logStreamThread);
+ processExecutor.execute(commandLineThread);
+ return new CommandLineThreadWrapper(commandLine, commandLineThread,
+ logStreamThread, logStream, process);
+ }
+
+
+ /**
+ * Waits for all threads to join that have started python process to tar config files for component
+ * @param processExecutor {@link ExecutorService} executes the process when threads are available in the pool
+ * @param pythonCmdThreads list of {@link CommandLineThreadWrapper}
+ * @param timeout {Integer} time to wait for the threads to join
+ * @throws SystemException
+ */
+ private void waitForAllThreadsToJoin(ExecutorService processExecutor, List <CommandLineThreadWrapper> pythonCmdThreads, Integer timeout)
+ throws SystemException {
+ processExecutor.shutdown();
try {
- commandLineThread.join(timeout);
- logStreamThread.join(timeout);
- Integer returnCode = commandLineThread.getReturnCode();
- if (returnCode == null) {
- throw new TimeoutException();
- } else if (returnCode != 0) {
- throw new ExecutionException(String.format("Execution of \"%s\" returned %d.", commandLine, returnCode),
- new Throwable(logStream.getOutput()));
- } else {
- return commandLineThread.returnCode;
+ if (!processExecutor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
+ processExecutor.shutdownNow();
+ for (CommandLineThreadWrapper commandLineThreadWrapper: pythonCmdThreads) {
+ CommandLineThread commandLineThread = commandLineThreadWrapper.getCommandLineThread();
+ try {
+ Integer returnCode = commandLineThread.getReturnCode();
+ if (returnCode == null) {
+ throw new TimeoutException();
+ } else if (returnCode != 0) {
+ throw new ExecutionException(String.format("Execution of \"%s\" returned %d.", commandLineThreadWrapper.getCommandLine(), returnCode),
+ new Throwable(commandLineThreadWrapper.getLogStream().getOutput()));
+ }
+ } catch (TimeoutException e) {
+ LOG.error("Generate client configs script was killed due to timeout ", e);
+ throw new SystemException("Generate client configs script was killed due to timeout ", e);
+ } catch (ExecutionException e) {
+ LOG.error(e.getMessage(), e);
+ throw new SystemException(e.getMessage() + " " + e.getCause());
+ } finally {
+ commandLineThreadWrapper.getProcess().destroy();
+ }
+ }
}
- } catch (InterruptedException ex) {
- commandLineThread.interrupt();
+ } catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw ex;
- } finally {
- process.destroy();
+ processExecutor.shutdownNow();
+ LOG.error("Failed to run generate client configs script for components");
+ throw new SystemException("Failed to run generate client configs script for components");
}
}
+
+ /**
+ * wrapper class that holds all information and references to the thread and python process
+ * started to create compressed configuration config files
+ */
+ private static class CommandLineThreadWrapper {
+
+ private String commandLine;
+
+ private CommandLineThread commandLineThread;
+
+ private Thread logStreamThread;
+
+ private LogStreamReader logStream;
+
+ private Process process;
+
+ private CommandLineThreadWrapper(String commandLine, CommandLineThread commandLineThread,
+ Thread logStreamThread, LogStreamReader logStream, Process process) {
+ this.commandLine = commandLine;
+ this.commandLineThread = commandLineThread;
+ this.logStreamThread = logStreamThread;
+ this.logStream = logStream;
+ this.process = process;
+ }
+
+ /**
+ * Returns commandLine that starts pyton process
+ * @return {@link #commandLine}
+ */
+ private String getCommandLine() {
+ return commandLine;
+ }
+
+ /**
+ * Sets {@link #commandLine}
+ * @param commandLine {String}
+ */
+ private void setCommandLine(String commandLine) {
+ this.commandLine = commandLine;
+ }
+
+ /**
+ * Returns thread that starts and waits for python process to complete
+ * @return {@link #commandLineThread}
+ */
+ private CommandLineThread getCommandLineThread() {
+ return commandLineThread;
+ }
+
+ /**
+ * Sets {@link #commandLineThread}
+ * @param commandLineThread {@link CommandLineThread}
+ */
+ private void setCommandLineThread(CommandLineThread commandLineThread) {
+ this.commandLineThread = commandLineThread;
+ }
+
+ /**
+ * Returns thread that starts and waits to get the output and error log stream from the python process
+ * @return {@link #logStreamThread}
+ */
+ private Thread getLogStreamThread() {
+ return logStreamThread;
+ }
+
+ /**
+ * Sets {@link #logStreamThread}
+ * @param logStreamThread {@link Thread}
+ */
+ private void setLogStreamThread(Thread logStreamThread) {
+ this.logStreamThread = logStreamThread;
+ }
+
+ /**
+ * Returns log stream from the python subprocess
+ * @return {@link #logStream}
+ */
+ private LogStreamReader getLogStream() {
+ return logStream;
+ }
+
+ /**
+ * Sets {@link #logStream}
+ * @param logStream {@link LogStreamReader}
+ */
+ private void setLogStream(LogStreamReader logStream) {
+ this.logStream = logStream;
+ }
+
+ /**
+ * Returns python process
+ * @return {@link #process}
+ */
+ private Process getProcess() {
+ return process;
+ }
+
+ /**
+ * Sets {@link #process}
+ * @param process {@link Process}
+ */
+ private void setProcess(Process process) {
+ this.process = process;
+ }
+ }
+
+ /**
+ * Class to run python process to compress config files as seperate thread
+ */
private static class CommandLineThread extends Thread {
private final Process process;
private Integer returnCode;
- public void setReturnCode(Integer exit) {
+ private void setReturnCode(Integer exit) {
returnCode = exit;
}
- public Integer getReturnCode() {
+ private Integer getReturnCode() {
return returnCode;
}
@@ -530,6 +730,9 @@ public class ClientConfigResourceProvider extends AbstractControllerResourceProv
}
+ /**
+ * Class to collect output and error stream of python subprocess
+ */
private class LogStreamReader implements Runnable {
private BufferedReader reader;
@@ -560,6 +763,131 @@ public class ClientConfigResourceProvider extends AbstractControllerResourceProv
}
}
+
+ /**
+ * This is the utility class to do further compression related operations
+ * on already compressed component configuration files
+ */
+ protected static class TarUtils {
+
+ /**
+ * temporary dir where tar files are saved on ambari server
+ */
+ private String tmpDir;
+
+ /**
+ * name of the compressed file that should be created
+ */
+ private String fileName;
+
+
+ private List<ServiceComponentHostResponse> serviceComponentHostResponses;
+
+ /**
+ * Constructor sets all the fields of the class
+ * @param tmpDir {String}
+ * @param fileName {String}
+ * @param serviceComponentHostResponses {List}
+ */
+ TarUtils(String tmpDir, String fileName, List<ServiceComponentHostResponse> serviceComponentHostResponses) {
+ this.tmpDir = tmpDir;
+ this.fileName = fileName;
+ this.serviceComponentHostResponses = serviceComponentHostResponses;
+ }
+
+ /**
+ * creates single compressed file from the list of existing compressed file
+ * @throws SystemException
+ */
+ protected void tarConfigFiles()
+ throws SystemException {
+
+ try {
+ File compressedOutputFile = new File(tmpDir, fileName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION);
+ FileOutputStream fOut = new FileOutputStream(compressedOutputFile);
+ BufferedOutputStream bOut = new BufferedOutputStream(fOut);
+ GzipCompressorOutputStream gzOut = new GzipCompressorOutputStream(bOut);
+ TarArchiveOutputStream tOut = new TarArchiveOutputStream(gzOut);
+
+ try {
+ for (ServiceComponentHostResponse schResponse : serviceComponentHostResponses) {
+ String componentName = schResponse.getComponentName();
+ File compressedInputFile = new File(tmpDir, componentName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION);
+ FileInputStream fin = new FileInputStream(compressedInputFile);
+ BufferedInputStream bIn = new BufferedInputStream(fin);
+ GzipCompressorInputStream gzIn = new GzipCompressorInputStream(bIn);
+ TarArchiveInputStream tarIn = new TarArchiveInputStream(gzIn);
+ TarArchiveEntry entry = null;
+ try {
+ while ((entry = tarIn.getNextTarEntry()) != null) {
+ entry.setName(componentName + File.separator + entry.getName());
+ tOut.putArchiveEntry(entry);
+ if (entry.isFile()) {
+ IOUtils.copy(tarIn, tOut);
+ }
+ tOut.closeArchiveEntry();
+ }
+ } catch (Exception e) {
+ throw new SystemException(e.getMessage(), e);
+ } finally {
+ tarIn.close();
+ gzIn.close();
+ bIn.close();
+ fin.close();
+ }
+ }
+ } finally {
+ tOut.finish();
+ tOut.close();
+ }
+ } catch (Exception e) {
+ throw new SystemException(e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public RequestStatus updateResources(final Request request, Predicate predicate)
+ throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException {
+
+ throw new SystemException("The request is not supported");
+ }
+
+ @Override
+ public RequestStatus deleteResources(Request request, Predicate predicate)
+ throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException {
+
+ throw new SystemException("The request is not supported");
+ }
+
+
+ // ----- AbstractResourceProvider ------------------------------------------
+
+ @Override
+ protected Set<String> getPKPropertyIds() {
+ return pkPropertyIds;
+ }
+
+
+ // ----- utility methods ---------------------------------------------------
+
+ /**
+ * Get a component request object from a map of property values.
+ *
+ * @param properties the predicate
+ * @return the component request object
+ */
+
+ private ServiceComponentHostRequest getRequest(Map<String, Object> properties) {
+ return new ServiceComponentHostRequest(
+ (String) properties.get(COMPONENT_CLUSTER_NAME_PROPERTY_ID),
+ (String) properties.get(COMPONENT_SERVICE_NAME_PROPERTY_ID),
+ (String) properties.get(COMPONENT_COMPONENT_NAME_PROPERTY_ID),
+ (String) properties.get(HOST_COMPONENT_HOST_NAME_PROPERTY_ID),
+ null);
+ }
+
+
protected ServiceOsSpecific populateServicePackagesInfo(ServiceInfo serviceInfo, Map<String, String> hostParams,
String osFamily) {
ServiceOsSpecific hostOs = new ServiceOsSpecific(osFamily);
http://git-wip-us.apache.org/repos/asf/ambari/blob/6db03934/ambari-server/src/test/java/org/apache/ambari/server/api/services/ComponentServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/api/services/ComponentServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/api/services/ComponentServiceTest.java
index cb3c2a7..c658165 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/api/services/ComponentServiceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/api/services/ComponentServiceTest.java
@@ -49,8 +49,8 @@ public class ComponentServiceTest extends BaseServiceTest {
//getComponents
service = new TestComponentService("clusterName", "serviceName", null);
- m = service.getClass().getMethod("getComponents", String.class, HttpHeaders.class, UriInfo.class);
- args = new Object[] {null, getHttpHeaders(), getUriInfo()};
+ m = service.getClass().getMethod("getComponents", String.class, HttpHeaders.class, UriInfo.class, String.class);
+ args = new Object[] {null, getHttpHeaders(), getUriInfo(), null};
listInvocations.add(new ServiceTestInvocation(Request.Type.GET, service, m, args, null));
//createComponent
http://git-wip-us.apache.org/repos/asf/ambari/blob/6db03934/ambari-server/src/test/java/org/apache/ambari/server/api/services/HostComponentServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/api/services/HostComponentServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/api/services/HostComponentServiceTest.java
index f67d961..9cbed02 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/api/services/HostComponentServiceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/api/services/HostComponentServiceTest.java
@@ -48,8 +48,8 @@ public class HostComponentServiceTest extends BaseServiceTest {
//getHostComponents
componentService = new TestHostComponentService("clusterName", "serviceName", null);
- m = componentService.getClass().getMethod("getHostComponents", String.class, HttpHeaders.class, UriInfo.class);
- args = new Object[] {null, getHttpHeaders(), getUriInfo()};
+ m = componentService.getClass().getMethod("getHostComponents", String.class, HttpHeaders.class, UriInfo.class, String.class);
+ args = new Object[] {null, getHttpHeaders(), getUriInfo(), null};
listInvocations.add(new ServiceTestInvocation(Request.Type.GET, componentService, m, args, null));
//createHostComponent
http://git-wip-us.apache.org/repos/asf/ambari/blob/6db03934/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProviderTest.java
index cb88ae1..cad6ed4 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClientConfigResourceProviderTest.java
@@ -22,9 +22,11 @@ import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertFalse;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
import java.io.ByteArrayInputStream;
import java.io.File;
@@ -76,6 +78,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -213,14 +216,6 @@ public class ClientConfigResourceProviderTest {
PropertyHelper.getKeyPropertyIds(type),
managementController);
- // create the request
- Request request = PropertyHelper.getReadRequest(ClientConfigResourceProvider.COMPONENT_CLUSTER_NAME_PROPERTY_ID, "c1",
- ClientConfigResourceProvider.COMPONENT_COMPONENT_NAME_PROPERTY_ID,
- ClientConfigResourceProvider.COMPONENT_SERVICE_NAME_PROPERTY_ID);
-
- Predicate predicate = new PredicateBuilder().property(ClientConfigResourceProvider.COMPONENT_CLUSTER_NAME_PROPERTY_ID).equals("c1").
- toPredicate();
-
String clusterName = "C1";
String serviceName = "PIG";
String componentName = "PIG";
@@ -275,11 +270,12 @@ public class ClientConfigResourceProviderTest {
expect(configuration.areHostsSysPrepped()).andReturn("false");
expect(configuration.isAgentStackRetryOnInstallEnabled()).andReturn("false");
expect(configuration.getAgentStackRetryOnInstallCount()).andReturn("5");
+ expect(configuration.getExternalScriptThreadPoolSize()).andReturn(Configuration.THREAD_POOL_SIZE_FOR_EXTERNAL_SCRIPT.getDefaultValue());
expect(configuration.getExternalScriptTimeout()).andReturn(Configuration.EXTERNAL_SCRIPT_TIMEOUT.getDefaultValue());
Map<String,String> props = new HashMap<String, String>();
props.put("key","value");
expect(clusterConfig.getProperties()).andReturn(props);
- expect(configHelper.getEffectiveDesiredTags(cluster, hostName)).andReturn(allConfigTags);
+ expect(configHelper.getEffectiveDesiredTags(cluster, null)).andReturn(allConfigTags);
expect(cluster.getClusterName()).andReturn(clusterName);
expect(managementController.getHostComponents(EasyMock.<Set<ServiceComponentHostRequest>>anyObject())).andReturn(responses).anyTimes();
expect(cluster.getCurrentStackVersion()).andReturn(stackId);
@@ -348,6 +344,19 @@ public class ClientConfigResourceProviderTest {
InputStream inputStream = new ByteArrayInputStream("some logging info".getBytes());
expect(process.getInputStream()).andReturn(inputStream);
+ ClientConfigResourceProvider.TarUtils tarUtilMock = PowerMockito.mock(ClientConfigResourceProvider.TarUtils.class);
+ whenNew(ClientConfigResourceProvider.TarUtils.class).withAnyArguments().thenReturn(tarUtilMock);
+ tarUtilMock.tarConfigFiles();
+ expectLastCall().once();
+
+ // create the request
+ Request request = PropertyHelper.getReadRequest(ClientConfigResourceProvider.COMPONENT_CLUSTER_NAME_PROPERTY_ID, "c1",
+ ClientConfigResourceProvider.COMPONENT_COMPONENT_NAME_PROPERTY_ID,
+ ClientConfigResourceProvider.COMPONENT_SERVICE_NAME_PROPERTY_ID);
+
+ Predicate predicate = new PredicateBuilder().property(ClientConfigResourceProvider.COMPONENT_CLUSTER_NAME_PROPERTY_ID).
+ equals("c1").and().property(ClientConfigResourceProvider.COMPONENT_SERVICE_NAME_PROPERTY_ID).equals("PIG").toPredicate();
+
// replay
replay(managementController, clusters, cluster, ambariMetaInfo, stackId, componentInfo, commandScriptDefinition,
clusterConfig, host, service, serviceComponent, serviceComponentHost, serviceInfo, configHelper,
@@ -419,8 +428,10 @@ public class ClientConfigResourceProviderTest {
ClientConfigResourceProvider.COMPONENT_COMPONENT_NAME_PROPERTY_ID,
ClientConfigResourceProvider.COMPONENT_SERVICE_NAME_PROPERTY_ID);
- Predicate predicate = new PredicateBuilder().property(ClientConfigResourceProvider.COMPONENT_CLUSTER_NAME_PROPERTY_ID).equals("c1").
- toPredicate();
+ Predicate predicate = new PredicateBuilder().property(ClientConfigResourceProvider.COMPONENT_CLUSTER_NAME_PROPERTY_ID).
+ equals("c1").and().property(ClientConfigResourceProvider.COMPONENT_COMPONENT_NAME_PROPERTY_ID).equals("PIG").
+ and().property(ClientConfigResourceProvider.COMPONENT_SERVICE_NAME_PROPERTY_ID).equals("PIG").
+ toPredicate();
String clusterName = "C1";
String serviceName = "PIG";
@@ -477,12 +488,13 @@ public class ClientConfigResourceProviderTest {
expect(configuration.areHostsSysPrepped()).andReturn("false");
expect(configuration.isAgentStackRetryOnInstallEnabled()).andReturn("false");
expect(configuration.getAgentStackRetryOnInstallCount()).andReturn("5");
+ expect(configuration.getExternalScriptThreadPoolSize()).andReturn(Configuration.THREAD_POOL_SIZE_FOR_EXTERNAL_SCRIPT.getDefaultValue());
expect(configuration.getExternalScriptTimeout()).andReturn(Configuration.EXTERNAL_SCRIPT_TIMEOUT.getDefaultValue());
Map<String,String> props = new HashMap<String, String>();
props.put("key","value");
expect(clusterConfig.getProperties()).andReturn(props);
- expect(configHelper.getEffectiveDesiredTags(cluster, hostName)).andReturn(allConfigTags);
+ expect(configHelper.getEffectiveDesiredTags(cluster, null)).andReturn(allConfigTags);
expect(cluster.getClusterName()).andReturn(clusterName);
expect(managementController.getHostComponents(EasyMock.<Set<ServiceComponentHostRequest>>anyObject())).andReturn(responses).anyTimes();
expect(cluster.getCurrentStackVersion()).andReturn(stackId);