You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2018/11/15 20:47:33 UTC
[ambari] branch trunk updated: [AMBARI-24909] Parallel Client Fixes
for Server Side Tasks (#2618)
This is an automated email from the ASF dual-hosted git repository.
ncole pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 60d3134 [AMBARI-24909] Parallel Client Fixes for Server Side Tasks (#2618)
60d3134 is described below
commit 60d313463ce52c8df5c2268491a70c8cd38eb6a8
Author: ncole <nc...@hortonworks.com>
AuthorDate: Thu Nov 15 15:47:28 2018 -0500
[AMBARI-24909] Parallel Client Fixes for Server Side Tasks (#2618)
---
.../resource_management/libraries/script/script.py | 2 +-
.../orchestrate/ParallelClientGroupingBuilder.java | 136 +++++++++++++++------
2 files changed, 102 insertions(+), 36 deletions(-)
diff --git a/ambari-common/src/main/python/resource_management/libraries/script/script.py b/ambari-common/src/main/python/resource_management/libraries/script/script.py
index 67bfca1..76893fd 100644
--- a/ambari-common/src/main/python/resource_management/libraries/script/script.py
+++ b/ambari-common/src/main/python/resource_management/libraries/script/script.py
@@ -1056,7 +1056,7 @@ class Script(object):
if self.should_expose_component_version("restart"):
self.save_component_version_to_structured_out("restart")
- def post_upgrade_restart(env, upgrade_type=None):
+ def post_upgrade_restart(self, env, upgrade_type=None):
"""
To be overridden by subclasses
"""
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/stack/upgrade/orchestrate/ParallelClientGroupingBuilder.java b/ambari-server/src/main/java/org/apache/ambari/server/stack/upgrade/orchestrate/ParallelClientGroupingBuilder.java
index 9411995..0ed9931 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/stack/upgrade/orchestrate/ParallelClientGroupingBuilder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/stack/upgrade/orchestrate/ParallelClientGroupingBuilder.java
@@ -60,6 +60,12 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder {
return;
}
+ Task task = resolveTask(upgradeContext, pc);
+ // !!! better not
+ if (null == task) {
+ return;
+ }
+
Iterator<String> hostIterator = hostsType.getHosts().iterator();
HostHolder holder = new HostHolder();
holder.m_firstHost = hostIterator.next();
@@ -68,11 +74,10 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder {
holder.m_remainingHosts.add(hostIterator.next());
}
- holder.m_component = pc;
- holder.m_tasks = new ArrayList<>();
- holder.m_tasks.addAll(resolveTasks(upgradeContext, true, pc));
- holder.m_tasks.add(resolveTask(upgradeContext, pc));
- holder.m_tasks.addAll(resolveTasks(upgradeContext, false, pc));
+ holder.m_component = pc.name;
+ holder.m_tasks = Collections.singletonList(task);
+ holder.m_preTasks = resolveTasks(upgradeContext, true, pc);
+ holder.m_postTasks = resolveTasks(upgradeContext, false, pc);
serviceToHostMap.put(service, holder);
}
@@ -91,36 +96,40 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder {
// !!! create a stage wrapper for the service check on the first host
// !!! create a stage wrapper for the remaining hosts
- serviceToHostMap.forEach((service, holder) -> {
- String component = holder.m_component.name;
+ // !!! when building stages, the pre- and post- tasks should be run sequentially,
+ // which means they should be in their own stages. that may seem counter-intuitive,
+ // but the parallelism is across hosts, not stages.
- List<TaskWrapper> wrappers = buildWrappers(service, component, holder.m_tasks,
- Collections.singleton(holder.m_firstHost), true);
+ serviceToHostMap.forEach((service, holder) -> {
- String text = getStageText("Upgrading",
- upgradeContext.getComponentDisplay(service, component),
- Collections.singleton(holder.m_firstHost));
+ // !!! pre-tasks for the first host
+ starterUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder,
+ holder.m_preTasks, true, "Preparing"));
- // !!! this is a poor assumption
- StageWrapper.Type type = wrappers.get(0).getTasks().get(0).getStageWrapperType();
+ // !!! upgrades for the first host
+ starterUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder,
+ holder.m_tasks, true, "Upgrading"));
- StageWrapper stage = new StageWrapper(type, text, new HashMap<>(), wrappers);
+ // !!! post tasks for the first host
+ starterUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder,
+ holder.m_postTasks, true, "Completing"));
- // !!! force the service check on the first host
+ // !!! service check for the first host
StageWrapper serviceCheck = new ServiceCheckStageWrapper(service,
upgradeContext.getServiceDisplay(service), false, holder.m_firstHost);
-
- starterUpgrades.add(stage);
starterUpgrades.add(serviceCheck);
- wrappers = buildWrappers(service, component, holder.m_tasks, holder.m_remainingHosts, false);
+ // !!! pre-tasks for remaining hosts
+ finisherUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder,
+ holder.m_preTasks, false, "Prepare Remaining"));
- text = getStageText("Upgrade Remaining",
- upgradeContext.getComponentDisplay(service, component),
- holder.m_remainingHosts);
- stage = new StageWrapper(type, text, new HashMap<>(), wrappers);
+ // !!! upgrades for the remaining hosts
+ finisherUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder,
+ holder.m_tasks, false, "Upgrade Remaining"));
- finisherUpgrades.add(stage);
+ // !!! post tasks for the remaining hosts
+ finisherUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder,
+ holder.m_postTasks, false, "Complete Remaining"));
});
List<StageWrapper> results = new ArrayList<>(stageWrappers);
@@ -132,27 +141,81 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder {
}
/**
+ * Builds the stages for the components
+ * @param upgradeContext
+ * the upgrade context
+ * @param service
+ * the service name
+ * @param holder
+ * the holder of component and hosts
+ * @param tasks
+ * the tasks to wrap
+ * @param firstHost
+ * {@code true} if wrapping for the first host
+ * @param prefix
+ * the text prefix for the stage
+ * @return
+ * the list of stage wrappers
+ */
+ private List<StageWrapper> buildStageWrappers(UpgradeContext upgradeContext, String service,
+ HostHolder holder, List<Task> tasks, boolean firstHost, String prefix) {
+
+ if (CollectionUtils.isEmpty(tasks)) {
+ return Collections.emptyList();
+ }
+
+ Set<String> hosts = firstHost ? Collections.singleton(holder.m_firstHost) :
+ holder.m_remainingHosts;
+
+ String component = holder.m_component;
+ String componentDisplay = upgradeContext.getComponentDisplay(service, component);
+ String text = getStageText(prefix, componentDisplay, hosts);
+
+ List<TaskWrapper> wrappers = buildTaskWrappers(service, component, tasks, hosts, firstHost);
+
+ List<StageWrapper> results = new ArrayList<>();
+
+ wrappers.forEach(task -> {
+ // !!! there should only be one task per wrapper, so this assumption is ok for now
+ StageWrapper.Type type = task.getTasks().get(0).getStageWrapperType();
+
+ StageWrapper stage = new StageWrapper(type, text, new HashMap<>(),
+ Collections.singletonList(task));
+ results.add(stage);
+ });
+
+ return results;
+ }
+
+ /**
* Build the wrappers for the tasks.
*
+ * @param service
+ * the service name
+ * @param component
+ * the component name
* @param tasks
* the tasks to wrap
* @param hosts
* the hosts where the tasks should run
+ * @param firstHost
+ * {@code true} if these wrappers are for the first host
* @return
+ * the list if task wrappers
*/
- private List<TaskWrapper> buildWrappers(String service, String component,
+ private List<TaskWrapper> buildTaskWrappers(String service, String component,
List<Task> tasks, Set<String> hosts, boolean firstHost) {
List<TaskWrapper> results = new ArrayList<>();
- String ambariServerHostname = StageUtils.getHostName();
-
- for (Task task : tasks) {
-
- // only add the server-side task if there are actual hosts for the service/component
+ tasks.forEach(task -> {
+ // server side actions should run only run once (the first host)
if (task.getType().isServerAction()) {
- results.add(new TaskWrapper(service, component, Collections.singleton(ambariServerHostname), task));
- continue;
+ if (firstHost) {
+ String ambariServerHostname = StageUtils.getHostName();
+ results.add(new TaskWrapper(service, component, Collections.singleton(ambariServerHostname), task));
+ }
+ return;
}
// FIXME how to handle master-only types
@@ -163,12 +226,12 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder {
// !!! singular types have already run when firstHost is true
if (et.hosts != ExecuteHostType.ALL) {
- continue;
+ return;
}
}
results.add(new TaskWrapper(service, component, hosts, task));
- }
+ });
return results;
}
@@ -177,10 +240,13 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder {
* Temporary holder for building stage wrappers
*/
private static class HostHolder {
- private ProcessingComponent m_component;
+ private String m_component;
private String m_firstHost;
private Set<String> m_remainingHosts = new HashSet<>();
+ // !!! there will only ever be one, but code is cleaner this way
private List<Task> m_tasks;
+ private List<Task> m_preTasks;
+ private List<Task> m_postTasks;
}
}