You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2018/11/15 20:47:33 UTC

[ambari] branch trunk updated: [AMBARI-24909] Parallel Client Fixes for Server Side Tasks (#2618)

This is an automated email from the ASF dual-hosted git repository.

ncole pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 60d3134  [AMBARI-24909] Parallel Client Fixes for Server Side Tasks (#2618)
60d3134 is described below

commit 60d313463ce52c8df5c2268491a70c8cd38eb6a8
Author: ncole <nc...@hortonworks.com>
AuthorDate: Thu Nov 15 15:47:28 2018 -0500

    [AMBARI-24909] Parallel Client Fixes for Server Side Tasks (#2618)
---
 .../resource_management/libraries/script/script.py |   2 +-
 .../orchestrate/ParallelClientGroupingBuilder.java | 136 +++++++++++++++------
 2 files changed, 102 insertions(+), 36 deletions(-)

diff --git a/ambari-common/src/main/python/resource_management/libraries/script/script.py b/ambari-common/src/main/python/resource_management/libraries/script/script.py
index 67bfca1..76893fd 100644
--- a/ambari-common/src/main/python/resource_management/libraries/script/script.py
+++ b/ambari-common/src/main/python/resource_management/libraries/script/script.py
@@ -1056,7 +1056,7 @@ class Script(object):
     if self.should_expose_component_version("restart"):
       self.save_component_version_to_structured_out("restart")
 
-  def post_upgrade_restart(env, upgrade_type=None):
+  def post_upgrade_restart(self, env, upgrade_type=None):
     """
     To be overridden by subclasses
     """
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/stack/upgrade/orchestrate/ParallelClientGroupingBuilder.java b/ambari-server/src/main/java/org/apache/ambari/server/stack/upgrade/orchestrate/ParallelClientGroupingBuilder.java
index 9411995..0ed9931 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/stack/upgrade/orchestrate/ParallelClientGroupingBuilder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/stack/upgrade/orchestrate/ParallelClientGroupingBuilder.java
@@ -60,6 +60,12 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder {
       return;
     }
 
+    Task task = resolveTask(upgradeContext, pc);
+    // !!! better not be null
+    if (null == task) {
+      return;
+    }
+
     Iterator<String> hostIterator = hostsType.getHosts().iterator();
     HostHolder holder = new HostHolder();
     holder.m_firstHost = hostIterator.next();
@@ -68,11 +74,10 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder {
       holder.m_remainingHosts.add(hostIterator.next());
     }
 
-    holder.m_component = pc;
-    holder.m_tasks = new ArrayList<>();
-    holder.m_tasks.addAll(resolveTasks(upgradeContext, true, pc));
-    holder.m_tasks.add(resolveTask(upgradeContext, pc));
-    holder.m_tasks.addAll(resolveTasks(upgradeContext, false, pc));
+    holder.m_component = pc.name;
+    holder.m_tasks = Collections.singletonList(task);
+    holder.m_preTasks = resolveTasks(upgradeContext, true, pc);
+    holder.m_postTasks = resolveTasks(upgradeContext, false, pc);
 
     serviceToHostMap.put(service, holder);
   }
@@ -91,36 +96,40 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder {
     // !!! create a stage wrapper for the service check on the first host
     // !!! create a stage wrapper for the remaining hosts
 
-    serviceToHostMap.forEach((service, holder) -> {
-      String component = holder.m_component.name;
+    // !!! when building stages, the pre- and post- tasks should be run sequentially,
+    // which means they should be in their own stages.  that may seem counter-intuitive,
+    // but the parallelism is across hosts, not stages.
 
-      List<TaskWrapper> wrappers = buildWrappers(service, component, holder.m_tasks,
-          Collections.singleton(holder.m_firstHost), true);
+    serviceToHostMap.forEach((service, holder) -> {
 
-      String text = getStageText("Upgrading",
-          upgradeContext.getComponentDisplay(service, component),
-          Collections.singleton(holder.m_firstHost));
+      // !!! pre-tasks for the first host
+      starterUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder,
+          holder.m_preTasks, true, "Preparing"));
 
-      // !!! this is a poor assumption
-      StageWrapper.Type type = wrappers.get(0).getTasks().get(0).getStageWrapperType();
+      // !!! upgrades for the first host
+      starterUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder,
+          holder.m_tasks, true, "Upgrading"));
 
-      StageWrapper stage = new StageWrapper(type, text, new HashMap<>(), wrappers);
+      // !!! post tasks for the first host
+      starterUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder,
+          holder.m_postTasks, true, "Completing"));
 
-      // !!! force the service check on the first host
+      // !!! service check for the first host
       StageWrapper serviceCheck = new ServiceCheckStageWrapper(service,
           upgradeContext.getServiceDisplay(service), false, holder.m_firstHost);
-
-      starterUpgrades.add(stage);
       starterUpgrades.add(serviceCheck);
 
-      wrappers = buildWrappers(service, component, holder.m_tasks, holder.m_remainingHosts, false);
+      // !!! pre-tasks for remaining hosts
+      finisherUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder,
+          holder.m_preTasks, false, "Prepare Remaining"));
 
-      text = getStageText("Upgrade Remaining",
-          upgradeContext.getComponentDisplay(service, component),
-          holder.m_remainingHosts);
-      stage = new StageWrapper(type, text, new HashMap<>(), wrappers);
+      // !!! upgrades for the remaining hosts
+      finisherUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder,
+          holder.m_tasks, false, "Upgrade Remaining"));
 
-      finisherUpgrades.add(stage);
+      // !!! post tasks for the remaining hosts
+      finisherUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder,
+          holder.m_postTasks, false, "Complete Remaining"));
     });
 
     List<StageWrapper> results = new ArrayList<>(stageWrappers);
@@ -132,27 +141,81 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder {
   }
 
   /**
+   * Builds the stages for the components
+   * @param upgradeContext
+   *          the upgrade context
+   * @param service
+   *          the service name
+   * @param holder
+   *          the holder of component and hosts
+   * @param tasks
+   *          the tasks to wrap
+   * @param firstHost
+   *          {@code true} if wrapping for the first host
+   * @param prefix
+   *          the text prefix for the stage
+   * @return
+   *          the list of stage wrappers
+   */
+  private List<StageWrapper> buildStageWrappers(UpgradeContext upgradeContext, String service,
+      HostHolder holder, List<Task> tasks, boolean firstHost, String prefix) {
+
+    if (CollectionUtils.isEmpty(tasks)) {
+      return Collections.emptyList();
+    }
+
+    Set<String> hosts = firstHost ? Collections.singleton(holder.m_firstHost) :
+      holder.m_remainingHosts;
+
+    String component = holder.m_component;
+    String componentDisplay = upgradeContext.getComponentDisplay(service, component);
+    String text = getStageText(prefix, componentDisplay, hosts);
+
+    List<TaskWrapper> wrappers = buildTaskWrappers(service, component, tasks, hosts, firstHost);
+
+    List<StageWrapper> results = new ArrayList<>();
+
+    wrappers.forEach(task -> {
+      // !!! there should only be one task per wrapper, so this assumption is ok for now
+      StageWrapper.Type type = task.getTasks().get(0).getStageWrapperType();
+
+      StageWrapper stage = new StageWrapper(type, text, new HashMap<>(),
+          Collections.singletonList(task));
+      results.add(stage);
+    });
+
+    return results;
+  }
+
+  /**
    * Build the wrappers for the tasks.
    *
+   * @param service
+   *          the service name
+   * @param component
+   *          the component name
    * @param tasks
    *          the tasks to wrap
    * @param hosts
    *          the hosts where the tasks should run
+   * @param firstHost
+   *          {@code true} if these wrappers are for the first host
    * @return
+   *        the list of task wrappers
    */
-  private List<TaskWrapper> buildWrappers(String service, String component,
+  private List<TaskWrapper> buildTaskWrappers(String service, String component,
       List<Task> tasks, Set<String> hosts, boolean firstHost) {
 
     List<TaskWrapper> results = new ArrayList<>();
 
-    String ambariServerHostname = StageUtils.getHostName();
-
-    for (Task task : tasks) {
-
-      // only add the server-side task if there are actual hosts for the service/component
+    tasks.forEach(task -> {
+      // server side actions should only run once (on the first host)
       if (task.getType().isServerAction()) {
-        results.add(new TaskWrapper(service, component, Collections.singleton(ambariServerHostname), task));
-        continue;
+        if (firstHost) {
+          String ambariServerHostname = StageUtils.getHostName();
+          results.add(new TaskWrapper(service, component, Collections.singleton(ambariServerHostname), task));
+        }
+        return;
       }
 
       // FIXME how to handle master-only types
@@ -163,12 +226,12 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder {
 
         // !!! singular types have already run when firstHost is true
         if (et.hosts != ExecuteHostType.ALL) {
-          continue;
+          return;
         }
       }
 
       results.add(new TaskWrapper(service, component, hosts, task));
-    }
+    });
 
     return results;
   }
@@ -177,10 +240,13 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder {
    * Temporary holder for building stage wrappers
    */
   private static class HostHolder {
-    private ProcessingComponent m_component;
+    private String m_component;
     private String m_firstHost;
     private Set<String> m_remainingHosts = new HashSet<>();
+    // !!! there will only ever be one, but code is cleaner this way
     private List<Task> m_tasks;
+    private List<Task> m_preTasks;
+    private List<Task> m_postTasks;
   }
 
 }