You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2015/05/25 13:23:08 UTC
ambari git commit: AMBARI-11154 - Storm Upgrade Pack For HDP-2.2 to
HDP-2.3 (jonathanhurley)
Repository: ambari
Updated Branches:
refs/heads/trunk 8690cfdb4 -> ad2f5442b
AMBARI-11154 - Storm Upgrade Pack For HDP-2.2 to HDP-2.3 (jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ad2f5442
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ad2f5442
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ad2f5442
Branch: refs/heads/trunk
Commit: ad2f5442b3706235969c8982d3fa92fea808dc29
Parents: 8690cfd
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Wed May 20 17:00:31 2015 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Mon May 25 07:23:01 2015 -0400
----------------------------------------------------------------------
.../ambari/server/actionmanager/Stage.java | 65 ++++-----
.../ambari/server/agent/ExecutionCommand.java | 31 ++++-
.../serveraction/upgrades/ConfigureAction.java | 38 +++++-
.../state/stack/upgrade/ConfigureTask.java | 3 -
.../server/state/stack/upgrade/ManualTask.java | 4 -
.../ambari/server/state/stack/upgrade/Task.java | 7 +
.../apache/ambari/server/utils/StageUtils.java | 53 ++++----
.../STORM/0.9.1.2.1/configuration/storm-env.xml | 3 +-
.../STORM/0.9.1.2.1/package/scripts/nimbus.py | 5 +
.../0.9.1.2.1/package/scripts/nimbus_prod.py | 1 +
.../0.9.1.2.1/package/scripts/params_linux.py | 7 +-
.../STORM/0.9.1.2.1/package/scripts/service.py | 56 ++++----
.../0.9.1.2.1/package/scripts/status_params.py | 2 +
.../0.9.1.2.1/package/scripts/storm_upgrade.py | 133 +++++++++++++++++++
.../0.9.1.2.1/package/scripts/supervisor.py | 1 +
.../package/scripts/supervisor_prod.py | 1 +
.../services/STORM/configuration/storm-env.xml | 4 +-
.../stacks/HDP/2.2/upgrades/upgrade-2.3.xml | 64 ++++++++-
.../ranger-storm-policymgr-ssl.xml | 4 +-
.../configuration/ranger-storm-security.xml | 2 +-
.../ExecutionCommandWrapperTest.java | 96 ++++++-------
.../server/agent/TestHeartbeatHandler.java | 23 +++-
.../stacks/2.1/STORM/test_storm_nimbus.py | 2 +
.../stacks/2.1/STORM/test_storm_nimbus_prod.py | 2 +
.../stacks/2.1/STORM/test_storm_supervisor.py | 2 +
.../2.1/STORM/test_storm_supervisor_prod.py | 2 +
26 files changed, 438 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
index 03b3648..135bdc1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
@@ -28,7 +28,6 @@ import java.util.TreeMap;
import javax.annotation.Nullable;
-import com.google.inject.Inject;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
@@ -47,6 +46,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
+import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.google.inject.persist.Transactional;
@@ -100,7 +100,7 @@ public class Stage {
@Assisted("commandParamsStage") String commandParamsStage,
@Assisted("hostParamsStage") String hostParamsStage,
HostRoleCommandFactory hostRoleCommandFactory) {
- this.wrappersLoaded = true;
+ wrappersLoaded = true;
this.requestId = requestId;
this.logDir = logDir;
this.clusterName = clusterName;
@@ -109,7 +109,7 @@ public class Stage {
this.clusterHostInfo = clusterHostInfo;
this.commandParamsStage = commandParamsStage;
this.hostParamsStage = hostParamsStage;
- this.skippable = false;
+ skippable = false;
this.hostRoleCommandFactory = hostRoleCommandFactory;
}
@@ -248,10 +248,10 @@ public class Stage {
}
//used on stage creation only, no need to check if wrappers loaded
this.stageId = stageId;
- for (String host: this.commandsToSend.keySet()) {
- for (ExecutionCommandWrapper wrapper : this.commandsToSend.get(host)) {
+ for (String host: commandsToSend.keySet()) {
+ for (ExecutionCommandWrapper wrapper : commandsToSend.get(host)) {
ExecutionCommand cmd = wrapper.getExecutionCommand();
- cmd.setCommandId(StageUtils.getActionId(requestId, stageId));
+ cmd.setRequestAndStage(requestId, stageId);
}
}
}
@@ -275,34 +275,34 @@ public class Stage {
hrc.setExecutionCommandWrapper(wrapper);
cmd.setHostname(hostName);
cmd.setClusterName(clusterName);
- cmd.setCommandId(this.getActionId());
+ cmd.setRequestAndStage(requestId, stageId);
cmd.setRole(role.name());
cmd.setRoleCommand(command);
cmd.setServiceName("");
- Map<String, HostRoleCommand> hrcMap = this.hostRoleCommands.get(hostName);
+ Map<String, HostRoleCommand> hrcMap = hostRoleCommands.get(hostName);
if (hrcMap == null) {
hrcMap = new LinkedHashMap<String, HostRoleCommand>();
- this.hostRoleCommands.put(hostName, hrcMap);
+ hostRoleCommands.put(hostName, hrcMap);
}
if (hrcMap.get(role.toString()) != null) {
throw new RuntimeException(
"Setting the host role command second time for same stage: stage="
- + this.getActionId() + ", host=" + hostName + ", role=" + role);
+ + getActionId() + ", host=" + hostName + ", role=" + role);
}
hrcMap.put(role.toString(), hrc);
- List<ExecutionCommandWrapper> execCmdList = this.commandsToSend.get(hostName);
+ List<ExecutionCommandWrapper> execCmdList = commandsToSend.get(hostName);
if (execCmdList == null) {
execCmdList = new ArrayList<ExecutionCommandWrapper>();
- this.commandsToSend.put(hostName, execCmdList);
+ commandsToSend.put(hostName, execCmdList);
}
if (execCmdList.contains(wrapper)) {
//todo: proper exception
throw new RuntimeException(
"Setting the execution command second time for same stage: stage="
- + this.getActionId() + ", host=" + hostName + ", role=" + role+ ", event="+event);
+ + getActionId() + ", host=" + hostName + ", role=" + role+ ", event="+event);
}
execCmdList.add(wrapper);
return wrapper;
@@ -458,7 +458,7 @@ public class Stage {
*/
public synchronized List<String> getHosts() { // TODO: Check whether method should be synchronized
List<String> hlist = new ArrayList<String>();
- for (String h : this.hostRoleCommands.keySet()) {
+ for (String h : hostRoleCommands.keySet()) {
hlist.add(h);
}
return hlist;
@@ -504,19 +504,19 @@ public class Stage {
}
public long getLastAttemptTime(String host, String role) {
- return this.hostRoleCommands.get(host).get(role).getLastAttemptTime();
+ return hostRoleCommands.get(host).get(role).getLastAttemptTime();
}
public short getAttemptCount(String host, String role) {
- return this.hostRoleCommands.get(host).get(role).getAttemptCount();
+ return hostRoleCommands.get(host).get(role).getAttemptCount();
}
public void incrementAttemptCount(String hostname, String role) {
- this.hostRoleCommands.get(hostname).get(role).incrementAttemptCount();
+ hostRoleCommands.get(hostname).get(role).incrementAttemptCount();
}
public void setLastAttemptTime(String host, String role, long t) {
- this.hostRoleCommands.get(host).get(role).setLastAttemptTime(t);
+ hostRoleCommands.get(host).get(role).setLastAttemptTime(t);
}
public ExecutionCommandWrapper getExecutionCommandWrapper(String hostname,
@@ -535,41 +535,41 @@ public class Stage {
}
public long getStartTime(String hostname, String role) {
- return this.hostRoleCommands.get(hostname).get(role).getStartTime();
+ return hostRoleCommands.get(hostname).get(role).getStartTime();
}
public void setStartTime(String hostname, String role, long startTime) {
- this.hostRoleCommands.get(hostname).get(role).setStartTime(startTime);
+ hostRoleCommands.get(hostname).get(role).setStartTime(startTime);
}
public HostRoleStatus getHostRoleStatus(String hostname, String role) {
- return this.hostRoleCommands.get(hostname).get(role).getStatus();
+ return hostRoleCommands.get(hostname).get(role).getStatus();
}
public void setHostRoleStatus(String host, String role,
HostRoleStatus status) {
- this.hostRoleCommands.get(host).get(role).setStatus(status);
+ hostRoleCommands.get(host).get(role).setStatus(status);
}
public ServiceComponentHostEventWrapper getFsmEvent(String hostname, String roleStr) {
- return this.hostRoleCommands.get(hostname).get(roleStr).getEvent();
+ return hostRoleCommands.get(hostname).get(roleStr).getEvent();
}
public void setExitCode(String hostname, String role, int exitCode) {
- this.hostRoleCommands.get(hostname).get(role).setExitCode(exitCode);
+ hostRoleCommands.get(hostname).get(role).setExitCode(exitCode);
}
public int getExitCode(String hostname, String role) {
- return this.hostRoleCommands.get(hostname).get(role).getExitCode();
+ return hostRoleCommands.get(hostname).get(role).getExitCode();
}
public void setStderr(String hostname, String role, String stdErr) {
- this.hostRoleCommands.get(hostname).get(role).setStderr(stdErr);
+ hostRoleCommands.get(hostname).get(role).setStderr(stdErr);
}
public void setStdout(String hostname, String role, String stdOut) {
- this.hostRoleCommands.get(hostname).get(role).setStdout(stdOut);
+ hostRoleCommands.get(hostname).get(role).setStdout(stdOut);
}
public synchronized boolean isStageInProgress() {
@@ -597,9 +597,10 @@ public class Stage {
if (hrc == null) {
return false;
}
- for (HostRoleStatus status : statuses)
- if (hrc.getStatus().equals(status)) {
- return true;
+ for (HostRoleStatus status : statuses) {
+ if (hrc.getStatus().equals(status)) {
+ return true;
+ }
}
}
}
@@ -608,11 +609,11 @@ public class Stage {
public Map<String, List<ExecutionCommandWrapper>> getExecutionCommands() {
checkWrappersLoaded();
- return this.commandsToSend;
+ return commandsToSend;
}
public String getLogDir() {
- return this.logDir;
+ return logDir;
}
public Map<String, Map<String, HostRoleCommand>> getHostRoleCommands() {
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
index 6c254e8..e4abb1d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
@@ -17,7 +17,12 @@
*/
package org.apache.ambari.server.agent;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.utils.StageUtils;
@@ -42,6 +47,12 @@ public class ExecutionCommand extends AgentCommand {
@SerializedName("clusterName")
private String clusterName;
+ @SerializedName("requestId")
+ private long requestId;
+
+ @SerializedName("stageId")
+ private long stageId;
+
@SerializedName("taskId")
private long taskId;
@@ -95,8 +106,20 @@ public class ExecutionCommand extends AgentCommand {
return commandId;
}
- public void setCommandId(String commandId) {
- this.commandId = commandId;
+ /**
+ * Sets the request and stage on this command. The {@code commandId} field is
+ * automatically constructed from these as requestId-stageId.
+ *
+ * @param requestId
+ * the ID of the execution request.
+ * @param stageId
+ * the ID of the stage request.
+ */
+ public void setRequestAndStage(long requestId, long stageId) {
+ this.requestId = requestId;
+ this.stageId = stageId;
+
+ commandId = StageUtils.getActionId(requestId, stageId);
}
@Override
@@ -267,7 +290,7 @@ public class ExecutionCommand extends AgentCommand {
* @param params for kerberos commands
*/
public void setKerberosCommandParams(List<Map<String, String>> params) {
- this.kerberosCommandParams = params;
+ kerberosCommandParams = params;
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java
index 69a03f5..268832e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java
@@ -56,8 +56,23 @@ import com.google.inject.Provider;
/**
* The {@link ConfigureAction} is used to alter a configuration property during
- * an upgrade. It will only produce a new configuration if the value being
- * changed is different than the existing value.
+ * an upgrade. It will only produce a new configuration if an actual change is
+ * occurring. For some configure tasks, the value is already at the desired
+ * property or the conditions of the task are not met. In these cases, a new
+ * configuration will not be created. This task can perform any of the following
+ * actions in a single declaration:
+ * <ul>
+ * <li>Copy a configuration to a new property key, optionally setting a default
+ * if the original property did not exist</li>
+ * <li>Copy a configuration to a new property key from one configuration type to
+ * another, optionally setting a default if the original property did not exist</li>
+ * <li>Rename a configuration, optionally setting a default if the original
+ * property did not exist</li>
+ * <li>Delete a configuration property</li>
+ * <li>Set a configuration property</li>
+ * <li>Conditionally set a configuration property based on another configuration
+ * property value</li>
+ * </ul>
*/
public class ConfigureAction extends AbstractServerAction {
@@ -274,7 +289,7 @@ public class ConfigureAction extends AbstractServerAction {
newValues.put(transfer.toKey, valueToCopy);
// append standard output
- outputBuffer.append(MessageFormat.format("Created {0}/{1} = {2}\n", configType,
+ outputBuffer.append(MessageFormat.format("Created {0}/{1} = \"{2}\"\n", configType,
transfer.toKey, mask(transfer, valueToCopy)));
}
break;
@@ -294,7 +309,8 @@ public class ConfigureAction extends AbstractServerAction {
changedValues = true;
// append standard output
- outputBuffer.append(MessageFormat.format("Created {0}/{1} with default value {2}\n",
+ outputBuffer.append(MessageFormat.format(
+ "Created {0}/{1} with default value \"{2}\"\n",
configType, transfer.toKey, mask(transfer, transfer.defaultValue)));
}
@@ -369,8 +385,16 @@ public class ConfigureAction extends AbstractServerAction {
// byproduct of the configure being able to take a list of transfers
// without a key/value to set
newValues.put(key, value);
- outputBuffer.append(MessageFormat.format("{0}/{1} changed to {2}\n", configType, key,
- mask(keyValuePair, value)));
+
+ final String message;
+ if (StringUtils.isEmpty(value)) {
+ message = MessageFormat.format("{0}/{1} changed to an empty value", configType, key);
+ } else {
+ message = MessageFormat.format("{0}/{1} changed to \"{2}\"\n", configType, key,
+ mask(keyValuePair, value));
+ }
+
+ outputBuffer.append(message);
}
}
}
@@ -381,7 +405,7 @@ public class ConfigureAction extends AbstractServerAction {
String toReplace = newValues.get(replacement.key);
if (!toReplace.contains(replacement.find)) {
- outputBuffer.append(MessageFormat.format("String {0} was not found in {1}/{2}\n",
+ outputBuffer.append(MessageFormat.format("String \"{0}\" was not found in {1}/{2}\n",
replacement.find, configType, replacement.key));
} else {
String replaced = StringUtils.replace(toReplace, replacement.find, replacement.replaceWith);
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java
index f5a77c5..95bfb48 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java
@@ -126,9 +126,6 @@ public class ConfigureTask extends ServerSideActionTask {
@XmlElement(name = "set")
private List<ConfigurationKeyValue> keyValuePairs;
- @XmlElement(name="summary")
- public String summary;
-
@XmlElement(name = "condition")
private List<Condition> conditions;
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java
index fe933cc..2b1ba56 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java
@@ -47,10 +47,6 @@ public class ManualTask extends ServerSideActionTask {
@XmlElement(name="message")
public String message;
- @XmlElement(name="summary")
- public String summary;
-
-
@Override
public Task.Type getType() {
return type;
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
index fbd837c..6416b57 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
@@ -17,6 +17,7 @@
*/
package org.apache.ambari.server.state.stack.upgrade;
+import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlSeeAlso;
@@ -27,6 +28,12 @@ import javax.xml.bind.annotation.XmlSeeAlso;
public abstract class Task {
/**
+ * An optional brief description of what this task is doing.
+ */
+ @XmlElement(name = "summary")
+ public String summary;
+
+ /**
* @return the type of the task
*/
public abstract Type getType();
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
index 66612bd..aeca69b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
@@ -17,10 +17,27 @@
*/
package org.apache.ambari.server.utils;
-import org.apache.commons.lang.StringUtils;
-import com.google.common.base.Joiner;
-import com.google.gson.Gson;
-import com.google.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import javax.xml.bind.JAXBException;
+
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
@@ -36,6 +53,7 @@ import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
import org.apache.ambari.server.topology.TopologyManager;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.JsonGenerationException;
@@ -43,25 +61,9 @@ import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
-import javax.xml.bind.JAXBException;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import com.google.common.base.Joiner;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
public class StageUtils {
@@ -201,7 +203,8 @@ public class StageUtils {
new ServiceComponentHostInstallEvent("NAMENODE", hostname, now, "HDP-1.2.0"),
"cluster1", "HDFS", false);
ExecutionCommand execCmd = s.getExecutionCommandWrapper(hostname, "NAMENODE").getExecutionCommand();
- execCmd.setCommandId(s.getActionId());
+
+ execCmd.setRequestAndStage(s.getRequestId(), s.getStageId());
List<String> slaveHostList = new ArrayList<String>();
slaveHostList.add(hostname);
slaveHostList.add("host2");
@@ -245,7 +248,7 @@ public class StageUtils {
InputStream is = new ByteArrayInputStream(json.getBytes(Charset.forName("UTF8")));
return mapper.readValue(is, clazz);
}
-
+
public static Map<String, String> getCommandParamsStage(ActionExecutionContext actionExecContext) throws AmbariException {
return actionExecContext.getParameters() != null ? actionExecContext.getParameters() : new TreeMap<String, String>();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-env.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-env.xml b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-env.xml
index 2c7bbc4..5129d87 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-env.xml
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-env.xml
@@ -58,7 +58,8 @@ export JAVA_HOME={{java64_home}}
# Storm log folder
export STORM_LOG_DIR={{log_dir}}
-# export STORM_CONF_DIR=""
+export STORM_CONF_DIR={{conf_dir}}
+export STORM_HOME={{storm_component_home_dir}}
</value>
</property>
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus.py
index 66b46c8..93f3e05 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus.py
@@ -57,8 +57,10 @@ class NimbusDefault(Nimbus):
env.set_params(params)
if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
conf_select.select(params.stack_name, "storm", params.version)
+ hdp_select.select("storm-client", params.version)
hdp_select.select("storm-nimbus", params.version)
+
def start(self, env, rolling_restart=False):
import params
env.set_params(params)
@@ -66,16 +68,19 @@ class NimbusDefault(Nimbus):
setup_ranger_storm()
service("nimbus", action="start")
+
def stop(self, env, rolling_restart=False):
import params
env.set_params(params)
service("nimbus", action="stop")
+
def status(self, env):
import status_params
env.set_params(status_params)
check_process_status(status_params.pid_nimbus)
+
def security_status(self, env):
import status_params
env.set_params(status_params)
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus_prod.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus_prod.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus_prod.py
index 313bb17..f9d64f4 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus_prod.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus_prod.py
@@ -49,6 +49,7 @@ class Nimbus(Script):
if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
conf_select.select(params.stack_name, "storm", params.version)
+ hdp_select.select("storm-client", params.version)
hdp_select.select("storm-nimbus", params.version)
def start(self, env, rolling_restart=False):
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params_linux.py
index 90e3b7b..41ea1ac 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params_linux.py
@@ -55,6 +55,7 @@ sudo = AMBARI_SUDO_BINARY
stack_name = default("/hostLevelParams/stack_name", None)
version = default("/commandParams/version", None)
+storm_component_home_dir = status_params.storm_component_home_dir
conf_dir = status_params.conf_dir
stack_version_unformatted = str(config['hostLevelParams']['stack_version'])
@@ -81,6 +82,8 @@ user_group = config['configurations']['cluster-env']['user_group']
java64_home = config['hostLevelParams']['java_home']
jps_binary = format("{java64_home}/bin/jps")
nimbus_port = config['configurations']['storm-site']['nimbus.thrift.port']
+storm_zookeeper_root_dir = default('/configurations/storm-site/storm.zookeeper.root', None)
+storm_zookeeper_servers = default('/configurations/storm-site/storm.zookeeper.servers', None)
# nimbus.seeds is supported in HDP 2.3.0.0 and higher
nimbus_seeds_supported = default('/configurations/storm-env/nimbus_seeds_supported', False)
@@ -93,8 +96,7 @@ rest_api_conf_file = format("{conf_dir}/config.yaml")
storm_env_sh_template = config['configurations']['storm-env']['content']
jmxremote_port = config['configurations']['storm-env']['jmxremote_port']
-if 'ganglia_server_host' in config['clusterHostInfo'] and \
- len(config['clusterHostInfo']['ganglia_server_host'])>0:
+if 'ganglia_server_host' in config['clusterHostInfo'] and len(config['clusterHostInfo']['ganglia_server_host'])>0:
ganglia_installed = True
ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0]
ganglia_report_interval = 60
@@ -135,6 +137,7 @@ if has_metric_collector:
metric_collector_host = ams_collector_hosts[0]
metric_collector_report_interval = 60
metric_collector_app_id = "nimbus"
+
metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink*.jar"
# ranger host
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/service.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/service.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/service.py
index 901aecc..0080beb 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/service.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/service.py
@@ -18,21 +18,19 @@ limitations under the License.
"""
-
from resource_management.core.resources import Execute
from resource_management.core.resources import File
from resource_management.libraries.functions.format import format
import time
-def service(
- name,
- action='start'):
+def service(name, action = 'start'):
import params
import status_params
pid_file = status_params.pid_files[name]
- no_op_test = format("ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1")
+ no_op_test = format(
+ "ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1")
if name == "logviewer" or name == "drpc":
tries_count = 12
@@ -45,43 +43,45 @@ def service(
process_grep = format("{rest_lib_dir}/storm-rest-.*\.jar$")
else:
process_grep = format("storm.daemon.{name}$")
-
+
find_proc = format("{jps_binary} -l | grep {process_grep}")
write_pid = format("{find_proc} | awk {{'print $1'}} > {pid_file}")
crt_pid_cmd = format("{find_proc} && {write_pid}")
- storm_env = format("source {conf_dir}/storm-env.sh ; export PATH=$JAVA_HOME/bin:$PATH")
+ storm_env = format(
+ "source {conf_dir}/storm-env.sh ; export PATH=$JAVA_HOME/bin:$PATH")
if action == "start":
if name == "rest_api":
- process_cmd = format("{storm_env} ; java -jar {rest_lib_dir}/`ls {rest_lib_dir} | grep -wE storm-rest-[0-9.-]+\.jar` server")
- cmd = format("{process_cmd} {rest_api_conf_file} > {log_dir}/restapi.log 2>&1")
+ process_cmd = format(
+ "{storm_env} ; java -jar {rest_lib_dir}/`ls {rest_lib_dir} | grep -wE storm-rest-[0-9.-]+\.jar` server")
+ cmd = format(
+ "{process_cmd} {rest_api_conf_file} > {log_dir}/restapi.log 2>&1")
else:
cmd = format("{storm_env} ; storm {name} > {log_dir}/{name}.out 2>&1")
Execute(cmd,
- not_if=no_op_test,
- user=params.storm_user,
- wait_for_finish=False,
- path=params.storm_bin_dir
- )
+ not_if = no_op_test,
+ user = params.storm_user,
+ wait_for_finish = False,
+ path = params.storm_bin_dir)
+
Execute(crt_pid_cmd,
- user=params.storm_user,
- logoutput=True,
- tries=tries_count,
- try_sleep=10,
- path=params.storm_bin_dir
- )
+ user = params.storm_user,
+ logoutput = True,
+ tries = tries_count,
+ try_sleep = 10,
+ path = params.storm_bin_dir)
elif action == "stop":
process_dont_exist = format("! ({no_op_test})")
pid = format("`cat {pid_file}`")
+
Execute(format("{sudo} kill {pid}"),
- not_if=process_dont_exist
- )
+ not_if = process_dont_exist)
+
Execute(format("{sudo} kill -9 {pid}"),
- not_if=format("sleep 2; {process_dont_exist} || sleep 20; {process_dont_exist}"),
- ignore_failures=True
- )
- File(pid_file,
- action = "delete",
- )
+ not_if = format(
+ "sleep 2; {process_dont_exist} || sleep 20; {process_dont_exist}"),
+ ignore_failures = True)
+
+ File(pid_file, action = "delete")
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/status_params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/status_params.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/status_params.py
index 99397ac..49dee47 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/status_params.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/status_params.py
@@ -64,8 +64,10 @@ else:
kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
tmp_dir = Script.get_tmp_dir()
+ storm_component_home_dir = "/usr/lib/storm"
conf_dir = "/etc/storm/conf"
if Script.is_hdp_stack_greater_or_equal("2.2"):
+ storm_component_home_dir = format("/usr/hdp/current/{component_directory}")
conf_dir = format("/usr/hdp/current/{component_directory}/conf")
storm_user = config['configurations']['storm-env']['storm_user']
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/storm_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/storm_upgrade.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/storm_upgrade.py
new file mode 100644
index 0000000..b25cdf8
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/storm_upgrade.py
@@ -0,0 +1,133 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import json
+import os
+
+from resource_management.core.logger import Logger
+from resource_management.core.exceptions import Fail
+from resource_management.core.resources.system import Directory
+from resource_management.core.resources.system import File
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions import format
+from resource_management.libraries.functions.default import default
+
+class StormUpgrade(Script):
+ """
+ This class helps perform some of the upgrade tasks needed for Storm during
+ a non-rolling upgrade. Storm writes data to disk locally and to ZooKeeper.
+ If any HDP 2.2 bits exist in these directories when an HDP 2.3 instance
+ starts up, it will fail to start properly. Because the upgrade framework in
+ Ambari doesn't yet have a mechanism to say "stop all" before starting to
+ upgrade each component, we need to rely on a Storm trick to bring down
+ running daemons. By removing the ZooKeeper data with running daemons, those
+ daemons will die.
+ """
+
+ def delete_storm_zookeeper_data(self, env):
+ """
+ Deletes the Storm data from ZooKeeper, effectively bringing down all
+ Storm daemons.
+ :return:
+ """
+ import params
+
+ Logger.info('Clearing Storm data from ZooKeeper')
+
+ storm_zookeeper_root_dir = params.storm_zookeeper_root_dir
+ if storm_zookeeper_root_dir is None:
+ raise Fail("The storm ZooKeeper directory specified by storm-site/storm.zookeeper.root must be specified")
+
+ # create the ZooKeeper delete command
+ if params.version is not None:
+ command = "/usr/hdp/{0}/zookeeper/bin/zkCli.sh rmr /storm".format(params.version)
+ else:
+ command = "/usr/hdp/current/zookeeper-client/bin/zkCli.sh rmr /storm"
+
+ if params.security_enabled:
+ kinit_command=format("{kinit_path_local} -kt {smoke_user_keytab} {smokeuser_principal}; ")
+ Execute(kinit_command,user=params.smokeuser)
+
+ # clean out ZK
+ Execute(command, user=params.storm_user, tries=1)
+
+
+ def delete_storm_local_data(self, env):
+ """
+ Deletes Storm data from local directories. This will create a marker file
+ with JSON data representing the upgrade stack and request/stage ID. This
+ will prevent multiple Storm components on the same host from removing
+ the local directories more than once.
+ :return:
+ """
+ import params
+
+ Logger.info('Clearing Storm data from local directories...')
+
+ storm_local_directory = params.local_dir
+ if storm_local_directory is None:
+ raise Fail("The storm local directory specified by storm-site/storm.local.dir must be specified")
+
+ request_id = default("/requestId", None)
+ stage_id = default("/stageId", None)
+ stack_version = params.version
+ stack_name = params.stack_name
+
+ json_map = {}
+ json_map["requestId"] = request_id
+ json_map["stageId"] = stage_id
+ json_map["stackVersion"] = stack_version
+ json_map["stackName"] = stack_name
+
+ temp_directory = params.tmp_dir
+ upgrade_file = os.path.join(temp_directory, "storm-upgrade-{0}.json".format(stack_version))
+
+ if os.path.exists(upgrade_file):
+ try:
+ with open(upgrade_file) as file_pointer:
+ existing_json_map = json.load(file_pointer)
+
+ if cmp(json_map, existing_json_map) == 0:
+ Logger.info("The storm upgrade has already removed the local directories for {0}-{1} for request {2} and stage {3}".format(
+ stack_name, stack_version, request_id, stage_id))
+
+ # nothing else to do here for this as it appears to have already been
+ # removed by another component being upgraded
+ return
+
+ except:
+ Logger.error("The upgrade file {0} appears to be corrupt; removing...".format(upgrade_file))
+ File(upgrade_file, action="delete")
+ else:
+ # delete the upgrade file since it does not match
+ File(upgrade_file, action="delete")
+
+ # delete from local directory
+ Directory(storm_local_directory, action="delete", recursive=True)
+
+ # recreate storm local directory
+ Directory(storm_local_directory, mode=0755, owner = params.storm_user,
+ group = params.user_group, recursive = True)
+
+ # the file doesn't exist, so create it
+ with open(upgrade_file, 'w') as file_pointer:
+ json.dump(json_map, file_pointer, indent=2)
+
+if __name__ == "__main__":
+ StormUpgrade().execute()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor.py
index bdb03df..335aeeb 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor.py
@@ -75,6 +75,7 @@ class SupervisorDefault(Supervisor):
if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
conf_select.select(params.stack_name, "storm", params.version)
+ hdp_select.select("storm-client", params.version)
hdp_select.select("storm-supervisor", params.version)
def start(self, env, rolling_restart=False):
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor_prod.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor_prod.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor_prod.py
index bde533f..f3074f1 100644
--- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor_prod.py
+++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor_prod.py
@@ -50,6 +50,7 @@ class Supervisor(Script):
if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
conf_select.select(params.stack_name, "storm", params.version)
+ hdp_select.select("storm-client", params.version)
hdp_select.select("storm-supervisor", params.version)
def start(self, env, rolling_restart=False):
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml
index 1aef735..aa52d3b 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml
@@ -33,8 +33,8 @@
# The java implementation to use.
export JAVA_HOME={{java64_home}}
-# export STORM_CONF_DIR=""
-export STORM_HOME=/usr/hdp/current/storm-client
+export STORM_CONF_DIR={{conf_dir}}
+export STORM_HOME={{storm_component_home_dir}}
</value>
</property>
</configuration>
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml
index 05aa89f..1cbdd88 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml
@@ -178,6 +178,7 @@
<service name="ZOOKEEPER">
<component>ZOOKEEPER_CLIENT</component>
</service>
+
<service name="HDFS">
<component>HDFS_CLIENT</component>
</service>
@@ -285,6 +286,7 @@
<function>finalize_rolling_upgrade</function>
</task>
</execute-stage>
+
<execute-stage title="Save Cluster State" service="" component="">
<task xsi:type="server_action" class="org.apache.ambari.server.serveraction.upgrades.FinalizeUpgradeAction">
</task>
@@ -331,6 +333,7 @@
<value>org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer</value>
</condition>
</task>
+
<task xsi:type="configure" summary="Transitioning Ranger HDFS Policy">
<type>ranger-hdfs-policymgr-ssl</type>
<transfer operation="copy" from-type="ranger-hdfs-plugin-properties" from-key="SSL_KEYSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.keystore" default-value="/usr/hdp/current/hadoop-client/conf/ranger-plugin-keystore.jks" />
@@ -338,6 +341,7 @@
<transfer operation="copy" from-type="ranger-hdfs-plugin-properties" from-key="SSL_TRUSTSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.truststore" default-value="/usr/hdp/current/hadoop-client/conf/ranger-plugin-truststore.jks" />
<transfer operation="copy" from-type="ranger-hdfs-plugin-properties" from-key="SSL_TRUSTSTORE_PASSWORD" to-key="xasecure.policymgr.clientssl.truststore.password" mask="true" default-value="changeit" />
</task>
+
<task xsi:type="configure" summary="Transitioning Ranger HDFS Audit">
<type>ranger-hdfs-audit</type>
<transfer operation="copy" from-type="ranger-hdfs-plugin-properties" from-key="XAAUDIT.DB.IS_ENABLED" to-key="xasecure.audit.destination.db" default-value="false"/>
@@ -387,6 +391,7 @@
<transfer operation="delete" delete-key="POLICY_MGR_URL" />
</task>
</pre-upgrade>
+
<upgrade>
<task xsi:type="restart" />
</upgrade>
@@ -556,6 +561,7 @@
<transfer operation="delete" delete-key="XAAUDIT.DB.PASSWORD" />
</task>
</pre-upgrade>
+
<upgrade>
<task xsi:type="restart" />
</upgrade>
@@ -580,9 +586,6 @@
<task xsi:type="configure">
<type>tez-site</type>
<set key="tez.am.view-acls" value="*"/>
- </task>
- <task xsi:type="configure">
- <type>tez-site</type>
<set key="tez.task.generate.counters.per.io" value="true"/>
</task>
</pre-upgrade>
@@ -899,17 +902,33 @@
<message>Before continuing, please deactivate and kill any currently running topologies.</message>
</task>
+ <task xsi:type="execute" summary="Removing Storm data from ZooKeeper">
+ <script>scripts/storm_upgrade.py</script>
+ <function>delete_storm_zookeeper_data</function>
+ </task>
+
+ <task xsi:type="execute" summary="Removing local Storm data">
+ <script>scripts/storm_upgrade.py</script>
+ <function>delete_storm_local_data</function>
+ </task>
+
<task xsi:type="configure" summary="Converting nimbus.host into nimbus.seeds">
<type>storm-site</type>
- <transfer operation="copy" from-key="nimbus.host" to-key="nimbus.seeds" coerce-to="yaml-array" />
- <transfer operation="delete" delete-key="nimbus.host" />
+ <transfer operation="copy" from-key="nimbus.host" to-key="nimbus.seeds" coerce-to="yaml-array"/>
+ <transfer operation="delete" delete-key="nimbus.host"/>
+ </task>
+
+ <task xsi:type="configure" summary="Updating Storm home and configuration environment variables">
+ <type>storm-env</type>
+ <replace key="content" find="# export STORM_CONF_DIR=&quot;&quot;" replace-with="export STORM_CONF_DIR={{conf_dir}}"/>
+ <replace key="content" find="export STORM_HOME=/usr/hdp/current/storm-client" replace-with="export STORM_HOME={{storm_component_home_dir}}"/>
</task>
<task xsi:type="configure" summary="Configuring Ranger Storm Policy">
<type>ranger-storm-policymgr-ssl</type>
- <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_KEYSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.keystore" default-value="/usr/hdp/current/storm-nimbus/conf/ranger-plugin-keystore.jks"/>
+ <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_KEYSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.keystore" default-value="/usr/hdp/current/storm-client/conf/ranger-plugin-keystore.jks"/>
<transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_KEYSTORE_PASSWORD" to-key="xasecure.policymgr.clientssl.keystore.password" default-value="myKeyFilePassword" mask="true"/>
- <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_TRUSTSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.truststore" default-value="/usr/hdp/current/storm-nimbus/conf/ranger-plugin-truststore.jks"/>
+ <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_TRUSTSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.truststore" default-value="/usr/hdp/current/storm-client/conf/ranger-plugin-truststore.jks"/>
<transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_TRUSTSTORE_PASSWORD" to-key="xasecure.policymgr.clientssl.truststore.password" default-value="changeit" mask="true"/>
</task>
@@ -959,25 +978,56 @@
<task xsi:type="restart" />
</upgrade>
</component>
+
<component name="STORM_REST_API">
+ <pre-upgrade>
+ <task xsi:type="execute" summary="Removing local Storm data">
+ <script>scripts/storm_upgrade.py</script>
+ <function>delete_storm_local_data</function>
+ </task>
+ </pre-upgrade>
<upgrade>
<task xsi:type="restart" />
</upgrade>
</component>
+
<component name="SUPERVISOR">
+ <pre-upgrade>
+ <task xsi:type="execute" summary="Removing local Storm data">
+ <script>scripts/storm_upgrade.py</script>
+ <function>delete_storm_local_data</function>
+ </task>
+ </pre-upgrade>
<upgrade>
<task xsi:type="restart" />
</upgrade>
</component>
+
<component name="STORM_UI_SERVER">
+ <pre-upgrade>
+ <task xsi:type="execute" summary="Removing local Storm data">
+ <script>scripts/storm_upgrade.py</script>
+ <function>delete_storm_local_data</function>
+ </task>
+ </pre-upgrade>
+
<upgrade>
<task xsi:type="restart" />
</upgrade>
</component>
+
<component name="DRPC_SERVER">
+ <pre-upgrade>
+ <task xsi:type="execute" summary="Removing local Storm data">
+ <script>scripts/storm_upgrade.py</script>
+ <function>delete_storm_local_data</function>
+ </task>
+ </pre-upgrade>
+
<upgrade>
<task xsi:type="restart" />
</upgrade>
+
<post-upgrade>
<task xsi:type="manual">
<message>Please rebuild your topology using the new Storm version dependencies and resubmit it using the newly created jar.</message>
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-policymgr-ssl.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-policymgr-ssl.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-policymgr-ssl.xml
index 4600a14..855e6fd 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-policymgr-ssl.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-policymgr-ssl.xml
@@ -22,7 +22,7 @@
<property>
<name>xasecure.policymgr.clientssl.keystore</name>
- <value>/usr/hdp/current/storm-nimbus/conf/ranger-plugin-keystore.jks</value>
+ <value>/usr/hdp/current/storm-client/conf/ranger-plugin-keystore.jks</value>
<description>Java Keystore files</description>
</property>
@@ -34,7 +34,7 @@
<property>
<name>xasecure.policymgr.clientssl.truststore</name>
- <value>/usr/hdp/current/storm-nimbus/conf/ranger-plugin-truststore.jks</value>
+ <value>/usr/hdp/current/storm-client/conf/ranger-plugin-truststore.jks</value>
<description>java truststore file</description>
</property>
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-security.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-security.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-security.xml
index 33567c8..f26be4d 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-security.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-security.xml
@@ -40,7 +40,7 @@
<property>
<name>ranger.plugin.storm.policy.rest.ssl.config.file</name>
- <value>/usr/hdp/current/storm-nimbus/conf/ranger-policymgr-ssl.xml</value>
+ <value>/usr/hdp/current/storm-client/conf/ranger-policymgr-ssl.xml</value>
<description>Path to the file containing SSL details to contact Ranger Admin</description>
</property>
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
index 66efea1..8d21b80 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
@@ -52,7 +52,7 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
public class ExecutionCommandWrapperTest {
-
+
private static final String HOST1 = "dev01.ambari.apache.org";
private static final String CLUSTER1 = "c1";
private static final String CLUSTER_VERSION_TAG = "clusterVersion";
@@ -87,7 +87,7 @@ public class ExecutionCommandWrapperTest {
private static Map<String, String> SERVICE_SITE_SERVICE;
private static Map<String, String> SERVICE_SITE_HOST;
private static Map<String, Map<String, String>> CONFIG_ATTRIBUTES;
-
+
private static Injector injector;
private static Clusters clusters;
private static ConfigFactory configFactory;
@@ -101,35 +101,35 @@ public class ExecutionCommandWrapperTest {
configHelper = injector.getInstance(ConfigHelper.class);
configFactory = injector.getInstance(ConfigFactory.class);
stageFactory = injector.getInstance(StageFactory.class);
-
+
clusters = injector.getInstance(Clusters.class);
clusters.addHost(HOST1);
clusters.getHost(HOST1).persist();
clusters.addCluster(CLUSTER1, new StackId("HDP-0.1"));
-
+
Cluster cluster1 = clusters.getCluster(CLUSTER1);
-
+
SERVICE_SITE_CLUSTER = new HashMap<String, String>();
SERVICE_SITE_CLUSTER.put(SERVICE_SITE_NAME1, SERVICE_SITE_VAL1);
SERVICE_SITE_CLUSTER.put(SERVICE_SITE_NAME2, SERVICE_SITE_VAL2);
SERVICE_SITE_CLUSTER.put(SERVICE_SITE_NAME3, SERVICE_SITE_VAL3);
SERVICE_SITE_CLUSTER.put(SERVICE_SITE_NAME4, SERVICE_SITE_VAL4);
-
+
SERVICE_SITE_SERVICE = new HashMap<String, String>();
SERVICE_SITE_SERVICE.put(SERVICE_SITE_NAME1, SERVICE_SITE_VAL1_S);
SERVICE_SITE_SERVICE.put(SERVICE_SITE_NAME2, SERVICE_SITE_VAL2_S);
SERVICE_SITE_SERVICE.put(SERVICE_SITE_NAME5, SERVICE_SITE_VAL5_S);
-
+
SERVICE_SITE_HOST = new HashMap<String, String>();
SERVICE_SITE_HOST.put(SERVICE_SITE_NAME2, SERVICE_SITE_VAL2_H);
SERVICE_SITE_HOST.put(SERVICE_SITE_NAME6, SERVICE_SITE_VAL6_H);
-
+
GLOBAL_CLUSTER = new HashMap<String, String>();
GLOBAL_CLUSTER.put(GLOBAL_NAME1, GLOBAL_CLUSTER_VAL1);
GLOBAL_CLUSTER.put(GLOBAL_NAME2, GLOBAL_CLUSTER_VAL2);
-
+
CONFIG_ATTRIBUTES = new HashMap<String, Map<String,String>>();
-
+
//Cluster level global config
Config globalConfig = configFactory.createNew(cluster1, GLOBAL_CONFIG, GLOBAL_CLUSTER, CONFIG_ATTRIBUTES);
globalConfig.setTag(CLUSTER_VERSION_TAG);
@@ -139,25 +139,25 @@ public class ExecutionCommandWrapperTest {
Config serviceSiteConfigCluster = configFactory.createNew(cluster1, SERVICE_SITE_CONFIG, SERVICE_SITE_CLUSTER, CONFIG_ATTRIBUTES);
serviceSiteConfigCluster.setTag(CLUSTER_VERSION_TAG);
cluster1.addConfig(serviceSiteConfigCluster);
-
+
//Service level service config
Config serviceSiteConfigService = configFactory.createNew(cluster1, SERVICE_SITE_CONFIG, SERVICE_SITE_SERVICE, CONFIG_ATTRIBUTES);
serviceSiteConfigService.setTag(SERVICE_VERSION_TAG);
cluster1.addConfig(serviceSiteConfigService);
-
+
//Host level service config
Config serviceSiteConfigHost = configFactory.createNew(cluster1, SERVICE_SITE_CONFIG, SERVICE_SITE_HOST, CONFIG_ATTRIBUTES);
serviceSiteConfigHost.setTag(HOST_VERSION_TAG);
cluster1.addConfig(serviceSiteConfigHost);
-
+
ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
-
+
createTask(db, 1, 1, HOST1, CLUSTER1);
-
+
}
-
+
private static void createTask(ActionDBAccessor db, long requestId, long stageId, String hostName, String clusterName) throws AmbariException {
-
+
Stage s = stageFactory.createNew(requestId, "/var/log", clusterName, 1L, "execution command wrapper test", "clusterHostInfo", "commandParamsStage", "hostParamsStage");
s.setStageId(stageId);
s.addHostRoleExecutionCommand(hostName, Role.NAMENODE,
@@ -169,36 +169,36 @@ public class ExecutionCommandWrapperTest {
Request request = new Request(stages, clusters);
db.persistActions(request);
}
-
+
@Test
public void testGetExecutionCommand() throws JSONException, AmbariException {
-
-
+
+
Map<String, Map<String, String>> confs = new HashMap<String, Map<String, String>>();
Map<String, String> configurationsGlobal = new HashMap<String, String>();
configurationsGlobal.put(GLOBAL_NAME1, GLOBAL_VAL1);
confs.put(GLOBAL_CONFIG, configurationsGlobal);
-
+
Map<String, Map<String, String>> confTags = new HashMap<String, Map<String, String>>();
Map<String, String> confTagServiceSite = new HashMap<String, String>();
-
+
confTagServiceSite.put("tag", CLUSTER_VERSION_TAG);
confTagServiceSite.put("service_override_tag", SERVICE_VERSION_TAG);
confTagServiceSite.put("host_override_tag", HOST_VERSION_TAG);
-
+
confTags.put(SERVICE_SITE_CONFIG, confTagServiceSite);
-
+
Map<String, String> confTagGlobal = Collections.singletonMap("tag", CLUSTER_VERSION_TAG);
-
+
confTags.put(GLOBAL_CONFIG, confTagGlobal);
-
-
+
+
ExecutionCommand executionCommand = new ExecutionCommand();
-
-
+
+
executionCommand.setClusterName(CLUSTER1);
executionCommand.setTaskId(1);
- executionCommand.setCommandId("1-1");
+ executionCommand.setRequestAndStage(1, 1);
executionCommand.setHostname(HOST1);
executionCommand.setRole("NAMENODE");
executionCommand.setRoleParams(Collections.<String, String>emptyMap());
@@ -208,63 +208,63 @@ public class ExecutionCommandWrapperTest {
executionCommand.setServiceName("HDFS");
executionCommand.setCommandType(AgentCommandType.EXECUTION_COMMAND);
executionCommand.setCommandParams(Collections.<String, String>emptyMap());
-
+
String json = StageUtils.getGson().toJson(executionCommand, ExecutionCommand.class);
ExecutionCommandWrapper execCommWrap = new ExecutionCommandWrapper(json);
ExecutionCommand processedExecutionCommand = execCommWrap.getExecutionCommand();
-
+
Map<String, String> serviceSiteConfig = processedExecutionCommand.getConfigurations().get(SERVICE_SITE_CONFIG);
-
+
Assert.assertEquals(SERVICE_SITE_VAL1_S, serviceSiteConfig.get(SERVICE_SITE_NAME1));
Assert.assertEquals(SERVICE_SITE_VAL2_H, serviceSiteConfig.get(SERVICE_SITE_NAME2));
Assert.assertEquals(SERVICE_SITE_VAL3, serviceSiteConfig.get(SERVICE_SITE_NAME3));
Assert.assertEquals(SERVICE_SITE_VAL4, serviceSiteConfig.get(SERVICE_SITE_NAME4));
Assert.assertEquals(SERVICE_SITE_VAL5_S, serviceSiteConfig.get(SERVICE_SITE_NAME5));
Assert.assertEquals(SERVICE_SITE_VAL6_H, serviceSiteConfig.get(SERVICE_SITE_NAME6));
-
+
Map<String, String> globalConfig = processedExecutionCommand.getConfigurations().get(GLOBAL_CONFIG);
-
+
Assert.assertEquals(GLOBAL_VAL1, globalConfig.get(GLOBAL_NAME1));
Assert.assertEquals(GLOBAL_CLUSTER_VAL2, globalConfig.get(GLOBAL_NAME2));
-
+
//Union of all keys of service site configs
Set<String> serviceSiteKeys = new HashSet<String>();
serviceSiteKeys.addAll(SERVICE_SITE_CLUSTER.keySet());
serviceSiteKeys.addAll(SERVICE_SITE_SERVICE.keySet());
serviceSiteKeys.addAll(SERVICE_SITE_HOST.keySet());
-
+
Assert.assertEquals(serviceSiteKeys.size(), serviceSiteConfig.size());
-
+
}
-
+
@Test
public void testGetMergedConfig() {
Map<String, String> baseConfig = new HashMap<String, String>();
-
+
baseConfig.put(SERVICE_SITE_NAME1, SERVICE_SITE_VAL1);
baseConfig.put(SERVICE_SITE_NAME2, SERVICE_SITE_VAL2);
baseConfig.put(SERVICE_SITE_NAME3, SERVICE_SITE_VAL3);
baseConfig.put(SERVICE_SITE_NAME4, SERVICE_SITE_VAL4);
baseConfig.put(SERVICE_SITE_NAME5, SERVICE_SITE_VAL5);
-
+
Map<String, String> overrideConfig = new HashMap<String, String>();
-
+
overrideConfig.put(SERVICE_SITE_NAME2, SERVICE_SITE_VAL2_H);
overrideConfig.put(SERVICE_SITE_NAME6, SERVICE_SITE_VAL6_H);
-
-
+
+
Map<String, String> mergedConfig = configHelper.getMergedConfig(baseConfig,
overrideConfig);
-
-
+
+
Set<String> configsKeys = new HashSet<String>();
configsKeys.addAll(baseConfig.keySet());
configsKeys.addAll(overrideConfig.keySet());
-
+
Assert.assertEquals(configsKeys.size(), mergedConfig.size());
-
+
Assert.assertEquals(SERVICE_SITE_VAL1, mergedConfig.get(SERVICE_SITE_NAME1));
Assert.assertEquals(SERVICE_SITE_VAL2_H, mergedConfig.get(SERVICE_SITE_NAME2));
Assert.assertEquals(SERVICE_SITE_VAL3, mergedConfig.get(SERVICE_SITE_NAME3));
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
index ca1a5a0..15d7904 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
@@ -84,8 +84,17 @@ import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
import org.apache.ambari.server.orm.OrmTestHelper;
-import org.apache.ambari.server.orm.dao.*;
-import org.apache.ambari.server.orm.entities.*;
+import org.apache.ambari.server.orm.dao.ClusterDAO;
+import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
+import org.apache.ambari.server.orm.dao.ResourceTypeDAO;
+import org.apache.ambari.server.orm.dao.StackDAO;
+import org.apache.ambari.server.orm.entities.ClusterEntity;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
+import org.apache.ambari.server.orm.entities.ResourceEntity;
+import org.apache.ambari.server.orm.entities.ResourceTypeEntity;
+import org.apache.ambari.server.orm.entities.StackEntity;
import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileWriter;
import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileWriterFactory;
import org.apache.ambari.server.serveraction.kerberos.KerberosServerAction;
@@ -229,7 +238,7 @@ public class TestHeartbeatHandler {
hostObject.setState(HostState.UNHEALTHY);
ExecutionCommand execCmd = new ExecutionCommand();
- execCmd.setCommandId("2-34");
+ execCmd.setRequestAndStage(2, 34);
execCmd.setHostname(DummyHostname1);
aq.enqueue(DummyHostname1, new ExecutionCommand());
HeartBeat hb = new HeartBeat();
@@ -2318,11 +2327,11 @@ public class TestHeartbeatHandler {
clusterEntity.setDesiredStack(stackEntity);
clusterDAO.create(clusterEntity);
-
+
StackId stackId = new StackId(DummyStackId);
-
+
Cluster cluster = clusters.getCluster(DummyCluster);
-
+
cluster.setDesiredStackVersion(stackId);
cluster.setCurrentStackVersion(stackId);
helper.getOrCreateRepositoryVersion(stackId, stackId.getStackVersion());
@@ -2520,7 +2529,7 @@ public class TestHeartbeatHandler {
hostObject.setState(HostState.UNHEALTHY);
ExecutionCommand execCmd = new ExecutionCommand();
- execCmd.setCommandId("2-34");
+ execCmd.setRequestAndStage(2, 34);
execCmd.setHostname(DummyHostname1);
aq.enqueue(DummyHostname1, new ExecutionCommand());
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py
index f79fa6d..4548b8d 100644
--- a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py
+++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py
@@ -149,6 +149,7 @@ class TestStormNimbus(TestStormBase):
hdp_stack_version = self.STACK_VERSION,
target = RMFTestCase.TARGET_COMMON_SERVICES)
+ self.assertResourceCalled("Execute", "hdp-select set storm-client 2.2.1.0-2067")
self.assertResourceCalled("Execute", "hdp-select set storm-nimbus 2.2.1.0-2067")
def test_pre_rolling_restart_23(self):
@@ -168,6 +169,7 @@ class TestStormNimbus(TestStormBase):
call_mocks = [(0, None), (0, None)],
mocks_dict = mocks_dict)
+ self.assertResourceCalled("Execute", "hdp-select set storm-client 2.3.0.0-1234")
self.assertResourceCalled("Execute", "hdp-select set storm-nimbus 2.3.0.0-1234")
self.assertEquals(2, mocks_dict['call'].call_count)
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py
index 17c0c6b..85a4a2c 100644
--- a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py
+++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py
@@ -114,6 +114,7 @@ class TestStormNimbus(TestStormBase):
hdp_stack_version = self.STACK_VERSION,
target = RMFTestCase.TARGET_COMMON_SERVICES)
+ self.assertResourceCalled("Execute", "hdp-select set storm-client 2.2.1.0-2067")
self.assertResourceCalled("Execute", "hdp-select set storm-nimbus 2.2.1.0-2067")
def test_pre_rolling_restart_23(self):
@@ -133,6 +134,7 @@ class TestStormNimbus(TestStormBase):
call_mocks = [(0, None), (0, None)],
mocks_dict = mocks_dict)
+ self.assertResourceCalled("Execute", "hdp-select set storm-client 2.3.0.0-1234")
self.assertResourceCalled("Execute", "hdp-select set storm-nimbus 2.3.0.0-1234")
self.assertEquals(2, mocks_dict['call'].call_count)
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor.py
index 38b7b64..c4a261e 100644
--- a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor.py
+++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor.py
@@ -194,6 +194,7 @@ class TestStormSupervisor(TestStormBase):
hdp_stack_version = self.STACK_VERSION,
target = RMFTestCase.TARGET_COMMON_SERVICES)
+ self.assertResourceCalled("Execute", "hdp-select set storm-client 2.2.1.0-2067")
self.assertResourceCalled("Execute", "hdp-select set storm-supervisor 2.2.1.0-2067")
def test_pre_rolling_restart_23(self):
@@ -213,6 +214,7 @@ class TestStormSupervisor(TestStormBase):
call_mocks = [(0, None), (0, None)],
mocks_dict = mocks_dict)
+ self.assertResourceCalled("Execute", "hdp-select set storm-client 2.3.0.0-1234")
self.assertResourceCalled("Execute", "hdp-select set storm-supervisor 2.3.0.0-1234")
self.assertEquals(2, mocks_dict['call'].call_count)
http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py
index a6dc423..926e57e 100644
--- a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py
+++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py
@@ -163,6 +163,7 @@ class TestStormSupervisor(TestStormBase):
hdp_stack_version = self.STACK_VERSION,
target = RMFTestCase.TARGET_COMMON_SERVICES)
+ self.assertResourceCalled("Execute", "hdp-select set storm-client 2.2.1.0-2067")
self.assertResourceCalled("Execute", "hdp-select set storm-supervisor 2.2.1.0-2067")
def test_pre_rolling_restart_23(self):
@@ -182,6 +183,7 @@ class TestStormSupervisor(TestStormBase):
call_mocks = [(0, None), (0, None)],
mocks_dict = mocks_dict)
+ self.assertResourceCalled("Execute", "hdp-select set storm-client 2.3.0.0-1234")
self.assertResourceCalled("Execute", "hdp-select set storm-supervisor 2.3.0.0-1234")
self.assertEquals(2, mocks_dict['call'].call_count)