You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text rendering.
Posted to commits@ambari.apache.org by nc...@apache.org on 2017/04/17 20:18:06 UTC

[12/34] ambari git commit: AMBARI-20736. Allow Potentially Long Running Restart Commands To Have Their Own Timeout (ncole)

AMBARI-20736. Allow Potentially Long Running Restart Commands To Have Their Own Timeout (ncole)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ac75f1da
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ac75f1da
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ac75f1da

Branch: refs/heads/branch-feature-AMBARI-12556
Commit: ac75f1daccc2c1117e175f95cd9642e85b4fd366
Parents: 4f41968
Author: Nate Cole <nc...@hortonworks.com>
Authored: Tue Apr 11 14:36:43 2017 -0400
Committer: Nate Cole <nc...@hortonworks.com>
Committed: Thu Apr 13 08:53:50 2017 -0400

----------------------------------------------------------------------
 .../libraries/functions/decorator.py            | 23 +++++--
 .../AmbariCustomCommandExecutionHelper.java     | 12 +++-
 .../internal/UpgradeResourceProvider.java       |  8 ++-
 .../server/state/stack/upgrade/Grouping.java    |  2 +-
 .../state/stack/upgrade/StageWrapper.java       | 65 +++++++++++++++++++
 .../ambari/server/state/stack/upgrade/Task.java |  6 ++
 .../server/state/stack/upgrade/TaskWrapper.java | 25 +++++++-
 .../state/stack/upgrade/TaskWrapperBuilder.java |  5 +-
 .../2.1.0.2.0/package/scripts/hdfs_namenode.py  | 11 +++-
 .../2.1.0.2.0/package/scripts/params_linux.py   |  2 +
 .../stacks/HDP/2.3/upgrades/upgrade-2.3.xml     |  2 +-
 .../stacks/HDP/2.3/upgrades/upgrade-2.4.xml     |  2 +-
 .../stacks/HDP/2.3/upgrades/upgrade-2.5.xml     |  2 +-
 .../stacks/HDP/2.3/upgrades/upgrade-2.6.xml     |  2 +-
 .../stacks/HDP/2.4/upgrades/upgrade-2.4.xml     |  2 +-
 .../stacks/HDP/2.4/upgrades/upgrade-2.5.xml     |  2 +-
 .../stacks/HDP/2.4/upgrades/upgrade-2.6.xml     |  2 +-
 .../stacks/HDP/2.5/upgrades/upgrade-2.5.xml     |  2 +-
 .../stacks/HDP/2.5/upgrades/upgrade-2.6.xml     |  2 +-
 .../stacks/HDP/2.6/upgrades/upgrade-2.6.xml     |  2 +-
 .../src/main/resources/upgrade-pack.xsd         |  1 +
 .../internal/UpgradeResourceProviderTest.java   | 66 +++++++++++++++++++-
 .../stacks/HDP/2.1.1/upgrades/upgrade_test.xml  |  2 +-
 23 files changed, 218 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-common/src/main/python/resource_management/libraries/functions/decorator.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/decorator.py b/ambari-common/src/main/python/resource_management/libraries/functions/decorator.py
index 55cf335..b5b804d 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/decorator.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/decorator.py
@@ -26,13 +26,15 @@ __all__ = ['retry', 'safe_retry', ]
 from resource_management.core.logger import Logger
 
 
-def retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=Exception):
+def retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=Exception, timeout_func=None):
   """
   Retry decorator for improved robustness of functions.
-  :param times: Number of times to attempt to call the function.
+  :param times: Number of times to attempt to call the function.  Optionally specify the timeout_func.
   :param sleep_time: Initial sleep time between attempts
   :param backoff_factor: After every failed attempt, multiple the previous sleep time by this factor.
   :param err_class: Exception class to handle
+  :param timeout_func: used when the 'times' argument should be computed.  this function should
+         return an integer value that indicates the number of seconds to wait
   :return: Returns the output of the wrapped function.
   """
   def decorator(function):
@@ -42,6 +44,10 @@ def retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=E
       _backoff_factor = backoff_factor
       _err_class = err_class
 
+      if timeout_func is not None:
+        timeout = timeout_func()
+        _times = timeout // sleep_time  # ensure we end up with an integer
+
       while _times > 1:
         _times -= 1
         try:
@@ -49,7 +55,8 @@ def retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=E
         except _err_class, err:
           Logger.info("Will retry %d time(s), caught exception: %s. Sleeping for %d sec(s)" % (_times, str(err), _sleep_time))
           time.sleep(_sleep_time)
-        if(_sleep_time * _backoff_factor <= max_sleep_time):
+
+        if _sleep_time * _backoff_factor <= max_sleep_time:
           _sleep_time *= _backoff_factor
 
       return function(*args, **kwargs)
@@ -57,15 +64,17 @@ def retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=E
   return decorator
 
 
-def safe_retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=Exception, return_on_fail=None):
+def safe_retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=Exception, return_on_fail=None, timeout_func=None):
   """
   Retry decorator for improved robustness of functions. Instead of error generation on the last try, will return
   return_on_fail value.
-  :param times: Number of times to attempt to call the function.
+  :param times: Number of times to attempt to call the function.  Optionally specify the timeout_func.
   :param sleep_time: Initial sleep time between attempts
   :param backoff_factor: After every failed attempt, multiple the previous sleep time by this factor.
   :param err_class: Exception class to handle
   :param return_on_fail value to return on the last try
+  :param timeout_func: used when the 'times' argument should be computed.  this function should
+         return an integer value that indicates the number of seconds to wait
   :return: Returns the output of the wrapped function.
   """
   def decorator(function):
@@ -76,6 +85,10 @@ def safe_retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_cl
       _err_class = err_class
       _return_on_fail = return_on_fail
 
+      if timeout_func is not None:
+        timeout = timeout_func()
+        _times = timeout // sleep_time  # ensure we end up with an integer
+
       while _times > 1:
         _times -= 1
         try:

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
index d5d7cf4..a493b94 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
@@ -445,7 +445,7 @@ public class AmbariCustomCommandExecutionHelper {
       }
 
       boolean isInstallCommand = commandName.equals(RoleCommand.INSTALL.toString());
-      String commandTimeout = configs.getDefaultAgentTaskTimeout(isInstallCommand);
+      int commandTimeout = Short.valueOf(configs.getDefaultAgentTaskTimeout(isInstallCommand)).intValue();
 
       if (serviceInfo.getSchemaVersion().equals(AmbariMetaInfo.SCHEMA_VERSION_2)) {
         // Service check command is not custom command
@@ -455,7 +455,7 @@ public class AmbariCustomCommandExecutionHelper {
           commandParams.put(SCRIPT, script.getScript());
           commandParams.put(SCRIPT_TYPE, script.getScriptType().toString());
           if (script.getTimeout() > 0) {
-            commandTimeout = String.valueOf(script.getTimeout());
+            commandTimeout = script.getTimeout();
           }
         } else {
           String message = String.format("Component %s has not command script " +
@@ -466,7 +466,13 @@ public class AmbariCustomCommandExecutionHelper {
         // We don't need package/repo information to perform service check
       }
 
-      commandParams.put(COMMAND_TIMEOUT, commandTimeout);
+      // !!! the action execution context timeout is the final say, but make sure it's at least 60 seconds
+      if (null != actionExecutionContext.getTimeout()) {
+        commandTimeout = actionExecutionContext.getTimeout().intValue();
+        commandTimeout = Math.max(60, commandTimeout);
+      }
+
+      commandParams.put(COMMAND_TIMEOUT, "" + commandTimeout);
       commandParams.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder());
       commandParams.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder());
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
index 709ca93..511c8fb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
@@ -1314,6 +1314,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
       String serviceName = wrapper.getTasks().get(0).getService();
       ServiceInfo serviceInfo = ambariMetaInfo.getService(stackId.getStackName(),
           stackId.getStackVersion(), serviceName);
+
       params.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder());
       params.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder());
     }
@@ -1324,7 +1325,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     // hosts in maintenance mode are excluded from the upgrade
     actionContext.setMaintenanceModeHostExcluded(true);
 
-    actionContext.setTimeout(Short.valueOf(s_configuration.getDefaultAgentTaskTimeout(false)));
+    actionContext.setTimeout(wrapper.getMaxTimeout(s_configuration));
     actionContext.setRetryAllowed(allowRetry);
     actionContext.setAutoSkipFailures(context.isComponentFailureAutoSkipped());
 
@@ -1404,7 +1405,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
 
     ActionExecutionContext actionContext = new ActionExecutionContext(cluster.getClusterName(),
         function, filters, commandParams);
-    actionContext.setTimeout(Short.valueOf(s_configuration.getDefaultAgentTaskTimeout(false)));
+    actionContext.setTimeout(wrapper.getMaxTimeout(s_configuration));
     actionContext.setRetryAllowed(allowRetry);
     actionContext.setAutoSkipFailures(context.isComponentFailureAutoSkipped());
 
@@ -1440,6 +1441,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     }
 
     s_commandExecutionHelper.get().addExecutionCommandsToStage(actionContext, stage, requestParams);
+
     request.addStages(Collections.singletonList(stage));
   }
 
@@ -1464,7 +1466,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     ActionExecutionContext actionContext = new ActionExecutionContext(cluster.getClusterName(),
         "SERVICE_CHECK", filters, commandParams);
 
-    actionContext.setTimeout(Short.valueOf(s_configuration.getDefaultAgentTaskTimeout(false)));
+    actionContext.setTimeout(wrapper.getMaxTimeout(s_configuration));
     actionContext.setRetryAllowed(allowRetry);
     actionContext.setAutoSkipFailures(context.isServiceCheckFailureAutoSkipped());
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
index cd17a70..99ed0aa 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
@@ -129,6 +129,7 @@ public class Grouping {
       for (TaskBucket bucket : buckets) {
         // The TaskWrappers take into account if a task is meant to run on all, any, or master.
         // A TaskWrapper may contain multiple tasks, but typically only one, and they all run on the same set of hosts.
+        // Generate a task wrapper for every task in the bucket
         List<TaskWrapper> preTasks = TaskWrapperBuilder.getTaskList(service, pc.name, hostsType, bucket.tasks, params);
         List<List<TaskWrapper>> organizedTasks = organizeTaskWrappersBySyncRules(preTasks);
         for (List<TaskWrapper> tasks : organizedTasks) {
@@ -219,7 +220,6 @@ public class Grouping {
         int batchNum = 0;
         for (Set<String> hostSubset : hostSets) {
           batchNum++;
-          TaskWrapper expandedTW = new TaskWrapper(tw.getService(), tw.getComponent(), hostSubset, tw.getParams(), tw.getTasks());
 
           String stageText = getStageText(verb, ctx.getComponentDisplay(service, pc.name), hostSubset, batchNum, numBatchesNeeded);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
index aac8935..81f4e0b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
@@ -25,6 +25,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Objects;
 import com.google.gson.Gson;
 
@@ -33,6 +39,8 @@ import com.google.gson.Gson;
  */
 public class StageWrapper {
 
+  private static final Logger LOG = LoggerFactory.getLogger(StageWrapper.class);
+
   private static Gson gson = new Gson();
   private String text;
   private Type type;
@@ -163,4 +171,61 @@ public class StageWrapper {
         .add("text",text)
         .omitNullValues().toString();
   }
+
+  /**
+   * Gets the maximum timeout for any task that this {@code StageWrapper} encapsulates.  TaskWrappers
+   * are homogeneous across the stage, but timeouts are defined in Upgrade Packs
+   * at the task, so each one should be checked individually.
+   *
+   * <p>
+   * WARNING:  This method relies on incorrect assumptions about {@link StageWrapper}s and the {@link TaskWrapper}s
+   * that are contained in them.  Orchestration is currently forcing a StageWrapper to have only one TaskWrapper,
+   * even though they could have many per the code.
+   *
+   * In addition, a TaskWrapper should have a one-to-one reference with the Task it contains.  That will be
+   * fixed in a future release.
+   * </p>
+   *
+   * @param configuration the configuration instance.  StageWrappers are not injectable, so pass
+   *                      this in.
+   * @return the maximum timeout, or the default agent execution timeout if none are found.  Never {@code null}.
+   */
+  public Short getMaxTimeout(Configuration configuration) {
+
+    Set<String> timeoutKeys = new HashSet<>();
+
+    // !!! FIXME a TaskWrapper should have only one task.
+    for (TaskWrapper wrapper : tasks) {
+      timeoutKeys.addAll(wrapper.getTimeoutKeys());
+    }
+
+    Short defaultTimeout = Short.valueOf(configuration.getDefaultAgentTaskTimeout(false));
+
+    if (CollectionUtils.isEmpty(timeoutKeys)) {
+      return defaultTimeout;
+    }
+
+    Short timeout = null;
+
+    for (String key : timeoutKeys) {
+      String configValue = configuration.getProperty(key);
+
+      if (StringUtils.isNotBlank(configValue)) {
+        try {
+          Short configTimeout = Short.valueOf(configValue);
+
+          if (null == timeout || configTimeout > timeout) {
+            timeout = configTimeout;
+          }
+
+        } catch (Exception e) {
+          LOG.warn("Could not parse {}/{} to a timeout value", key, configValue);
+        }
+      } else {
+        LOG.warn("Configuration {} not found to compute timeout", key);
+      }
+    }
+
+    return null == timeout ? defaultTimeout : timeout;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
index 5c43c2b..5c7cb6c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
@@ -42,6 +42,12 @@ public abstract class Task {
   public boolean isSequential = false;
 
   /**
+   * The config property to check for timeout.
+   */
+  @XmlAttribute(name="timeout-config")
+  public String timeoutConfig = null;
+
+  /**
    * @return the type of the task
    */
   public abstract Type getType();

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java
index 11e27cf..dfa6159 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java
@@ -19,10 +19,13 @@ package org.apache.ambari.server.state.stack.upgrade;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.lang.StringUtils;
+
 import com.google.common.base.Objects;
 
 /**
@@ -34,7 +37,9 @@ public class TaskWrapper {
   private String component;
   private Set<String> hosts; // all the hosts that all the tasks must run
   private Map<String, String> params;
+  /* FIXME a TaskWrapper really should be wrapping ONLY ONE task */
   private List<Task> tasks; // all the tasks defined for the hostcomponent
+  private Set<String> timeoutKeys = new HashSet<>();
 
   /**
    * @param s the service name for the tasks
@@ -42,10 +47,11 @@ public class TaskWrapper {
    * @param hosts the set of hosts that the tasks are for
    * @param tasks an array of tasks as a convenience
    */
-  public TaskWrapper(String s, String c, Set<String> hosts, Task... tasks) {
-    this(s, c, hosts, null, Arrays.asList(tasks));
+  public TaskWrapper(String s, String c, Set<String> hosts, Task task) {
+    this(s, c, hosts, null, task);
   }
 
+
   /**
    * @param s the service name for the tasks
    * @param c the component name for the tasks
@@ -71,6 +77,13 @@ public class TaskWrapper {
     this.hosts = hosts;
     this.params = (params == null) ? new HashMap<String, String>() : params;
     this.tasks = tasks;
+
+    // !!! FIXME there should only be one task
+    for (Task task : tasks) {
+      if (StringUtils.isNotBlank(task.timeoutConfig)) {
+        timeoutKeys.add(task.timeoutConfig);
+      }
+    }
   }
 
   /**
@@ -133,4 +146,12 @@ public class TaskWrapper {
     return false;
   }
 
+
+  /**
+   * @return the timeout keys for all the tasks in this wrapper.
+   */
+  public Set<String> getTimeoutKeys() {
+    return timeoutKeys;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
index a75fe00..2212b5a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
@@ -40,13 +40,16 @@ public class TaskWrapperBuilder {
   private static Logger LOG = LoggerFactory.getLogger(TaskWrapperBuilder.class);
 
   /**
-   * Creates a collection of tasks based on the set of hosts they are allowed to run on
+   * Creates a collection of task wrappers based on the set of hosts they are allowed to run on
    * by analyzing the "hosts" attribute of any ExecuteTask objects.
+   *
    * @param service the service name for the tasks
    * @param component the component name for the tasks
    * @param hostsType the collection of sets along with their status
    * @param tasks collection of tasks
    * @param params additional parameters
+   *
+   * @return the task wrappers, one for each task that is passed with {@code tasks}
    */
   public static List<TaskWrapper> getTaskList(String service, String component, HostsType hostsType, List<Task> tasks, Map<String, String> params) {
     // Ok if Ambari Server is not part of the cluster hosts since this is only used in the calculation of how many batches

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
index 0489792..aa34dc0 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
@@ -533,7 +533,16 @@ def is_namenode_bootstrapped(params):
   return marked
 
 
-@retry(times=125, sleep_time=5, backoff_factor=2, err_class=Fail)
+def find_timeout():
+  import params
+
+  if isinstance(params.command_timeout, (int, long)):
+    return params.command_timeout
+
+  return int(params.command_timeout)
+
+
+@retry(sleep_time=5, backoff_factor=2, err_class=Fail, timeout_func=find_timeout)
 def is_this_namenode_active():
   """
   Gets whether the current NameNode is Active. This function will wait until the NameNode is

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
index f0566d7..e88dbdd 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
@@ -70,6 +70,8 @@ version = default("/commandParams/version", None)
 # are started using different commands.
 desired_namenode_role = default("/commandParams/desired_namenode_role", None)
 
+command_timeout = default("/commandParams/command_timeout", 900)
+
 # get the correct version to use for checking stack features
 version_for_stack_feature_checks = get_stack_feature_version(config)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.3.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.3.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.3.xml
index 1340b22..97904bf 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.3.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.3.xml
@@ -548,7 +548,7 @@
     <service name="HDFS">
       <component name="NAMENODE">
         <upgrade>
-          <task xsi:type="restart-task"/>
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
       </component>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.4.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.4.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.4.xml
index 40afc4f..3757121 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.4.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.4.xml
@@ -579,7 +579,7 @@
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
 
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
       </component>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.5.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.5.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.5.xml
index e0882d8..f7fd175 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.5.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.5.xml
@@ -678,7 +678,7 @@
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
 
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
       </component>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.6.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.6.xml
index 0f4efdc..78fe831 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.6.xml
@@ -687,7 +687,7 @@
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
 
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
       </component>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.4.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.4.xml b/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.4.xml
index d5e9a5b..fba7093 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.4.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.4.xml
@@ -524,7 +524,7 @@
     <service name="HDFS">
       <component name="NAMENODE">
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
       </component>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.5.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.5.xml b/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.5.xml
index 350395c..68efed2 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.5.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.5.xml
@@ -678,7 +678,7 @@
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
 
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
       </component>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.6.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.6.xml
index 9ac3d52..2ed7962 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.6.xml
@@ -692,7 +692,7 @@
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
 
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
       </component>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.5.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.5.xml b/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.5.xml
index 04a06e8..1af96dd 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.5.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.5.xml
@@ -574,7 +574,7 @@
     <service name="HDFS">
       <component name="NAMENODE">
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
       </component>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.6.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.6.xml
index 879fe0f..53d4579 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.6.xml
@@ -619,7 +619,7 @@
         </pre-upgrade>
         <pre-downgrade />
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
       </component>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/upgrade-2.6.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/upgrade-2.6.xml
index fd72e4d..5b8f53b 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/upgrade-2.6.xml
@@ -610,7 +610,7 @@
         </pre-upgrade>
         <pre-downgrade/> <!--  no-op to prevent config changes on downgrade -->
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
       </component>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/main/resources/upgrade-pack.xsd
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/upgrade-pack.xsd b/ambari-server/src/main/resources/upgrade-pack.xsd
index 1f11aa1..aa7ddd8 100644
--- a/ambari-server/src/main/resources/upgrade-pack.xsd
+++ b/ambari-server/src/main/resources/upgrade-pack.xsd
@@ -276,6 +276,7 @@
       <xs:element name="summary" minOccurs="0" />
     </xs:sequence>
     <xs:attribute name="sequential" use="optional" type="xs:boolean" />
+    <xs:attribute name="timeout-config" use="optional" type="xs:string" />
   </xs:complexType>
   
   <xs:complexType name="restart-task">

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
index 999b7a7..e587f28 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
@@ -42,9 +42,11 @@ import org.apache.ambari.server.H2DatabaseCleaner;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.agent.ExecutionCommand.KeyNames;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.audit.AuditLogger;
 import org.apache.ambari.server.configuration.Configuration;
@@ -160,9 +162,10 @@ public class UpgradeResourceProviderTest {
 
     EasyMock.replay(configHelper);
 
+    InMemoryDefaultTestModule module = new InMemoryDefaultTestModule();
+
     // create an injector which will inject the mocks
-    injector = Guice.createInjector(Modules.override(
-        new InMemoryDefaultTestModule()).with(new MockModule()));
+    injector = Guice.createInjector(Modules.override(module).with(new MockModule()));
 
     H2DatabaseCleaner.resetSequences(injector);
     injector.getInstance(GuiceJpaInitializer.class);
@@ -250,9 +253,12 @@ public class UpgradeResourceProviderTest {
     sch = component.addServiceComponentHost("h1");
     sch.setVersion("2.1.1.0");
 
+    Configuration configuration = injector.getInstance(Configuration.class);
+    configuration.setProperty("upgrade.parameter.zk-server.timeout", "824");
+
     topologyManager = injector.getInstance(TopologyManager.class);
     StageUtils.setTopologyManager(topologyManager);
-    StageUtils.setConfiguration(injector.getInstance(Configuration.class));
+    StageUtils.setConfiguration(configuration);
     ActionManager.setTopologyManager(topologyManager);
     EasyMock.replay(injector.getInstance(AuditLogger.class));
   }
@@ -1650,6 +1656,60 @@ public class UpgradeResourceProviderTest {
         HostRoleStatus.IN_PROGRESS_STATUSES);
   }
 
+  @Test
+  public void testTimeouts() throws Exception {
+    Cluster cluster = clusters.getCluster("c1");
+    // Verify a restart task's timeout-config value reaches its command's COMMAND_TIMEOUT.
+    StackEntity stackEntity = stackDAO.find("HDP", "2.1.1");
+    RepositoryVersionEntity repoVersionEntity = new RepositoryVersionEntity();
+    repoVersionEntity.setDisplayName("My New Version 3");
+    repoVersionEntity.setOperatingSystems("");
+    repoVersionEntity.setStack(stackEntity);
+    repoVersionEntity.setVersion("2.2.2.3");
+    repoVersionDao.create(repoVersionEntity);
+    // Upgrade c1 to the new 2.2.2.3 repository version using the upgrade_test pack.
+    Map<String, Object> requestProps = new HashMap<>();
+    requestProps.put(UpgradeResourceProvider.UPGRADE_CLUSTER_NAME, "c1");
+    requestProps.put(UpgradeResourceProvider.UPGRADE_VERSION, "2.2.2.3");
+    requestProps.put(UpgradeResourceProvider.UPGRADE_PACK, "upgrade_test");
+    requestProps.put(UpgradeResourceProvider.UPGRADE_SKIP_PREREQUISITE_CHECKS, "true");
+    requestProps.put(UpgradeResourceProvider.UPGRADE_DIRECTION, Direction.UPGRADE.name());
+
+    ResourceProvider upgradeResourceProvider = createProvider(amc);
+
+    Request request = PropertyHelper.getCreateRequest(Collections.singleton(requestProps), null);
+    RequestStatus status = upgradeResourceProvider.createResources(request);
+
+    // Creating the upgrade should yield exactly one resource, for request id 1.
+    Set<Resource> createdResources = status.getAssociatedResources();
+    assertEquals(1, createdResources.size());
+    Resource res = createdResources.iterator().next();
+    Long id = (Long) res.getPropertyValue("Upgrade/request_id");
+    assertNotNull(id);
+    assertEquals(Long.valueOf(1), id);
+    // The test setup sets upgrade.parameter.zk-server.timeout=824, and upgrade_test's
+    // restart task names that key in timeout-config, so 824 must reach the command.
+    ActionManager am = injector.getInstance(ActionManager.class);
+
+    List<HostRoleCommand> commands = am.getRequestTasks(id);
+
+    boolean found = false;
+
+    for (HostRoleCommand command : commands) {
+      ExecutionCommandWrapper wrapper = command.getExecutionCommandWrapper();
+      // Check the timeout on the ZOOKEEPER_SERVER custom command(s).
+      if (command.getRole().equals(Role.ZOOKEEPER_SERVER) && command.getRoleCommand().equals(RoleCommand.CUSTOM_COMMAND)) {
+        Map<String, String> commandParams = wrapper.getExecutionCommand().getCommandParams();
+        assertTrue("Missing " + KeyNames.COMMAND_TIMEOUT, commandParams.containsKey(KeyNames.COMMAND_TIMEOUT));
+        assertEquals("824", commandParams.get(KeyNames.COMMAND_TIMEOUT));
+        found = true;
+      }
+    }
+
+    assertTrue("ZooKeeper timeout override was not found", found);
+
+  }
+
   /**
    *
    */

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac75f1da/ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_test.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_test.xml b/ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_test.xml
index 8d506bf..037e39a 100644
--- a/ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_test.xml
+++ b/ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_test.xml
@@ -146,7 +146,7 @@
         </pre-upgrade>
         <pre-downgrade copy-upgrade="true" />
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.zk-server.timeout"/>
         </upgrade>
         <post-upgrade>
           <task xsi:type="configure" id="hdp_2_1_1_zookeeper_new_config_type" />