You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by js...@apache.org on 2015/05/11 23:38:14 UTC
ambari git commit: AMBARI-11038. Fix timing issue regarding setting
of topology resolved configuration for clusters provisioned via blueprints
Repository: ambari
Updated Branches:
refs/heads/trunk d05c9c287 -> 66a4bfb26
AMBARI-11038. Fix timing issue regarding setting of topology resolved configuration
for clusters provisioned via blueprints
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/66a4bfb2
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/66a4bfb2
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/66a4bfb2
Branch: refs/heads/trunk
Commit: 66a4bfb26ad5ffffa59d925e56f465bec3346696
Parents: d05c9c2
Author: John Speidel <js...@hortonworks.com>
Authored: Mon May 11 15:38:25 2015 -0400
Committer: John Speidel <js...@hortonworks.com>
Committed: Mon May 11 17:37:03 2015 -0400
----------------------------------------------------------------------
.../topology/ClusterConfigurationRequest.java | 14 ++--
.../ambari/server/topology/LogicalRequest.java | 14 ----
.../ambari/server/topology/TopologyManager.java | 67 ++++++++++----------
.../server/topology/TopologyManagerTest.java | 27 +++-----
4 files changed, 50 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/66a4bfb2/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
index a8c2ff3..eb583fd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
@@ -84,7 +84,7 @@ public class ClusterConfigurationRequest {
*/
public void setConfigurationsOnCluster(ClusterTopology clusterTopology, String tag) {
//todo: also handle setting of host group scoped configuration which is updated by config processor
- List<BlueprintServiceConfigRequest> listofConfigRequests = new LinkedList<BlueprintServiceConfigRequest>();
+ List<BlueprintServiceConfigRequest> configurationRequests = new LinkedList<BlueprintServiceConfigRequest>();
Blueprint blueprint = clusterTopology.getBlueprint();
Configuration clusterConfiguration = clusterTopology.getConfiguration();
@@ -108,7 +108,7 @@ public class ClusterConfigurationRequest {
}
}
- listofConfigRequests.add(blueprintConfigRequest);
+ configurationRequests.add(blueprintConfigRequest);
}
// since the stack returns "cluster-env" with each service's config ensure that only one
@@ -118,9 +118,9 @@ public class ClusterConfigurationRequest {
Map<String, Map<String, String>> clusterEnvAttributes = clusterConfiguration.getFullAttributes().get("cluster-env");
globalConfigRequest.addConfigElement("cluster-env", clusterEnvProps,clusterEnvAttributes);
- listofConfigRequests.add(globalConfigRequest);
+ configurationRequests.add(globalConfigRequest);
- setConfigurationsOnCluster(listofConfigRequests, tag);
+ setConfigurationsOnCluster(configurationRequests, tag);
}
/**
@@ -131,12 +131,12 @@ public class ClusterConfigurationRequest {
*
* This method will also send these requests to the management controller.
*
- * @param listOfBlueprintConfigRequests a list of requests to send to the AmbariManagementController.
+ * @param configurationRequests a list of requests to send to the AmbariManagementController.
*/
- private void setConfigurationsOnCluster(List<BlueprintServiceConfigRequest> listOfBlueprintConfigRequests,
+ private void setConfigurationsOnCluster(List<BlueprintServiceConfigRequest> configurationRequests,
String tag) {
// iterate over services to deploy
- for (BlueprintServiceConfigRequest blueprintConfigRequest : listOfBlueprintConfigRequests) {
+ for (BlueprintServiceConfigRequest blueprintConfigRequest : configurationRequests) {
ClusterRequest clusterRequest = null;
// iterate over the config types associated with this service
List<ConfigurationRequest> requestsPerService = new LinkedList<ConfigurationRequest>();
http://git-wip-us.apache.org/repos/asf/ambari/blob/66a4bfb2/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
index 087ad4c..88c791b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
@@ -158,20 +158,6 @@ public class LogicalRequest extends Request {
return new ArrayList<HostRequest>(allHostRequests);
}
- //todo: account for blueprint name?
- //todo: this should probably be done implicitly at a lower level
- public boolean areGroupsResolved(Collection<String> hostGroupNames) {
- synchronized (outstandingHostRequests) {
- // iterate over outstanding host requests
- for (HostRequest request : outstandingHostRequests) {
- if (hostGroupNames.contains(request.getHostgroupName()) && request.getHostName() == null) {
- return false;
- }
- }
- }
- return true;
- }
-
public Map<String, Collection<String>> getProjectedTopology() {
Map<String, Collection<String>> hostComponentMap = new HashMap<String, Collection<String>>();
http://git-wip-us.apache.org/repos/asf/ambari/blob/66a4bfb2/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index 864655d..e6c2f1e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -53,7 +53,6 @@ public class TopologyManager {
public static final String TOPOLOGY_RESOLVED_TAG = "TOPOLOGY_RESOLVED";
private PersistedState persistedState;
- //private ExecutorService executor = getExecutorService();
private ExecutorService executor = Executors.newSingleThreadExecutor();
private Collection<String> hostsToIgnore = new HashSet<String>();
private final List<HostImpl> availableHosts = new LinkedList<HostImpl>();
@@ -83,8 +82,8 @@ public class TopologyManager {
private void ensureInitialized() {
synchronized(initializationLock) {
if (! isInitialized) {
- isInitialized = true;
replayRequests(persistedState.getAllRequests());
+ isInitialized = true;
}
}
}
@@ -99,7 +98,7 @@ public class TopologyManager {
String clusterName = topology.getClusterName();
clusterTopologyMap.put(clusterName, topology);
- addClusterConfigRequest(new ClusterConfigurationRequest(ambariContext, topology, true));
+ addClusterConfigRequest(topology, new ClusterConfigurationRequest(ambariContext, topology, true));
LogicalRequest logicalRequest = processRequest(persistedRequest, topology);
//todo: this should be invoked as part of a generic lifecycle event which could possibly
@@ -394,7 +393,7 @@ public class TopologyManager {
if (! configChecked) {
configChecked = true;
if (! ambariContext.doesConfigurationWithTagExist(topology.getClusterName(), TOPOLOGY_RESOLVED_TAG)) {
- addClusterConfigRequest(new ClusterConfigurationRequest(ambariContext, topology, false));
+ addClusterConfigRequest(topology, new ClusterConfigurationRequest(ambariContext, topology, false));
}
}
}
@@ -423,28 +422,26 @@ public class TopologyManager {
}
/**
- * Making it a synchronous call as resolution of initial configurations need to happen before
- * any tasks are created
- * TODO - needs further review
+ * Register the configuration task which is responsible for configuration topology resolution
+ * and setting the updated configuration on the cluster. This task needs to be submitted to the
+ * executor before any host requests to ensure that no install or start tasks are executed prior
+ * to configuration being set on the cluster.
+ *
+ * @param topology cluster topology
+ * @param configurationRequest configuration request to be executed
*/
- private void addClusterConfigRequest(ClusterConfigurationRequest configurationRequest) {
- ConfigureClusterTask cct = new ConfigureClusterTask(configurationRequest);
- cct.run();
- //executor.execute(new ConfigureClusterTask(configurationRequest));
- //try {
- // executor.awaitTermination(10, TimeUnit.SECONDS);
- // LOG.info("Resolved cluster topology configuration.");
- //}catch(InterruptedException ex) {
- // LOG.warn("Failed to resolve topology configuration");
- //}
+ private void addClusterConfigRequest(ClusterTopology topology, ClusterConfigurationRequest configurationRequest) {
+ executor.execute(new ConfigureClusterTask(topology, configurationRequest));
}
private class ConfigureClusterTask implements Runnable {
private ClusterConfigurationRequest configRequest;
+ private ClusterTopology topology;
- public ConfigureClusterTask(ClusterConfigurationRequest configRequest) {
+ public ConfigureClusterTask(ClusterTopology topology, ClusterConfigurationRequest configRequest) {
this.configRequest = configRequest;
+ this.topology = topology;
}
@Override
@@ -457,14 +454,13 @@ public class TopologyManager {
Collection<String> requiredHostGroups = getTopologyRequiredHostGroups();
while (! completed && ! interrupted) {
try {
- Thread.sleep(200);
+ Thread.sleep(100);
} catch (InterruptedException e) {
interrupted = true;
// reset interrupted flag on thread
Thread.interrupted();
}
-
- completed = areConfigsResolved(requiredHostGroups);
+ completed = areRequiredHostGroupsResolved(requiredHostGroups);
}
if (! interrupted) {
@@ -478,12 +474,15 @@ public class TopologyManager {
"An exception occurred while attempting to process cluster configs and set on cluster: " + e);
e.printStackTrace();
}
-
- //executePendingTasks();
}
LOG.info("TopologyManager.ConfigureClusterTask: Exiting");
}
+ /**
+ * Return the set of host group names which are required for configuration topology resolution.
+ *
+ * @return set of required host group names
+ */
private Collection<String> getTopologyRequiredHostGroups() {
Collection<String> requiredHostGroups;
try {
@@ -497,16 +496,20 @@ public class TopologyManager {
return requiredHostGroups;
}
- // get set of required host groups from config processor and confirm that all requests
- // have fully resolved the host names for the required host groups
- private boolean areConfigsResolved(Collection<String> requiredHostGroups) {
+ /**
+ * Determine if all hosts for the given set of required host groups are known.
+ *
+ * @param requiredHostGroups set of required host groups
+ * @return true if all required host groups are resolved
+ */
+ private boolean areRequiredHostGroupsResolved(Collection<String> requiredHostGroups) {
boolean configTopologyResolved = true;
- synchronized (outstandingRequests) {
- for (LogicalRequest outstandingRequest : outstandingRequests) {
- if (! outstandingRequest.areGroupsResolved(requiredHostGroups)) {
- configTopologyResolved = false;
- break;
- }
+ Map<String, HostGroupInfo> hostGroupInfo = topology.getHostGroupInfo();
+ for (String hostGroup : requiredHostGroups) {
+ HostGroupInfo groupInfo = hostGroupInfo.get(hostGroup);
+ if (groupInfo == null || groupInfo.getHostNames().size() < groupInfo.getRequestedHostCount()) {
+ configTopologyResolved = false;
+ break;
}
}
return configTopologyResolved;
http://git-wip-us.apache.org/repos/asf/ambari/blob/66a4bfb2/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
index 53abd1c..df87aec 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
@@ -72,9 +72,7 @@ public class TopologyManagerTest {
private final ConfigurationRequest configurationRequest = createNiceMock(ConfigurationRequest.class);
private final ConfigurationRequest configurationRequest2 = createNiceMock(ConfigurationRequest.class);
private final ConfigurationRequest configurationRequest3 = createNiceMock(ConfigurationRequest.class);
- private final ConfigurationRequest configurationRequest4 = createNiceMock(ConfigurationRequest.class);
- private final ConfigurationRequest configurationRequest5 = createNiceMock(ConfigurationRequest.class);
- private final ConfigurationRequest configurationRequest6 = createNiceMock(ConfigurationRequest.class);
+
private final RequestStatusResponse requestStatusResponse = createNiceMock(RequestStatusResponse.class);
private final ExecutorService executor = createStrictMock(ExecutorService.class);
private final PersistedState persistedState = createStrictMock(PersistedState.class);
@@ -118,9 +116,6 @@ public class TopologyManagerTest {
private Capture<Map<String, Object>> configRequestPropertiesCapture;
private Capture<Map<String, Object>> configRequestPropertiesCapture2;
private Capture<Map<String, Object>> configRequestPropertiesCapture3;
- private Capture<Map<String, Object>> configRequestPropertiesCapture4;
- private Capture<Map<String, Object>> configRequestPropertiesCapture5;
- private Capture<Map<String, Object>> configRequestPropertiesCapture6;
private Capture<ClusterRequest> updateClusterConfigRequestCapture;
private Capture<Runnable> updateConfigTaskCapture;
@@ -131,9 +126,6 @@ public class TopologyManagerTest {
configRequestPropertiesCapture = new Capture<Map<String, Object>>();
configRequestPropertiesCapture2 = new Capture<Map<String, Object>>();
configRequestPropertiesCapture3 = new Capture<Map<String, Object>>();
- configRequestPropertiesCapture4 = new Capture<Map<String, Object>>();
- configRequestPropertiesCapture5 = new Capture<Map<String, Object>>();
- configRequestPropertiesCapture6 = new Capture<Map<String, Object>>();
updateClusterConfigRequestCapture = new Capture<ClusterRequest>();
updateConfigTaskCapture = new Capture<Runnable>();
@@ -248,25 +240,23 @@ public class TopologyManagerTest {
//expectLastCall().once();
//ambariContext.registerHostWithConfigGroup(eq("host1"), isA(ClusterTopologyImpl.class), eq("group1"));
//expectLastCall().once();
+
+ // the mock executor never invokes run() on the cluster configuration task,
+ // so only the INITIAL configuration is expected to be set here
expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture))).
andReturn(Collections.singletonList(configurationRequest));
expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture2))).
andReturn(Collections.singletonList(configurationRequest2)).once();
expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture3))).
andReturn(Collections.singletonList(configurationRequest3)).once();
- expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture4))).
- andReturn(Collections.singletonList(configurationRequest4)).once();
- expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture5))).
- andReturn(Collections.singletonList(configurationRequest5)).once();
- expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture6))).
- andReturn(Collections.singletonList(configurationRequest6)).once();
+
ambariContext.setConfigurationOnCluster(capture(updateClusterConfigRequestCapture));
- expectLastCall().times(6);
+ expectLastCall().times(3);
ambariContext.persistInstallStateForUI(CLUSTER_NAME, STACK_NAME, STACK_VERSION);
expectLastCall().once();
- //executor.execute(capture(updateConfigTaskCapture));
- //expectLastCall().times(0);
+ executor.execute(capture(updateConfigTaskCapture));
+ expectLastCall().times(1);
expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology,
List<LogicalRequest>>emptyMap()).once();
@@ -306,7 +296,6 @@ public class TopologyManagerTest {
@Test
public void testProvisionCluster() throws Exception {
topologyManager.provisionCluster(request);
-
//todo: assertions
}