You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2015/05/25 13:23:08 UTC

ambari git commit: AMBARI-11154 - Storm Upgrade Pack For HDP-2.2 to HDP-2.3 (jonathanhurley)

Repository: ambari
Updated Branches:
  refs/heads/trunk 8690cfdb4 -> ad2f5442b


AMBARI-11154 - Storm Upgrade Pack For HDP-2.2 to HDP-2.3 (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ad2f5442
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ad2f5442
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ad2f5442

Branch: refs/heads/trunk
Commit: ad2f5442b3706235969c8982d3fa92fea808dc29
Parents: 8690cfd
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Wed May 20 17:00:31 2015 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Mon May 25 07:23:01 2015 -0400

----------------------------------------------------------------------
 .../ambari/server/actionmanager/Stage.java      |  65 ++++-----
 .../ambari/server/agent/ExecutionCommand.java   |  31 ++++-
 .../serveraction/upgrades/ConfigureAction.java  |  38 +++++-
 .../state/stack/upgrade/ConfigureTask.java      |   3 -
 .../server/state/stack/upgrade/ManualTask.java  |   4 -
 .../ambari/server/state/stack/upgrade/Task.java |   7 +
 .../apache/ambari/server/utils/StageUtils.java  |  53 ++++----
 .../STORM/0.9.1.2.1/configuration/storm-env.xml |   3 +-
 .../STORM/0.9.1.2.1/package/scripts/nimbus.py   |   5 +
 .../0.9.1.2.1/package/scripts/nimbus_prod.py    |   1 +
 .../0.9.1.2.1/package/scripts/params_linux.py   |   7 +-
 .../STORM/0.9.1.2.1/package/scripts/service.py  |  56 ++++----
 .../0.9.1.2.1/package/scripts/status_params.py  |   2 +
 .../0.9.1.2.1/package/scripts/storm_upgrade.py  | 133 +++++++++++++++++++
 .../0.9.1.2.1/package/scripts/supervisor.py     |   1 +
 .../package/scripts/supervisor_prod.py          |   1 +
 .../services/STORM/configuration/storm-env.xml  |   4 +-
 .../stacks/HDP/2.2/upgrades/upgrade-2.3.xml     |  64 ++++++++-
 .../ranger-storm-policymgr-ssl.xml              |   4 +-
 .../configuration/ranger-storm-security.xml     |   2 +-
 .../ExecutionCommandWrapperTest.java            |  96 ++++++-------
 .../server/agent/TestHeartbeatHandler.java      |  23 +++-
 .../stacks/2.1/STORM/test_storm_nimbus.py       |   2 +
 .../stacks/2.1/STORM/test_storm_nimbus_prod.py  |   2 +
 .../stacks/2.1/STORM/test_storm_supervisor.py   |   2 +
 .../2.1/STORM/test_storm_supervisor_prod.py     |   2 +
 26 files changed, 438 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
index 03b3648..135bdc1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
@@ -28,7 +28,6 @@ import java.util.TreeMap;
 
 import javax.annotation.Nullable;
 
-import com.google.inject.Inject;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
@@ -47,6 +46,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
+import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
 import com.google.inject.persist.Transactional;
@@ -100,7 +100,7 @@ public class Stage {
       @Assisted("commandParamsStage") String commandParamsStage,
       @Assisted("hostParamsStage") String hostParamsStage,
       HostRoleCommandFactory hostRoleCommandFactory) {
-    this.wrappersLoaded = true;
+    wrappersLoaded = true;
     this.requestId = requestId;
     this.logDir = logDir;
     this.clusterName = clusterName;
@@ -109,7 +109,7 @@ public class Stage {
     this.clusterHostInfo = clusterHostInfo;
     this.commandParamsStage = commandParamsStage;
     this.hostParamsStage = hostParamsStage;
-    this.skippable = false;
+    skippable = false;
     this.hostRoleCommandFactory = hostRoleCommandFactory;
   }
 
@@ -248,10 +248,10 @@ public class Stage {
     }
     //used on stage creation only, no need to check if wrappers loaded
     this.stageId = stageId;
-    for (String host: this.commandsToSend.keySet()) {
-      for (ExecutionCommandWrapper wrapper : this.commandsToSend.get(host)) {
+    for (String host: commandsToSend.keySet()) {
+      for (ExecutionCommandWrapper wrapper : commandsToSend.get(host)) {
         ExecutionCommand cmd = wrapper.getExecutionCommand();
-        cmd.setCommandId(StageUtils.getActionId(requestId, stageId));
+        cmd.setRequestAndStage(requestId, stageId);
       }
     }
   }
@@ -275,34 +275,34 @@ public class Stage {
     hrc.setExecutionCommandWrapper(wrapper);
     cmd.setHostname(hostName);
     cmd.setClusterName(clusterName);
-    cmd.setCommandId(this.getActionId());
+    cmd.setRequestAndStage(requestId, stageId);
     cmd.setRole(role.name());
     cmd.setRoleCommand(command);
 
     cmd.setServiceName("");
 
-    Map<String, HostRoleCommand> hrcMap = this.hostRoleCommands.get(hostName);
+    Map<String, HostRoleCommand> hrcMap = hostRoleCommands.get(hostName);
     if (hrcMap == null) {
       hrcMap = new LinkedHashMap<String, HostRoleCommand>();
-      this.hostRoleCommands.put(hostName, hrcMap);
+      hostRoleCommands.put(hostName, hrcMap);
     }
     if (hrcMap.get(role.toString()) != null) {
       throw new RuntimeException(
           "Setting the host role command second time for same stage: stage="
-              + this.getActionId() + ", host=" + hostName + ", role=" + role);
+              + getActionId() + ", host=" + hostName + ", role=" + role);
     }
     hrcMap.put(role.toString(), hrc);
-    List<ExecutionCommandWrapper> execCmdList = this.commandsToSend.get(hostName);
+    List<ExecutionCommandWrapper> execCmdList = commandsToSend.get(hostName);
     if (execCmdList == null) {
       execCmdList = new ArrayList<ExecutionCommandWrapper>();
-      this.commandsToSend.put(hostName, execCmdList);
+      commandsToSend.put(hostName, execCmdList);
     }
 
     if (execCmdList.contains(wrapper)) {
       //todo: proper exception
       throw new RuntimeException(
           "Setting the execution command second time for same stage: stage="
-              + this.getActionId() + ", host=" + hostName + ", role=" + role+ ", event="+event);
+              + getActionId() + ", host=" + hostName + ", role=" + role+ ", event="+event);
     }
     execCmdList.add(wrapper);
     return wrapper;
@@ -458,7 +458,7 @@ public class Stage {
    */
   public synchronized List<String> getHosts() { // TODO: Check whether method should be synchronized
     List<String> hlist = new ArrayList<String>();
-    for (String h : this.hostRoleCommands.keySet()) {
+    for (String h : hostRoleCommands.keySet()) {
       hlist.add(h);
     }
     return hlist;
@@ -504,19 +504,19 @@ public class Stage {
   }
 
   public long getLastAttemptTime(String host, String role) {
-    return this.hostRoleCommands.get(host).get(role).getLastAttemptTime();
+    return hostRoleCommands.get(host).get(role).getLastAttemptTime();
   }
 
   public short getAttemptCount(String host, String role) {
-    return this.hostRoleCommands.get(host).get(role).getAttemptCount();
+    return hostRoleCommands.get(host).get(role).getAttemptCount();
   }
 
   public void incrementAttemptCount(String hostname, String role) {
-    this.hostRoleCommands.get(hostname).get(role).incrementAttemptCount();
+    hostRoleCommands.get(hostname).get(role).incrementAttemptCount();
   }
 
   public void setLastAttemptTime(String host, String role, long t) {
-    this.hostRoleCommands.get(host).get(role).setLastAttemptTime(t);
+    hostRoleCommands.get(host).get(role).setLastAttemptTime(t);
   }
 
   public ExecutionCommandWrapper getExecutionCommandWrapper(String hostname,
@@ -535,41 +535,41 @@ public class Stage {
   }
 
   public long getStartTime(String hostname, String role) {
-    return this.hostRoleCommands.get(hostname).get(role).getStartTime();
+    return hostRoleCommands.get(hostname).get(role).getStartTime();
   }
 
   public void setStartTime(String hostname, String role, long startTime) {
-    this.hostRoleCommands.get(hostname).get(role).setStartTime(startTime);
+    hostRoleCommands.get(hostname).get(role).setStartTime(startTime);
   }
 
   public HostRoleStatus getHostRoleStatus(String hostname, String role) {
-    return this.hostRoleCommands.get(hostname).get(role).getStatus();
+    return hostRoleCommands.get(hostname).get(role).getStatus();
   }
 
   public void setHostRoleStatus(String host, String role,
       HostRoleStatus status) {
-    this.hostRoleCommands.get(host).get(role).setStatus(status);
+    hostRoleCommands.get(host).get(role).setStatus(status);
   }
 
   public ServiceComponentHostEventWrapper getFsmEvent(String hostname, String roleStr) {
-    return this.hostRoleCommands.get(hostname).get(roleStr).getEvent();
+    return hostRoleCommands.get(hostname).get(roleStr).getEvent();
   }
 
 
   public void setExitCode(String hostname, String role, int exitCode) {
-    this.hostRoleCommands.get(hostname).get(role).setExitCode(exitCode);
+    hostRoleCommands.get(hostname).get(role).setExitCode(exitCode);
   }
 
   public int getExitCode(String hostname, String role) {
-    return this.hostRoleCommands.get(hostname).get(role).getExitCode();
+    return hostRoleCommands.get(hostname).get(role).getExitCode();
   }
 
   public void setStderr(String hostname, String role, String stdErr) {
-    this.hostRoleCommands.get(hostname).get(role).setStderr(stdErr);
+    hostRoleCommands.get(hostname).get(role).setStderr(stdErr);
   }
 
   public void setStdout(String hostname, String role, String stdOut) {
-    this.hostRoleCommands.get(hostname).get(role).setStdout(stdOut);
+    hostRoleCommands.get(hostname).get(role).setStdout(stdOut);
   }
 
   public synchronized boolean isStageInProgress() {
@@ -597,9 +597,10 @@ public class Stage {
         if (hrc == null) {
           return false;
         }
-        for (HostRoleStatus status : statuses)
-        if (hrc.getStatus().equals(status)) {
-          return true;
+        for (HostRoleStatus status : statuses) {
+          if (hrc.getStatus().equals(status)) {
+            return true;
+          }
         }
       }
     }
@@ -608,11 +609,11 @@ public class Stage {
 
   public Map<String, List<ExecutionCommandWrapper>> getExecutionCommands() {
     checkWrappersLoaded();
-    return this.commandsToSend;
+    return commandsToSend;
   }
 
   public String getLogDir() {
-    return this.logDir;
+    return logDir;
   }
 
   public Map<String, Map<String, HostRoleCommand>> getHostRoleCommands() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
index 6c254e8..e4abb1d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
@@ -17,7 +17,12 @@
  */
 package org.apache.ambari.server.agent;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.utils.StageUtils;
@@ -42,6 +47,12 @@ public class ExecutionCommand extends AgentCommand {
   @SerializedName("clusterName")
   private String clusterName;
 
+  @SerializedName("requestId")
+  private long requestId;
+
+  @SerializedName("stageId")
+  private long stageId;
+
   @SerializedName("taskId")
   private long taskId;
 
@@ -95,8 +106,20 @@ public class ExecutionCommand extends AgentCommand {
     return commandId;
   }
 
-  public void setCommandId(String commandId) {
-    this.commandId = commandId;
+  /**
+   * Sets the request and stage on this command. The {@code commandId} field is
+   * automatically constructed from these as requestId-stageId.
+   *
+   * @param requestId
+   *          the ID of the execution request.
+   * @param stageId
+   *          the ID of the stage request.
+   */
+  public void setRequestAndStage(long requestId, long stageId) {
+    this.requestId = requestId;
+    this.stageId = stageId;
+
+    commandId = StageUtils.getActionId(requestId, stageId);
   }
 
   @Override
@@ -267,7 +290,7 @@ public class ExecutionCommand extends AgentCommand {
    * @param  params for kerberos commands
    */
   public void setKerberosCommandParams(List<Map<String, String>> params) {
-    this.kerberosCommandParams =  params;
+    kerberosCommandParams =  params;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java
index 69a03f5..268832e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java
@@ -56,8 +56,23 @@ import com.google.inject.Provider;
 
 /**
  * The {@link ConfigureAction} is used to alter a configuration property during
- * an upgrade. It will only produce a new configuration if the value being
- * changed is different than the existing value.
+ * an upgrade. It will only produce a new configuration if an actual change is
+ * occuring. For some configure tasks, the value is already at the desired
+ * property or the conditions of the task are not met. In these cases, a new
+ * configuration will not be created. This task can perform any of the following
+ * actions in a single declaration:
+ * <ul>
+ * <li>Copy a configuration to a new property key, optionally setting a default
+ * if the original property did not exist</li>
+ * <li>Copy a configuration to a new property key from one configuration type to
+ * another, optionally setting a default if the original property did not exist</li>
+ * <li>Rename a configuration, optionally setting a default if the original
+ * property did not exist</li>
+ * <li>Delete a configuration property</li>
+ * <li>Set a configuration property</li>
+ * <li>Conditionally set a configuration property based on another configuration
+ * property value</li>
+ * </ul>
  */
 public class ConfigureAction extends AbstractServerAction {
 
@@ -274,7 +289,7 @@ public class ConfigureAction extends AbstractServerAction {
             newValues.put(transfer.toKey, valueToCopy);
 
             // append standard output
-            outputBuffer.append(MessageFormat.format("Created {0}/{1} = {2}\n", configType,
+            outputBuffer.append(MessageFormat.format("Created {0}/{1} = \"{2}\"\n", configType,
                 transfer.toKey, mask(transfer, valueToCopy)));
           }
           break;
@@ -294,7 +309,8 @@ public class ConfigureAction extends AbstractServerAction {
             changedValues = true;
 
             // append standard output
-            outputBuffer.append(MessageFormat.format("Created {0}/{1} with default value {2}\n",
+            outputBuffer.append(MessageFormat.format(
+                "Created {0}/{1} with default value \"{2}\"\n",
                 configType, transfer.toKey, mask(transfer, transfer.defaultValue)));
           }
 
@@ -369,8 +385,16 @@ public class ConfigureAction extends AbstractServerAction {
           // byproduct of the configure being able to take a list of transfers
           // without a key/value to set
           newValues.put(key, value);
-          outputBuffer.append(MessageFormat.format("{0}/{1} changed to {2}\n", configType, key,
-              mask(keyValuePair, value)));
+
+          final String message;
+          if (StringUtils.isEmpty(value)) {
+            message = MessageFormat.format("{0}/{1} changed to an empty value", configType, key);
+          } else {
+            message = MessageFormat.format("{0}/{1} changed to \"{2}\"\n", configType, key,
+                mask(keyValuePair, value));
+          }
+
+          outputBuffer.append(message);
         }
       }
     }
@@ -381,7 +405,7 @@ public class ConfigureAction extends AbstractServerAction {
         String toReplace = newValues.get(replacement.key);
 
         if (!toReplace.contains(replacement.find)) {
-          outputBuffer.append(MessageFormat.format("String {0} was not found in {1}/{2}\n",
+          outputBuffer.append(MessageFormat.format("String \"{0}\" was not found in {1}/{2}\n",
               replacement.find, configType, replacement.key));
         } else {
           String replaced = StringUtils.replace(toReplace, replacement.find, replacement.replaceWith);

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java
index f5a77c5..95bfb48 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java
@@ -126,9 +126,6 @@ public class ConfigureTask extends ServerSideActionTask {
   @XmlElement(name = "set")
   private List<ConfigurationKeyValue> keyValuePairs;
 
-  @XmlElement(name="summary")
-  public String summary;
-
   @XmlElement(name = "condition")
   private List<Condition> conditions;
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java
index fe933cc..2b1ba56 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java
@@ -47,10 +47,6 @@ public class ManualTask extends ServerSideActionTask {
   @XmlElement(name="message")
   public String message;
 
-  @XmlElement(name="summary")
-  public String summary;
-
-
   @Override
   public Task.Type getType() {
     return type;

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
index fbd837c..6416b57 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.state.stack.upgrade;
 
+import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlSeeAlso;
 
 
@@ -27,6 +28,12 @@ import javax.xml.bind.annotation.XmlSeeAlso;
 public abstract class Task {
 
   /**
+   * An optional brief description of what this task is doing.
+   */
+  @XmlElement(name = "summary")
+  public String summary;
+
+  /**
    * @return the type of the task
    */
   public abstract Type getType();

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
index 66612bd..aeca69b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
@@ -17,10 +17,27 @@
  */
 package org.apache.ambari.server.utils;
 
-import org.apache.commons.lang.StringUtils;
-import com.google.common.base.Joiner;
-import com.google.gson.Gson;
-import com.google.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import javax.xml.bind.JAXBException;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
@@ -36,6 +53,7 @@ import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
 import org.apache.ambari.server.topology.TopologyManager;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.codehaus.jackson.JsonGenerationException;
@@ -43,25 +61,9 @@ import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
 
-import javax.xml.bind.JAXBException;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import com.google.common.base.Joiner;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
 
 public class StageUtils {
 
@@ -201,7 +203,8 @@ public class StageUtils {
         new ServiceComponentHostInstallEvent("NAMENODE", hostname, now, "HDP-1.2.0"),
         "cluster1", "HDFS", false);
     ExecutionCommand execCmd = s.getExecutionCommandWrapper(hostname, "NAMENODE").getExecutionCommand();
-    execCmd.setCommandId(s.getActionId());
+
+    execCmd.setRequestAndStage(s.getRequestId(), s.getStageId());
     List<String> slaveHostList = new ArrayList<String>();
     slaveHostList.add(hostname);
     slaveHostList.add("host2");
@@ -245,7 +248,7 @@ public class StageUtils {
     InputStream is = new ByteArrayInputStream(json.getBytes(Charset.forName("UTF8")));
     return mapper.readValue(is, clazz);
   }
- 
+
   public static Map<String, String> getCommandParamsStage(ActionExecutionContext actionExecContext) throws AmbariException {
     return actionExecContext.getParameters() != null ? actionExecContext.getParameters() : new TreeMap<String, String>();
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-env.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-env.xml b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-env.xml
index 2c7bbc4..5129d87 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-env.xml
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-env.xml
@@ -58,7 +58,8 @@ export JAVA_HOME={{java64_home}}
 # Storm log folder
 export STORM_LOG_DIR={{log_dir}}
 
-# export STORM_CONF_DIR=""
+export STORM_CONF_DIR={{conf_dir}}
+export STORM_HOME={{storm_component_home_dir}}
     </value>
   </property>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus.py
index 66b46c8..93f3e05 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus.py
@@ -57,8 +57,10 @@ class NimbusDefault(Nimbus):
     env.set_params(params)
     if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
       conf_select.select(params.stack_name, "storm", params.version)
+      hdp_select.select("storm-client", params.version)
       hdp_select.select("storm-nimbus", params.version)
 
+
   def start(self, env, rolling_restart=False):
     import params
     env.set_params(params)
@@ -66,16 +68,19 @@ class NimbusDefault(Nimbus):
     setup_ranger_storm()    
     service("nimbus", action="start")
 
+
   def stop(self, env, rolling_restart=False):
     import params
     env.set_params(params)
     service("nimbus", action="stop")
 
+
   def status(self, env):
     import status_params
     env.set_params(status_params)
     check_process_status(status_params.pid_nimbus)
 
+
   def security_status(self, env):
     import status_params
     env.set_params(status_params)

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus_prod.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus_prod.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus_prod.py
index 313bb17..f9d64f4 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus_prod.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus_prod.py
@@ -49,6 +49,7 @@ class Nimbus(Script):
 
     if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
       conf_select.select(params.stack_name, "storm", params.version)
+      hdp_select.select("storm-client", params.version)
       hdp_select.select("storm-nimbus", params.version)
 
   def start(self, env, rolling_restart=False):

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params_linux.py
index 90e3b7b..41ea1ac 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params_linux.py
@@ -55,6 +55,7 @@ sudo = AMBARI_SUDO_BINARY
 stack_name = default("/hostLevelParams/stack_name", None)
 version = default("/commandParams/version", None)
 
+storm_component_home_dir = status_params.storm_component_home_dir
 conf_dir = status_params.conf_dir
 
 stack_version_unformatted = str(config['hostLevelParams']['stack_version'])
@@ -81,6 +82,8 @@ user_group = config['configurations']['cluster-env']['user_group']
 java64_home = config['hostLevelParams']['java_home']
 jps_binary = format("{java64_home}/bin/jps")
 nimbus_port = config['configurations']['storm-site']['nimbus.thrift.port']
+storm_zookeeper_root_dir = default('/configurations/storm-site/storm.zookeeper.root', None)
+storm_zookeeper_servers = default('/configurations/storm-site/storm.zookeeper.servers', None)
 
 # nimbus.seeds is supported in HDP 2.3.0.0 and higher
 nimbus_seeds_supported = default('/configurations/storm-env/nimbus_seeds_supported', False)
@@ -93,8 +96,7 @@ rest_api_conf_file = format("{conf_dir}/config.yaml")
 storm_env_sh_template = config['configurations']['storm-env']['content']
 jmxremote_port = config['configurations']['storm-env']['jmxremote_port']
 
-if 'ganglia_server_host' in config['clusterHostInfo'] and \
-                len(config['clusterHostInfo']['ganglia_server_host'])>0:
+if 'ganglia_server_host' in config['clusterHostInfo'] and len(config['clusterHostInfo']['ganglia_server_host'])>0:
   ganglia_installed = True
   ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0]
   ganglia_report_interval = 60
@@ -135,6 +137,7 @@ if has_metric_collector:
   metric_collector_host = ams_collector_hosts[0]
   metric_collector_report_interval = 60
   metric_collector_app_id = "nimbus"
+
 metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink*.jar"
 
 # ranger host

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/service.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/service.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/service.py
index 901aecc..0080beb 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/service.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/service.py
@@ -18,21 +18,19 @@ limitations under the License.
 
 """
 
-
 from resource_management.core.resources import Execute
 from resource_management.core.resources import File
 from resource_management.libraries.functions.format import format
 import time
 
 
-def service(
-    name,
-    action='start'):
+def service(name, action = 'start'):
   import params
   import status_params
 
   pid_file = status_params.pid_files[name]
-  no_op_test = format("ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1")
+  no_op_test = format(
+    "ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1")
 
   if name == "logviewer" or name == "drpc":
     tries_count = 12
@@ -45,43 +43,45 @@ def service(
     process_grep = format("{rest_lib_dir}/storm-rest-.*\.jar$")
   else:
     process_grep = format("storm.daemon.{name}$")
-    
+
   find_proc = format("{jps_binary} -l  | grep {process_grep}")
   write_pid = format("{find_proc} | awk {{'print $1'}} > {pid_file}")
   crt_pid_cmd = format("{find_proc} && {write_pid}")
-  storm_env = format("source {conf_dir}/storm-env.sh ; export PATH=$JAVA_HOME/bin:$PATH")
+  storm_env = format(
+    "source {conf_dir}/storm-env.sh ; export PATH=$JAVA_HOME/bin:$PATH")
 
   if action == "start":
     if name == "rest_api":
-      process_cmd = format("{storm_env} ; java -jar {rest_lib_dir}/`ls {rest_lib_dir} | grep -wE storm-rest-[0-9.-]+\.jar` server")
-      cmd = format("{process_cmd} {rest_api_conf_file} > {log_dir}/restapi.log 2>&1")
+      process_cmd = format(
+        "{storm_env} ; java -jar {rest_lib_dir}/`ls {rest_lib_dir} | grep -wE storm-rest-[0-9.-]+\.jar` server")
+      cmd = format(
+        "{process_cmd} {rest_api_conf_file} > {log_dir}/restapi.log 2>&1")
     else:
       cmd = format("{storm_env} ; storm {name} > {log_dir}/{name}.out 2>&1")
 
     Execute(cmd,
-           not_if=no_op_test,
-           user=params.storm_user,
-           wait_for_finish=False,
-           path=params.storm_bin_dir
-    )
+      not_if = no_op_test,
+      user = params.storm_user,
+      wait_for_finish = False,
+      path = params.storm_bin_dir)
+
     Execute(crt_pid_cmd,
-            user=params.storm_user,
-            logoutput=True,
-            tries=tries_count,
-            try_sleep=10,
-            path=params.storm_bin_dir
-    )
+      user = params.storm_user,
+      logoutput = True,
+      tries = tries_count,
+      try_sleep = 10,
+      path = params.storm_bin_dir)
 
   elif action == "stop":
     process_dont_exist = format("! ({no_op_test})")
     pid = format("`cat {pid_file}`")
+
     Execute(format("{sudo} kill {pid}"),
-            not_if=process_dont_exist
-    )
+      not_if = process_dont_exist)
+
     Execute(format("{sudo} kill -9 {pid}"),
-            not_if=format("sleep 2; {process_dont_exist} || sleep 20; {process_dont_exist}"),
-            ignore_failures=True
-    )
-    File(pid_file,
-         action = "delete",
-    )
+      not_if = format(
+        "sleep 2; {process_dont_exist} || sleep 20; {process_dont_exist}"),
+      ignore_failures = True)
+
+    File(pid_file, action = "delete")

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/status_params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/status_params.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/status_params.py
index 99397ac..49dee47 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/status_params.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/status_params.py
@@ -64,8 +64,10 @@ else:
   kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
   tmp_dir = Script.get_tmp_dir()
 
+  storm_component_home_dir = "/usr/lib/storm"
   conf_dir = "/etc/storm/conf"
   if Script.is_hdp_stack_greater_or_equal("2.2"):
+    storm_component_home_dir = format("/usr/hdp/current/{component_directory}")
     conf_dir = format("/usr/hdp/current/{component_directory}/conf")
 
   storm_user = config['configurations']['storm-env']['storm_user']

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/storm_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/storm_upgrade.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/storm_upgrade.py
new file mode 100644
index 0000000..b25cdf8
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/storm_upgrade.py
@@ -0,0 +1,133 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import json
+import os
+
+from resource_management.core.logger import Logger
+from resource_management.core.exceptions import Fail
+from resource_management.core.resources.system import Directory
+from resource_management.core.resources.system import File
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions import format
+from resource_management.libraries.functions.default import default
+
+class StormUpgrade(Script):
+  """
+  This class helps perform some of the upgrade tasks needed for Storm during
+  a non-rolling upgrade. Storm writes data to disk locally and to ZooKeeper.
+  If any HDP 2.2 bits exist in these directories when an HDP 2.3 instance
+  starts up, it will fail to start properly. Because the upgrade framework in
+  Ambari doesn't yet have a mechanism to say "stop all" before starting to
+  upgrade each component, we need to rely on a Storm trick to bring down
+  running daemons. By removing the ZooKeeper data with running daemons, those
+  daemons will die.
+  """
+
+  def delete_storm_zookeeper_data(self, env):
+    """
+    Deletes the Storm data from ZooKeeper, effectively bringing down all
+    Storm daemons.
+    :return:
+    """
+    import params
+
+    Logger.info('Clearing Storm data from ZooKeeper')
+
+    storm_zookeeper_root_dir = params.storm_zookeeper_root_dir
+    if storm_zookeeper_root_dir is None:
+      raise Fail("The storm ZooKeeper directory specified by storm-site/storm.zookeeper.root must be specified")
+
+    # create the ZooKeeper delete command
+    if params.version is not None:
+      command = "/usr/hdp/{0}/zookeeper/bin/zkCli.sh rmr /storm".format(params.version)
+    else:
+      command = "/usr/hdp/current/zookeeper-client/bin/zkCli.sh rmr /storm"
+
+    if params.security_enabled:
+      kinit_command=format("{kinit_path_local} -kt {smoke_user_keytab} {smokeuser_principal}; ")
+      Execute(kinit_command,user=params.smokeuser)
+
+    # clean out ZK
+    Execute(command, user=params.storm_user, tries=1)
+
+
+  def delete_storm_local_data(self, env):
+    """
+    Deletes Storm data from local directories. This will create a marker file
+    with JSON data representing the upgrade stack and request/stage ID. This
+    will prevent multiple Storm components on the same host from removing
+    the local directories more than once.
+    :return:
+    """
+    import params
+
+    Logger.info('Clearing Storm data from local directories...')
+
+    storm_local_directory = params.local_dir
+    if storm_local_directory is None:
+      raise Fail("The storm local directory specified by storm-site/storm.local.dir must be specified")
+
+    request_id = default("/requestId", None)
+    stage_id = default("/stageId", None)
+    stack_version = params.version
+    stack_name = params.stack_name
+
+    json_map = {}
+    json_map["requestId"] = request_id
+    json_map["stageId"] = stage_id
+    json_map["stackVersion"] = stack_version
+    json_map["stackName"] = stack_name
+
+    temp_directory = params.tmp_dir
+    upgrade_file = os.path.join(temp_directory, "storm-upgrade-{0}.json".format(stack_version))
+
+    if os.path.exists(upgrade_file):
+      try:
+        with open(upgrade_file) as file_pointer:
+          existing_json_map = json.load(file_pointer)
+
+        if cmp(json_map, existing_json_map) == 0:
+          Logger.info("The storm upgrade has already removed the local directories for {0}-{1} for request {2} and stage {3}".format(
+            stack_name, stack_version, request_id, stage_id))
+
+          # nothing else to do here for this as it appears to have already been
+          # removed by another component being upgraded
+          return
+
+      except:
+        Logger.error("The upgrade file {0} appears to be corrupt; removing...".format(upgrade_file))
+        File(upgrade_file, action="delete")
+    else:
+      # delete the upgrade file since it does not match
+      File(upgrade_file, action="delete")
+
+    # delete from local directory
+    Directory(storm_local_directory, action="delete", recursive=True)
+
+    # recreate storm local directory
+    Directory(storm_local_directory, mode=0755, owner = params.storm_user,
+      group = params.user_group, recursive = True)
+
+    # the file doesn't exist, so create it
+    with open(upgrade_file, 'w') as file_pointer:
+      json.dump(json_map, file_pointer, indent=2)
+
+if __name__ == "__main__":
+  StormUpgrade().execute()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor.py
index bdb03df..335aeeb 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor.py
@@ -75,6 +75,7 @@ class SupervisorDefault(Supervisor):
 
     if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
       conf_select.select(params.stack_name, "storm", params.version)
+      hdp_select.select("storm-client", params.version)
       hdp_select.select("storm-supervisor", params.version)
 
   def start(self, env, rolling_restart=False):

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor_prod.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor_prod.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor_prod.py
index bde533f..f3074f1 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor_prod.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor_prod.py
@@ -50,6 +50,7 @@ class Supervisor(Script):
 
     if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
       conf_select.select(params.stack_name, "storm", params.version)
+      hdp_select.select("storm-client", params.version)
       hdp_select.select("storm-supervisor", params.version)
 
   def start(self, env, rolling_restart=False):

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml
index 1aef735..aa52d3b 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml
@@ -33,8 +33,8 @@
 # The java implementation to use.
 export JAVA_HOME={{java64_home}}
 
-# export STORM_CONF_DIR=""
-export STORM_HOME=/usr/hdp/current/storm-client
+export STORM_CONF_DIR={{conf_dir}}
+export STORM_HOME={{storm_component_home_dir}}
     </value>
   </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml
index 05aa89f..1cbdd88 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml
@@ -178,6 +178,7 @@
       <service name="ZOOKEEPER">
         <component>ZOOKEEPER_CLIENT</component>
       </service>
+
       <service name="HDFS">
         <component>HDFS_CLIENT</component>
       </service>
@@ -285,6 +286,7 @@
           <function>finalize_rolling_upgrade</function>
         </task>
       </execute-stage>
+
       <execute-stage title="Save Cluster State" service="" component="">
         <task xsi:type="server_action" class="org.apache.ambari.server.serveraction.upgrades.FinalizeUpgradeAction">
         </task>
@@ -331,6 +333,7 @@
               <value>org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer</value>
             </condition>
           </task>
+
           <task xsi:type="configure" summary="Transitioning Ranger HDFS Policy">
             <type>ranger-hdfs-policymgr-ssl</type>
             <transfer operation="copy" from-type="ranger-hdfs-plugin-properties" from-key="SSL_KEYSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.keystore" default-value="/usr/hdp/current/hadoop-client/conf/ranger-plugin-keystore.jks" />
@@ -338,6 +341,7 @@
             <transfer operation="copy" from-type="ranger-hdfs-plugin-properties" from-key="SSL_TRUSTSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.truststore" default-value="/usr/hdp/current/hadoop-client/conf/ranger-plugin-truststore.jks" />
             <transfer operation="copy" from-type="ranger-hdfs-plugin-properties" from-key="SSL_TRUSTSTORE_PASSWORD" to-key="xasecure.policymgr.clientssl.truststore.password" mask="true" default-value="changeit" />
           </task>
+
           <task xsi:type="configure" summary="Transitioning Ranger HDFS Audit">
             <type>ranger-hdfs-audit</type>
             <transfer operation="copy" from-type="ranger-hdfs-plugin-properties" from-key="XAAUDIT.DB.IS_ENABLED" to-key="xasecure.audit.destination.db" default-value="false"/>
@@ -387,6 +391,7 @@
             <transfer operation="delete" delete-key="POLICY_MGR_URL" />
           </task>
         </pre-upgrade>
+
         <upgrade>
           <task xsi:type="restart" />
         </upgrade>
@@ -556,6 +561,7 @@
             <transfer operation="delete" delete-key="XAAUDIT.DB.PASSWORD" />
           </task>
         </pre-upgrade>
+
         <upgrade>
           <task xsi:type="restart" />
         </upgrade>
@@ -580,9 +586,6 @@
           <task xsi:type="configure">
             <type>tez-site</type>
             <set key="tez.am.view-acls" value="*"/>
-          </task>
-          <task xsi:type="configure">
-            <type>tez-site</type>
             <set key="tez.task.generate.counters.per.io" value="true"/>
           </task>
         </pre-upgrade>
@@ -899,17 +902,33 @@
             <message>Before continuing, please deactivate and kill any currently running topologies.</message>
           </task>
 
+          <task xsi:type="execute" summary="Removing Storm data from ZooKeeper">
+            <script>scripts/storm_upgrade.py</script>
+            <function>delete_storm_zookeeper_data</function>
+          </task>
+
+          <task xsi:type="execute" summary="Removing local Storm data">
+            <script>scripts/storm_upgrade.py</script>
+            <function>delete_storm_local_data</function>
+          </task>
+
           <task xsi:type="configure" summary="Converting nimbus.host into nimbus.seeds">
             <type>storm-site</type>
-            <transfer operation="copy" from-key="nimbus.host" to-key="nimbus.seeds" coerce-to="yaml-array" />
-            <transfer operation="delete" delete-key="nimbus.host" />
+            <transfer operation="copy" from-key="nimbus.host" to-key="nimbus.seeds" coerce-to="yaml-array"/>
+            <transfer operation="delete" delete-key="nimbus.host"/>
+          </task>
+
+          <task xsi:type="configure" summary="Updating Storm home and configuration environment variables">
+            <type>storm-env</type>
+            <replace key="content" find="# export STORM_CONF_DIR=&quot;&quot;" replace-with="export STORM_CONF_DIR={{conf_dir}}"/>
+            <replace key="content" find="export STORM_HOME=/usr/hdp/current/storm-client" replace-with="export STORM_HOME={{storm_component_home_dir}}"/>
           </task>
 
           <task xsi:type="configure" summary="Configuring Ranger Storm Policy">
             <type>ranger-storm-policymgr-ssl</type>
-            <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_KEYSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.keystore" default-value="/usr/hdp/current/storm-nimbus/conf/ranger-plugin-keystore.jks"/>
+            <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_KEYSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.keystore" default-value="/usr/hdp/current/storm-client/conf/ranger-plugin-keystore.jks"/>
             <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_KEYSTORE_PASSWORD" to-key="xasecure.policymgr.clientssl.keystore.password" default-value="myKeyFilePassword" mask="true"/>
-            <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_TRUSTSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.truststore" default-value="/usr/hdp/current/storm-nimbus/conf/ranger-plugin-truststore.jks"/>
+            <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_TRUSTSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.truststore" default-value="/usr/hdp/current/storm-client/conf/ranger-plugin-truststore.jks"/>
             <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_TRUSTSTORE_PASSWORD" to-key="xasecure.policymgr.clientssl.truststore.password" default-value="changeit" mask="true"/>
           </task>
 
@@ -959,25 +978,56 @@
           <task xsi:type="restart" />
         </upgrade>
       </component>
+
       <component name="STORM_REST_API">
+        <pre-upgrade>
+          <task xsi:type="execute" summary="Removing local Storm data">
+            <script>scripts/storm_upgrade.py</script>
+            <function>delete_storm_local_data</function>
+          </task>
+        </pre-upgrade>
         <upgrade>
           <task xsi:type="restart" />
         </upgrade>
       </component>
+
       <component name="SUPERVISOR">
+        <pre-upgrade>
+          <task xsi:type="execute" summary="Removing local Storm data">
+            <script>scripts/storm_upgrade.py</script>
+            <function>delete_storm_local_data</function>
+          </task>
+        </pre-upgrade>
         <upgrade>
           <task xsi:type="restart" />
         </upgrade>
       </component>
+
       <component name="STORM_UI_SERVER">
+        <pre-upgrade>
+          <task xsi:type="execute" summary="Removing local Storm data">
+            <script>scripts/storm_upgrade.py</script>
+            <function>delete_storm_local_data</function>
+          </task>
+        </pre-upgrade>
+
         <upgrade>
           <task xsi:type="restart" />
         </upgrade>
       </component>
+
       <component name="DRPC_SERVER">
+        <pre-upgrade>
+          <task xsi:type="execute" summary="Removing local Storm data">
+            <script>scripts/storm_upgrade.py</script>
+            <function>delete_storm_local_data</function>
+          </task>
+        </pre-upgrade>
+
         <upgrade>
           <task xsi:type="restart" />
         </upgrade>
+
         <post-upgrade>
           <task xsi:type="manual">
             <message>Please rebuild your topology using the new Storm version dependencies and resubmit it using the newly created jar.</message>

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-policymgr-ssl.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-policymgr-ssl.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-policymgr-ssl.xml
index 4600a14..855e6fd 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-policymgr-ssl.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-policymgr-ssl.xml
@@ -22,7 +22,7 @@
   
   <property>
     <name>xasecure.policymgr.clientssl.keystore</name>
-    <value>/usr/hdp/current/storm-nimbus/conf/ranger-plugin-keystore.jks</value>
+    <value>/usr/hdp/current/storm-client/conf/ranger-plugin-keystore.jks</value>
     <description>Java Keystore files</description>
   </property>
 
@@ -34,7 +34,7 @@
 
   <property>
     <name>xasecure.policymgr.clientssl.truststore</name>
-    <value>/usr/hdp/current/storm-nimbus/conf/ranger-plugin-truststore.jks</value>
+    <value>/usr/hdp/current/storm-client/conf/ranger-plugin-truststore.jks</value>
     <description>java truststore file</description>
   </property>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-security.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-security.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-security.xml
index 33567c8..f26be4d 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-security.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-security.xml
@@ -40,7 +40,7 @@
 
   <property>
     <name>ranger.plugin.storm.policy.rest.ssl.config.file</name>
-    <value>/usr/hdp/current/storm-nimbus/conf/ranger-policymgr-ssl.xml</value>
+    <value>/usr/hdp/current/storm-client/conf/ranger-policymgr-ssl.xml</value>
     <description>Path to the file containing SSL details to contact Ranger Admin</description>
   </property>
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
index 66efea1..8d21b80 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
@@ -52,7 +52,7 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 
 public class ExecutionCommandWrapperTest {
-  
+
   private static final String HOST1 = "dev01.ambari.apache.org";
   private static final String CLUSTER1 = "c1";
   private static final String CLUSTER_VERSION_TAG = "clusterVersion";
@@ -87,7 +87,7 @@ public class ExecutionCommandWrapperTest {
   private static Map<String, String> SERVICE_SITE_SERVICE;
   private static Map<String, String> SERVICE_SITE_HOST;
   private static Map<String, Map<String, String>> CONFIG_ATTRIBUTES;
-  
+
   private static Injector injector;
   private static Clusters clusters;
   private static ConfigFactory configFactory;
@@ -101,35 +101,35 @@ public class ExecutionCommandWrapperTest {
     configHelper = injector.getInstance(ConfigHelper.class);
     configFactory = injector.getInstance(ConfigFactory.class);
     stageFactory = injector.getInstance(StageFactory.class);
-    
+
     clusters = injector.getInstance(Clusters.class);
     clusters.addHost(HOST1);
     clusters.getHost(HOST1).persist();
     clusters.addCluster(CLUSTER1, new StackId("HDP-0.1"));
-    
+
     Cluster cluster1 = clusters.getCluster(CLUSTER1);
-    
+
     SERVICE_SITE_CLUSTER = new HashMap<String, String>();
     SERVICE_SITE_CLUSTER.put(SERVICE_SITE_NAME1, SERVICE_SITE_VAL1);
     SERVICE_SITE_CLUSTER.put(SERVICE_SITE_NAME2, SERVICE_SITE_VAL2);
     SERVICE_SITE_CLUSTER.put(SERVICE_SITE_NAME3, SERVICE_SITE_VAL3);
     SERVICE_SITE_CLUSTER.put(SERVICE_SITE_NAME4, SERVICE_SITE_VAL4);
-    
+
     SERVICE_SITE_SERVICE = new HashMap<String, String>();
     SERVICE_SITE_SERVICE.put(SERVICE_SITE_NAME1, SERVICE_SITE_VAL1_S);
     SERVICE_SITE_SERVICE.put(SERVICE_SITE_NAME2, SERVICE_SITE_VAL2_S);
     SERVICE_SITE_SERVICE.put(SERVICE_SITE_NAME5, SERVICE_SITE_VAL5_S);
-    
+
     SERVICE_SITE_HOST = new HashMap<String, String>();
     SERVICE_SITE_HOST.put(SERVICE_SITE_NAME2, SERVICE_SITE_VAL2_H);
     SERVICE_SITE_HOST.put(SERVICE_SITE_NAME6, SERVICE_SITE_VAL6_H);
-    
+
     GLOBAL_CLUSTER = new HashMap<String, String>();
     GLOBAL_CLUSTER.put(GLOBAL_NAME1, GLOBAL_CLUSTER_VAL1);
     GLOBAL_CLUSTER.put(GLOBAL_NAME2, GLOBAL_CLUSTER_VAL2);
-    
+
     CONFIG_ATTRIBUTES = new HashMap<String, Map<String,String>>();
-    
+
     //Cluster level global config
     Config globalConfig = configFactory.createNew(cluster1, GLOBAL_CONFIG, GLOBAL_CLUSTER, CONFIG_ATTRIBUTES);
     globalConfig.setTag(CLUSTER_VERSION_TAG);
@@ -139,25 +139,25 @@ public class ExecutionCommandWrapperTest {
     Config serviceSiteConfigCluster = configFactory.createNew(cluster1, SERVICE_SITE_CONFIG, SERVICE_SITE_CLUSTER, CONFIG_ATTRIBUTES);
     serviceSiteConfigCluster.setTag(CLUSTER_VERSION_TAG);
     cluster1.addConfig(serviceSiteConfigCluster);
-    
+
     //Service level service config
     Config serviceSiteConfigService = configFactory.createNew(cluster1, SERVICE_SITE_CONFIG, SERVICE_SITE_SERVICE, CONFIG_ATTRIBUTES);
     serviceSiteConfigService.setTag(SERVICE_VERSION_TAG);
     cluster1.addConfig(serviceSiteConfigService);
-    
+
     //Host level service config
     Config serviceSiteConfigHost = configFactory.createNew(cluster1, SERVICE_SITE_CONFIG, SERVICE_SITE_HOST, CONFIG_ATTRIBUTES);
     serviceSiteConfigHost.setTag(HOST_VERSION_TAG);
     cluster1.addConfig(serviceSiteConfigHost);
-    
+
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
-    
+
     createTask(db, 1, 1, HOST1, CLUSTER1);
-    
+
   }
-  
+
   private static void createTask(ActionDBAccessor db, long requestId, long stageId, String hostName, String clusterName) throws AmbariException {
-    
+
     Stage s = stageFactory.createNew(requestId, "/var/log", clusterName, 1L, "execution command wrapper test", "clusterHostInfo", "commandParamsStage", "hostParamsStage");
     s.setStageId(stageId);
     s.addHostRoleExecutionCommand(hostName, Role.NAMENODE,
@@ -169,36 +169,36 @@ public class ExecutionCommandWrapperTest {
     Request request = new Request(stages, clusters);
     db.persistActions(request);
   }
-  
+
   @Test
   public void testGetExecutionCommand() throws JSONException, AmbariException {
-    
-        
+
+
     Map<String, Map<String, String>> confs = new HashMap<String, Map<String, String>>();
     Map<String, String> configurationsGlobal = new HashMap<String, String>();
     configurationsGlobal.put(GLOBAL_NAME1, GLOBAL_VAL1);
     confs.put(GLOBAL_CONFIG, configurationsGlobal);
-    
+
     Map<String, Map<String, String>> confTags = new HashMap<String, Map<String, String>>();
     Map<String, String> confTagServiceSite = new HashMap<String, String>();
-    
+
     confTagServiceSite.put("tag", CLUSTER_VERSION_TAG);
     confTagServiceSite.put("service_override_tag", SERVICE_VERSION_TAG);
     confTagServiceSite.put("host_override_tag", HOST_VERSION_TAG);
-    
+
     confTags.put(SERVICE_SITE_CONFIG, confTagServiceSite);
-    
+
     Map<String, String> confTagGlobal = Collections.singletonMap("tag", CLUSTER_VERSION_TAG);
-    
+
     confTags.put(GLOBAL_CONFIG, confTagGlobal);
-    
-    
+
+
     ExecutionCommand executionCommand = new ExecutionCommand();
-    
-    
+
+
     executionCommand.setClusterName(CLUSTER1);
     executionCommand.setTaskId(1);
-    executionCommand.setCommandId("1-1");
+    executionCommand.setRequestAndStage(1, 1);
     executionCommand.setHostname(HOST1);
     executionCommand.setRole("NAMENODE");
     executionCommand.setRoleParams(Collections.<String, String>emptyMap());
@@ -208,63 +208,63 @@ public class ExecutionCommandWrapperTest {
     executionCommand.setServiceName("HDFS");
     executionCommand.setCommandType(AgentCommandType.EXECUTION_COMMAND);
     executionCommand.setCommandParams(Collections.<String, String>emptyMap());
-    
+
     String json = StageUtils.getGson().toJson(executionCommand, ExecutionCommand.class);
 
     ExecutionCommandWrapper execCommWrap = new ExecutionCommandWrapper(json);
     ExecutionCommand processedExecutionCommand = execCommWrap.getExecutionCommand();
-        
+
     Map<String, String> serviceSiteConfig = processedExecutionCommand.getConfigurations().get(SERVICE_SITE_CONFIG);
-    
+
     Assert.assertEquals(SERVICE_SITE_VAL1_S, serviceSiteConfig.get(SERVICE_SITE_NAME1));
     Assert.assertEquals(SERVICE_SITE_VAL2_H, serviceSiteConfig.get(SERVICE_SITE_NAME2));
     Assert.assertEquals(SERVICE_SITE_VAL3, serviceSiteConfig.get(SERVICE_SITE_NAME3));
     Assert.assertEquals(SERVICE_SITE_VAL4, serviceSiteConfig.get(SERVICE_SITE_NAME4));
     Assert.assertEquals(SERVICE_SITE_VAL5_S, serviceSiteConfig.get(SERVICE_SITE_NAME5));
     Assert.assertEquals(SERVICE_SITE_VAL6_H, serviceSiteConfig.get(SERVICE_SITE_NAME6));
-    
+
     Map<String, String> globalConfig = processedExecutionCommand.getConfigurations().get(GLOBAL_CONFIG);
-    
+
     Assert.assertEquals(GLOBAL_VAL1, globalConfig.get(GLOBAL_NAME1));
     Assert.assertEquals(GLOBAL_CLUSTER_VAL2, globalConfig.get(GLOBAL_NAME2));
-    
+
 
     //Union of all keys of service site configs
     Set<String> serviceSiteKeys = new HashSet<String>();
     serviceSiteKeys.addAll(SERVICE_SITE_CLUSTER.keySet());
     serviceSiteKeys.addAll(SERVICE_SITE_SERVICE.keySet());
     serviceSiteKeys.addAll(SERVICE_SITE_HOST.keySet());
-    
+
     Assert.assertEquals(serviceSiteKeys.size(), serviceSiteConfig.size());
-    
+
   }
-  
+
   @Test
   public void testGetMergedConfig() {
     Map<String, String> baseConfig = new HashMap<String, String>();
-    
+
     baseConfig.put(SERVICE_SITE_NAME1, SERVICE_SITE_VAL1);
     baseConfig.put(SERVICE_SITE_NAME2, SERVICE_SITE_VAL2);
     baseConfig.put(SERVICE_SITE_NAME3, SERVICE_SITE_VAL3);
     baseConfig.put(SERVICE_SITE_NAME4, SERVICE_SITE_VAL4);
     baseConfig.put(SERVICE_SITE_NAME5, SERVICE_SITE_VAL5);
-    
+
     Map<String, String> overrideConfig = new HashMap<String, String>();
-    
+
     overrideConfig.put(SERVICE_SITE_NAME2, SERVICE_SITE_VAL2_H);
     overrideConfig.put(SERVICE_SITE_NAME6, SERVICE_SITE_VAL6_H);
-    
-    
+
+
     Map<String, String> mergedConfig = configHelper.getMergedConfig(baseConfig,
       overrideConfig);
-    
-    
+
+
     Set<String> configsKeys = new HashSet<String>();
     configsKeys.addAll(baseConfig.keySet());
     configsKeys.addAll(overrideConfig.keySet());
-    
+
     Assert.assertEquals(configsKeys.size(), mergedConfig.size());
-    
+
     Assert.assertEquals(SERVICE_SITE_VAL1, mergedConfig.get(SERVICE_SITE_NAME1));
     Assert.assertEquals(SERVICE_SITE_VAL2_H, mergedConfig.get(SERVICE_SITE_NAME2));
     Assert.assertEquals(SERVICE_SITE_VAL3, mergedConfig.get(SERVICE_SITE_NAME3));

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
index ca1a5a0..15d7904 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
@@ -84,8 +84,17 @@ import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
-import org.apache.ambari.server.orm.dao.*;
-import org.apache.ambari.server.orm.entities.*;
+import org.apache.ambari.server.orm.dao.ClusterDAO;
+import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
+import org.apache.ambari.server.orm.dao.ResourceTypeDAO;
+import org.apache.ambari.server.orm.dao.StackDAO;
+import org.apache.ambari.server.orm.entities.ClusterEntity;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
+import org.apache.ambari.server.orm.entities.ResourceEntity;
+import org.apache.ambari.server.orm.entities.ResourceTypeEntity;
+import org.apache.ambari.server.orm.entities.StackEntity;
 import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileWriter;
 import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileWriterFactory;
 import org.apache.ambari.server.serveraction.kerberos.KerberosServerAction;
@@ -229,7 +238,7 @@ public class TestHeartbeatHandler {
     hostObject.setState(HostState.UNHEALTHY);
 
     ExecutionCommand execCmd = new ExecutionCommand();
-    execCmd.setCommandId("2-34");
+    execCmd.setRequestAndStage(2, 34);
     execCmd.setHostname(DummyHostname1);
     aq.enqueue(DummyHostname1, new ExecutionCommand());
     HeartBeat hb = new HeartBeat();
@@ -2318,11 +2327,11 @@ public class TestHeartbeatHandler {
     clusterEntity.setDesiredStack(stackEntity);
 
     clusterDAO.create(clusterEntity);
-    
+
     StackId stackId = new StackId(DummyStackId);
-    
+
     Cluster cluster = clusters.getCluster(DummyCluster);
-    
+
     cluster.setDesiredStackVersion(stackId);
     cluster.setCurrentStackVersion(stackId);
     helper.getOrCreateRepositoryVersion(stackId, stackId.getStackVersion());
@@ -2520,7 +2529,7 @@ public class TestHeartbeatHandler {
     hostObject.setState(HostState.UNHEALTHY);
 
     ExecutionCommand execCmd = new ExecutionCommand();
-    execCmd.setCommandId("2-34");
+    execCmd.setRequestAndStage(2, 34);
     execCmd.setHostname(DummyHostname1);
     aq.enqueue(DummyHostname1, new ExecutionCommand());
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py
index f79fa6d..4548b8d 100644
--- a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py
+++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py
@@ -149,6 +149,7 @@ class TestStormNimbus(TestStormBase):
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
 
+    self.assertResourceCalled("Execute", "hdp-select set storm-client 2.2.1.0-2067")
     self.assertResourceCalled("Execute", "hdp-select set storm-nimbus 2.2.1.0-2067")
 
   def test_pre_rolling_restart_23(self):
@@ -168,6 +169,7 @@ class TestStormNimbus(TestStormBase):
                      call_mocks = [(0, None), (0, None)],
                      mocks_dict = mocks_dict)
 
+    self.assertResourceCalled("Execute", "hdp-select set storm-client 2.3.0.0-1234")
     self.assertResourceCalled("Execute", "hdp-select set storm-nimbus 2.3.0.0-1234")
 
     self.assertEquals(2, mocks_dict['call'].call_count)

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py
index 17c0c6b..85a4a2c 100644
--- a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py
+++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py
@@ -114,6 +114,7 @@ class TestStormNimbus(TestStormBase):
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
 
+    self.assertResourceCalled("Execute", "hdp-select set storm-client 2.2.1.0-2067")
     self.assertResourceCalled("Execute", "hdp-select set storm-nimbus 2.2.1.0-2067")
 
   def test_pre_rolling_restart_23(self):
@@ -133,6 +134,7 @@ class TestStormNimbus(TestStormBase):
                      call_mocks = [(0, None), (0, None)],
                      mocks_dict = mocks_dict)
 
+    self.assertResourceCalled("Execute", "hdp-select set storm-client 2.3.0.0-1234")
     self.assertResourceCalled("Execute", "hdp-select set storm-nimbus 2.3.0.0-1234")
 
     self.assertEquals(2, mocks_dict['call'].call_count)

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor.py
index 38b7b64..c4a261e 100644
--- a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor.py
+++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor.py
@@ -194,6 +194,7 @@ class TestStormSupervisor(TestStormBase):
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
 
+    self.assertResourceCalled("Execute", "hdp-select set storm-client 2.2.1.0-2067")
     self.assertResourceCalled("Execute", "hdp-select set storm-supervisor 2.2.1.0-2067")
 
   def test_pre_rolling_restart_23(self):
@@ -213,6 +214,7 @@ class TestStormSupervisor(TestStormBase):
                      call_mocks = [(0, None), (0, None)],
                      mocks_dict = mocks_dict)
 
+    self.assertResourceCalled("Execute", "hdp-select set storm-client 2.3.0.0-1234")
     self.assertResourceCalled("Execute", "hdp-select set storm-supervisor 2.3.0.0-1234")
 
     self.assertEquals(2, mocks_dict['call'].call_count)

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py
index a6dc423..926e57e 100644
--- a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py
+++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py
@@ -163,6 +163,7 @@ class TestStormSupervisor(TestStormBase):
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
 
+    self.assertResourceCalled("Execute", "hdp-select set storm-client 2.2.1.0-2067")
     self.assertResourceCalled("Execute", "hdp-select set storm-supervisor 2.2.1.0-2067")
 
   def test_pre_rolling_restart_23(self):
@@ -182,6 +183,7 @@ class TestStormSupervisor(TestStormBase):
                      call_mocks = [(0, None), (0, None)],
                      mocks_dict = mocks_dict)
 
+    self.assertResourceCalled("Execute", "hdp-select set storm-client 2.3.0.0-1234")
     self.assertResourceCalled("Execute", "hdp-select set storm-supervisor 2.3.0.0-1234")
 
     self.assertEquals(2, mocks_dict['call'].call_count)