You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by js...@apache.org on 2015/05/11 23:38:14 UTC

ambari git commit: AMBARI-11038. Fix timing issue regarding setting of topology resolved configuration for clusters provisioned via blueprints

Repository: ambari
Updated Branches:
  refs/heads/trunk d05c9c287 -> 66a4bfb26


AMBARI-11038. Fix timing issue regarding setting of topology resolved configuration
for clusters provisioned via blueprints


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/66a4bfb2
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/66a4bfb2
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/66a4bfb2

Branch: refs/heads/trunk
Commit: 66a4bfb26ad5ffffa59d925e56f465bec3346696
Parents: d05c9c2
Author: John Speidel <js...@hortonworks.com>
Authored: Mon May 11 15:38:25 2015 -0400
Committer: John Speidel <js...@hortonworks.com>
Committed: Mon May 11 17:37:03 2015 -0400

----------------------------------------------------------------------
 .../topology/ClusterConfigurationRequest.java   | 14 ++--
 .../ambari/server/topology/LogicalRequest.java  | 14 ----
 .../ambari/server/topology/TopologyManager.java | 67 ++++++++++----------
 .../server/topology/TopologyManagerTest.java    | 27 +++-----
 4 files changed, 50 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/66a4bfb2/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
index a8c2ff3..eb583fd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
@@ -84,7 +84,7 @@ public class ClusterConfigurationRequest {
    */
   public void setConfigurationsOnCluster(ClusterTopology clusterTopology, String tag)  {
     //todo: also handle setting of host group scoped configuration which is updated by config processor
-    List<BlueprintServiceConfigRequest> listofConfigRequests = new LinkedList<BlueprintServiceConfigRequest>();
+    List<BlueprintServiceConfigRequest> configurationRequests = new LinkedList<BlueprintServiceConfigRequest>();
 
     Blueprint blueprint = clusterTopology.getBlueprint();
     Configuration clusterConfiguration = clusterTopology.getConfiguration();
@@ -108,7 +108,7 @@ public class ClusterConfigurationRequest {
         }
       }
 
-      listofConfigRequests.add(blueprintConfigRequest);
+      configurationRequests.add(blueprintConfigRequest);
     }
 
     // since the stack returns "cluster-env" with each service's config ensure that only one
@@ -118,9 +118,9 @@ public class ClusterConfigurationRequest {
     Map<String, Map<String, String>> clusterEnvAttributes = clusterConfiguration.getFullAttributes().get("cluster-env");
 
     globalConfigRequest.addConfigElement("cluster-env", clusterEnvProps,clusterEnvAttributes);
-    listofConfigRequests.add(globalConfigRequest);
+    configurationRequests.add(globalConfigRequest);
 
-    setConfigurationsOnCluster(listofConfigRequests, tag);
+    setConfigurationsOnCluster(configurationRequests, tag);
   }
 
   /**
@@ -131,12 +131,12 @@ public class ClusterConfigurationRequest {
    *
    * This method will also send these requests to the management controller.
    *
-   * @param listOfBlueprintConfigRequests a list of requests to send to the AmbariManagementController.
+   * @param configurationRequests a list of requests to send to the AmbariManagementController.
    */
-  private void setConfigurationsOnCluster(List<BlueprintServiceConfigRequest> listOfBlueprintConfigRequests,
+  private void setConfigurationsOnCluster(List<BlueprintServiceConfigRequest> configurationRequests,
                                           String tag)  {
     // iterate over services to deploy
-    for (BlueprintServiceConfigRequest blueprintConfigRequest : listOfBlueprintConfigRequests) {
+    for (BlueprintServiceConfigRequest blueprintConfigRequest : configurationRequests) {
       ClusterRequest clusterRequest = null;
       // iterate over the config types associated with this service
       List<ConfigurationRequest> requestsPerService = new LinkedList<ConfigurationRequest>();

http://git-wip-us.apache.org/repos/asf/ambari/blob/66a4bfb2/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
index 087ad4c..88c791b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
@@ -158,20 +158,6 @@ public class LogicalRequest extends Request {
     return new ArrayList<HostRequest>(allHostRequests);
   }
 
-  //todo: account for blueprint name?
-  //todo: this should probably be done implicitly at a lower level
-  public boolean areGroupsResolved(Collection<String> hostGroupNames) {
-    synchronized (outstandingHostRequests) {
-      // iterate over outstanding host requests
-      for (HostRequest request : outstandingHostRequests) {
-        if (hostGroupNames.contains(request.getHostgroupName()) && request.getHostName() == null) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
   public Map<String, Collection<String>> getProjectedTopology() {
     Map<String, Collection<String>> hostComponentMap = new HashMap<String, Collection<String>>();
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/66a4bfb2/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index 864655d..e6c2f1e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -53,7 +53,6 @@ public class TopologyManager {
   public static final String TOPOLOGY_RESOLVED_TAG = "TOPOLOGY_RESOLVED";
 
   private PersistedState persistedState;
-  //private ExecutorService executor = getExecutorService();
   private ExecutorService executor = Executors.newSingleThreadExecutor();
   private Collection<String> hostsToIgnore = new HashSet<String>();
   private final List<HostImpl> availableHosts = new LinkedList<HostImpl>();
@@ -83,8 +82,8 @@ public class TopologyManager {
   private void ensureInitialized() {
     synchronized(initializationLock) {
       if (! isInitialized) {
-        isInitialized = true;
         replayRequests(persistedState.getAllRequests());
+        isInitialized = true;
       }
     }
   }
@@ -99,7 +98,7 @@ public class TopologyManager {
     String clusterName = topology.getClusterName();
     clusterTopologyMap.put(clusterName, topology);
 
-    addClusterConfigRequest(new ClusterConfigurationRequest(ambariContext, topology, true));
+    addClusterConfigRequest(topology, new ClusterConfigurationRequest(ambariContext, topology, true));
     LogicalRequest logicalRequest = processRequest(persistedRequest, topology);
 
     //todo: this should be invoked as part of a generic lifecycle event which could possibly
@@ -394,7 +393,7 @@ public class TopologyManager {
       if (! configChecked) {
         configChecked = true;
         if (! ambariContext.doesConfigurationWithTagExist(topology.getClusterName(), TOPOLOGY_RESOLVED_TAG)) {
-          addClusterConfigRequest(new ClusterConfigurationRequest(ambariContext, topology, false));
+          addClusterConfigRequest(topology, new ClusterConfigurationRequest(ambariContext, topology, false));
         }
       }
     }
@@ -423,28 +422,26 @@ public class TopologyManager {
   }
 
   /**
-   * Making it a synchronous call as resolution of initial configurations need to happen before
-   * any tasks are created
-   * TODO - needs further review
+   * Register the configuration task which is responsible for configuration topology resolution
+   * and setting the updated configuration on the cluster.  This task needs to be submitted to the
+   * executor before any host requests to ensure that no install or start tasks are executed prior
+   * to configuration being set on the cluster.
+   *
+   * @param topology              cluster topology
+   * @param configurationRequest  configuration request to be executed
    */
-  private void addClusterConfigRequest(ClusterConfigurationRequest configurationRequest) {
-    ConfigureClusterTask cct = new ConfigureClusterTask(configurationRequest);
-    cct.run();
-    //executor.execute(new ConfigureClusterTask(configurationRequest));
-    //try {
-    //  executor.awaitTermination(10, TimeUnit.SECONDS);
-    //  LOG.info("Resolved cluster topology configuration.");
-    //}catch(InterruptedException ex) {
-    //  LOG.warn("Failed to resolve topology configuration");
-    //}
+  private void addClusterConfigRequest(ClusterTopology topology, ClusterConfigurationRequest configurationRequest) {
+    executor.execute(new ConfigureClusterTask(topology, configurationRequest));
   }
 
   private class ConfigureClusterTask implements Runnable {
     private ClusterConfigurationRequest configRequest;
+    private ClusterTopology topology;
 
 
-    public ConfigureClusterTask(ClusterConfigurationRequest configRequest) {
+    public ConfigureClusterTask(ClusterTopology topology, ClusterConfigurationRequest configRequest) {
       this.configRequest = configRequest;
+      this.topology = topology;
     }
 
     @Override
@@ -457,14 +454,13 @@ public class TopologyManager {
       Collection<String> requiredHostGroups = getTopologyRequiredHostGroups();
       while (! completed && ! interrupted) {
         try {
-          Thread.sleep(200);
+          Thread.sleep(100);
         } catch (InterruptedException e) {
           interrupted = true;
           // reset interrupted flag on thread
           Thread.interrupted();
         }
-
-        completed = areConfigsResolved(requiredHostGroups);
+        completed = areRequiredHostGroupsResolved(requiredHostGroups);
       }
 
       if (! interrupted) {
@@ -478,12 +474,15 @@ public class TopologyManager {
               "An exception occurred while attempting to process cluster configs and set on cluster: " + e);
           e.printStackTrace();
         }
-
-        //executePendingTasks();
       }
       LOG.info("TopologyManager.ConfigureClusterTask: Exiting");
     }
 
+    /**
+     * Return the set of host group names which are required for configuration topology resolution.
+     *
+     * @return set of required host group names
+     */
     private Collection<String> getTopologyRequiredHostGroups() {
       Collection<String> requiredHostGroups;
       try {
@@ -497,16 +496,20 @@ public class TopologyManager {
       return requiredHostGroups;
     }
 
-    // get set of required host groups from config processor and confirm that all requests
-    // have fully resolved the host names for the required host groups
-    private boolean areConfigsResolved(Collection<String> requiredHostGroups) {
+    /**
+     * Determine if all hosts for the given set of required host groups are known.
+     *
+     * @param requiredHostGroups set of required host groups
+     * @return true if all required host groups are resolved
+     */
+    private boolean areRequiredHostGroupsResolved(Collection<String> requiredHostGroups) {
       boolean configTopologyResolved = true;
-      synchronized (outstandingRequests) {
-        for (LogicalRequest outstandingRequest : outstandingRequests) {
-          if (! outstandingRequest.areGroupsResolved(requiredHostGroups)) {
-            configTopologyResolved = false;
-            break;
-          }
+      Map<String, HostGroupInfo> hostGroupInfo = topology.getHostGroupInfo();
+      for (String hostGroup : requiredHostGroups) {
+        HostGroupInfo groupInfo = hostGroupInfo.get(hostGroup);
+        if (groupInfo == null || groupInfo.getHostNames().size() < groupInfo.getRequestedHostCount()) {
+          configTopologyResolved = false;
+          break;
         }
       }
       return configTopologyResolved;

http://git-wip-us.apache.org/repos/asf/ambari/blob/66a4bfb2/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
index 53abd1c..df87aec 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
@@ -72,9 +72,7 @@ public class TopologyManagerTest {
   private final ConfigurationRequest configurationRequest = createNiceMock(ConfigurationRequest.class);
   private final ConfigurationRequest configurationRequest2 = createNiceMock(ConfigurationRequest.class);
   private final ConfigurationRequest configurationRequest3 = createNiceMock(ConfigurationRequest.class);
-  private final ConfigurationRequest configurationRequest4 = createNiceMock(ConfigurationRequest.class);
-  private final ConfigurationRequest configurationRequest5 = createNiceMock(ConfigurationRequest.class);
-  private final ConfigurationRequest configurationRequest6 = createNiceMock(ConfigurationRequest.class);
+
   private final RequestStatusResponse requestStatusResponse = createNiceMock(RequestStatusResponse.class);
   private final ExecutorService executor = createStrictMock(ExecutorService.class);
   private final PersistedState persistedState = createStrictMock(PersistedState.class);
@@ -118,9 +116,6 @@ public class TopologyManagerTest {
   private Capture<Map<String, Object>> configRequestPropertiesCapture;
   private Capture<Map<String, Object>> configRequestPropertiesCapture2;
   private Capture<Map<String, Object>> configRequestPropertiesCapture3;
-  private Capture<Map<String, Object>> configRequestPropertiesCapture4;
-  private Capture<Map<String, Object>> configRequestPropertiesCapture5;
-  private Capture<Map<String, Object>> configRequestPropertiesCapture6;
   private Capture<ClusterRequest> updateClusterConfigRequestCapture;
   private Capture<Runnable> updateConfigTaskCapture;
 
@@ -131,9 +126,6 @@ public class TopologyManagerTest {
     configRequestPropertiesCapture = new Capture<Map<String, Object>>();
     configRequestPropertiesCapture2 = new Capture<Map<String, Object>>();
     configRequestPropertiesCapture3 = new Capture<Map<String, Object>>();
-    configRequestPropertiesCapture4 = new Capture<Map<String, Object>>();
-    configRequestPropertiesCapture5 = new Capture<Map<String, Object>>();
-    configRequestPropertiesCapture6 = new Capture<Map<String, Object>>();
     updateClusterConfigRequestCapture = new Capture<ClusterRequest>();
     updateConfigTaskCapture = new Capture<Runnable>();
 
@@ -248,25 +240,23 @@ public class TopologyManagerTest {
     //expectLastCall().once();
     //ambariContext.registerHostWithConfigGroup(eq("host1"), isA(ClusterTopologyImpl.class), eq("group1"));
     //expectLastCall().once();
+
+    // the mock executor never invokes run() on the cluster configuration task,
+    // so only the INITIAL configuration is expected to be set
     expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture))).
         andReturn(Collections.singletonList(configurationRequest));
     expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture2))).
         andReturn(Collections.singletonList(configurationRequest2)).once();
     expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture3))).
         andReturn(Collections.singletonList(configurationRequest3)).once();
-    expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture4))).
-        andReturn(Collections.singletonList(configurationRequest4)).once();
-    expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture5))).
-        andReturn(Collections.singletonList(configurationRequest5)).once();
-    expect(ambariContext.createConfigurationRequests(capture(configRequestPropertiesCapture6))).
-        andReturn(Collections.singletonList(configurationRequest6)).once();
+
     ambariContext.setConfigurationOnCluster(capture(updateClusterConfigRequestCapture));
-    expectLastCall().times(6);
+    expectLastCall().times(3);
     ambariContext.persistInstallStateForUI(CLUSTER_NAME, STACK_NAME, STACK_VERSION);
     expectLastCall().once();
 
-    //executor.execute(capture(updateConfigTaskCapture));
-    //expectLastCall().times(0);
+    executor.execute(capture(updateConfigTaskCapture));
+    expectLastCall().times(1);
 
     expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology,
         List<LogicalRequest>>emptyMap()).once();
@@ -306,7 +296,6 @@ public class TopologyManagerTest {
   @Test
   public void testProvisionCluster() throws Exception {
     topologyManager.provisionCluster(request);
-
     //todo: assertions
   }