You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2013/11/14 07:30:52 UTC
[1/2] AMBARI-3731. Custom Action: Add support for custom action execution
Updated Branches:
refs/heads/trunk 708e59d9b -> 22f5fdfb7
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
index d6523b0..a9c73bd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
@@ -188,7 +188,7 @@ public class StageUtils {
public static Map<String, List<String>> getClusterHostInfo(
Map<String, Host> allHosts, Cluster cluster, HostsMap hostsMap,
- Injector injector) throws AmbariException {
+ Configuration configuration) throws AmbariException {
Map<String, List<String>> info = new HashMap<String, List<String>>();
if (cluster.getServices() != null) {
String hostName = getHostName();
@@ -213,8 +213,7 @@ public class StageUtils {
info.put(clusterInfoKey, hostList);
}
//Set up ambari-rca connection properties, is this a hack?
-// info.put("ambari_db_server_host", Arrays.asList(hostsMap.getHostMap(getHostName())));
- Configuration configuration = injector.getInstance(Configuration.class);
+ //info.put("ambari_db_server_host", Arrays.asList(hostsMap.getHostMap(getHostName())));
String url = configuration.getRcaDatabaseUrl();
if (url.contains(Configuration.HOSTNAME_MACRO)) {
url = url.replace(Configuration.HOSTNAME_MACRO, hostsMap.getHostMap(hostName));
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/resources/upgrade/ddl/Ambari-DDL-Postgres-UPGRADE-1.3.0.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/upgrade/ddl/Ambari-DDL-Postgres-UPGRADE-1.3.0.sql b/ambari-server/src/main/resources/upgrade/ddl/Ambari-DDL-Postgres-UPGRADE-1.3.0.sql
index c016d96..96d5956 100644
--- a/ambari-server/src/main/resources/upgrade/ddl/Ambari-DDL-Postgres-UPGRADE-1.3.0.sql
+++ b/ambari-server/src/main/resources/upgrade/ddl/Ambari-DDL-Postgres-UPGRADE-1.3.0.sql
@@ -132,6 +132,10 @@ ALTER TABLE ambari.confgroupclusterconfigmapping ADD CONSTRAINT FK_confgroupclus
ALTER TABLE ambari.configgrouphostmapping ADD CONSTRAINT FK_configgrouphostmapping_configgroup_id FOREIGN KEY (config_group_id) REFERENCES ambari.configgroup (group_id);
ALTER TABLE ambari.configgrouphostmapping ADD CONSTRAINT FK_configgrouphostmapping_host_name FOREIGN KEY (host_name) REFERENCES ambari.hosts (host_name);
+-- required for custom action
+CREATE TABLE ambari.action (action_name VARCHAR(255) NOT NULL, action_type VARCHAR(32) NOT NULL, inputs VARCHAR(1000),
+target_service VARCHAR(255), target_component VARCHAR(255), default_timeout SMALLINT NOT NULL, description VARCHAR(1000), target_type VARCHAR(32), PRIMARY KEY (action_name));
+GRANT ALL PRIVILEGES ON TABLE ambari.action TO :username;
--Move cluster host info for old execution commands to stage table
UPDATE ambari.stage sd
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
index b0bd728..8c7d839 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
@@ -28,6 +28,7 @@ import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.agent.ActionQueue;
import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -57,9 +58,11 @@ public class TestActionDBAccessorImpl {
private long stageId = 31;
private String hostName = "host1";
private String clusterName = "cluster1";
+ private String actionName = "validate_kerberos";
private Injector injector;
ActionDBAccessor db;
ActionManager am;
+ CustomActionDBAccessor cdb;
@Inject
private Clusters clusters;
@@ -77,9 +80,10 @@ public class TestActionDBAccessorImpl {
clusters.getHost(hostName).persist();
clusters.addCluster(clusterName);
db = injector.getInstance(ActionDBAccessorImpl.class);
-
+ cdb = injector.getInstance(CustomActionDBAccessor.class);
+
am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
- new HostsMap((String) null), null, injector.getInstance(UnitOfWork.class), null);
+ new HostsMap((String) null), null, injector.getInstance(UnitOfWork.class), cdb);
}
@After
@@ -150,26 +154,26 @@ public class TestActionDBAccessorImpl {
@Test
public void testHostRoleScheduled() throws InterruptedException {
populateActionDB(db, hostName, requestId, stageId);
- Stage stage = db.getAction(StageUtils.getActionId(requestId, stageId));
+ Stage stage = db.getStage(StageUtils.getActionId(requestId, stageId));
assertEquals(HostRoleStatus.PENDING, stage.getHostRoleStatus(hostName, Role.HBASE_MASTER.toString()));
List<HostRoleCommandEntity> entities=
- hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER);
+ hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER.toString());
assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus());
stage.setHostRoleStatus(hostName, Role.HBASE_MASTER.toString(), HostRoleStatus.QUEUED);
- entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER);
+ entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER.toString());
assertEquals(HostRoleStatus.QUEUED, stage.getHostRoleStatus(hostName, Role.HBASE_MASTER.toString()));
assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus());
db.hostRoleScheduled(stage, hostName, Role.HBASE_MASTER.toString());
- entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER);
+ entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER.toString());
assertEquals(HostRoleStatus.QUEUED, entities.get(0).getStatus());
Thread thread = new Thread(){
@Override
public void run() {
- Stage stage1 = db.getAction("23-31");
+ Stage stage1 = db.getStage("23-31");
stage1.setHostRoleStatus(hostName, Role.HBASE_MASTER.toString(), HostRoleStatus.COMPLETED);
db.hostRoleScheduled(stage1, hostName, Role.HBASE_MASTER.toString());
}
@@ -178,9 +182,47 @@ public class TestActionDBAccessorImpl {
thread.start();
thread.join();
- entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER);
+ entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER.toString());
assertEquals("Concurrent update failed", HostRoleStatus.COMPLETED, entities.get(0).getStatus());
+ }
+
+ @Test
+ public void testCustomActionScheduled() throws InterruptedException {
+ populateActionDBWithCustomAction(db, hostName, requestId, stageId);
+ Stage stage = db.getStage(StageUtils.getActionId(requestId, stageId));
+ assertEquals(HostRoleStatus.PENDING, stage.getHostRoleStatus(hostName, actionName));
+ List<HostRoleCommandEntity> entities =
+ hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, actionName);
+
+ assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus());
+ stage.setHostRoleStatus(hostName, actionName, HostRoleStatus.QUEUED);
+
+ entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, actionName);
+ assertEquals(HostRoleStatus.QUEUED, stage.getHostRoleStatus(hostName, actionName));
+ assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus());
+
+ long now = System.currentTimeMillis();
+ db.hostRoleScheduled(stage, hostName, actionName);
+
+ entities = hostRoleCommandDAO.findByHostRole(
+ hostName, requestId, stageId, actionName);
+ assertEquals(HostRoleStatus.QUEUED, entities.get(0).getStatus());
+
+
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ Stage stage1 = db.getStage("23-31");
+ stage1.setHostRoleStatus(hostName, actionName, HostRoleStatus.COMPLETED);
+ db.hostRoleScheduled(stage1, hostName, actionName);
+ }
+ };
+
+ thread.start();
+ thread.join();
+ entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, actionName);
+ assertEquals("Concurrent update failed", HostRoleStatus.COMPLETED, entities.get(0).getStatus());
}
@Test
@@ -199,7 +241,8 @@ public class TestActionDBAccessorImpl {
commandReport.setExitCode(123);
db.updateHostRoleState(hostName, requestId, stageId, Role.HBASE_MASTER.toString(), commandReport);
- List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER);
+ List<HostRoleCommandEntity> commandEntities =
+ hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER.toString());
assertEquals(1, commandEntities.size());
HostRoleCommandEntity commandEntity = commandEntities.get(0);
HostRoleCommand command = db.getTask(commandEntity.getTaskId());
@@ -297,4 +340,19 @@ public class TestActionDBAccessorImpl {
stages.add(s);
db.persistActions(stages);
}
+
+ private void populateActionDBWithCustomAction(ActionDBAccessor db, String hostname,
+ long requestId, long stageId) {
+ Stage s = new Stage(requestId, "/a/b", "cluster1", "action db accessor test", "");
+ s.setStageId(stageId);
+ s.addHostRoleExecutionCommand(hostname, Role.valueOf(actionName),
+ RoleCommand.ACTIONEXECUTE,
+ new ServiceComponentHostStartEvent(Role.HBASE_MASTER.toString(),
+ hostname, System.currentTimeMillis()), "cluster1", "HBASE");
+ List<Stage> stages = new ArrayList<Stage>();
+ stages.add(s);
+ ExecuteActionRequest request = new ExecuteActionRequest("cluster1", null, actionName, "HBASE",
+ "HBASE_MASTER", null, null);
+ db.persistActions(stages);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
index d465b59..f5ffe8b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
@@ -175,7 +175,7 @@ public class TestActionManager {
populateActionDB(db, hostname);
assertEquals(1, clusters.getClusters().size());
- Cluster cluster = clusters.getCluster(clusterName);
+ clusters.getCluster(clusterName);
assertEquals(1, am.getRequests().size());
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
index 3e8448c..2af7eef 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
@@ -37,10 +37,12 @@ import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.ServiceNotFoundException;
import org.apache.ambari.server.StackAccessException;
import org.apache.ambari.server.actionmanager.ActionDBAccessor;
+import org.apache.ambari.server.actionmanager.ActionType;
import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.actionmanager.TargetHostType;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.Configuration;
@@ -2052,13 +2054,9 @@ public class AmbariManagementControllerTest {
@Test
public void testGetServiceComponentHosts() throws AmbariException {
- clusters.addCluster("c1");
- Cluster c1 = clusters.getCluster("c1");
- c1.setDesiredStackVersion(new StackId("HDP-0.1"));
- clusters.addHost("h1");
- clusters.getHost("h1").setOsType("centos5");
- clusters.getHost("h1").persist();
- clusters.mapHostToCluster("h1", "c1");
+ Cluster c1 = setupClusterWithHosts("c1", "HDP-0.1", new ArrayList<String>() {{
+ add("h1");
+ }}, "centos5");
Service s1 = serviceFactory.createNew(c1, "HDFS");
c1.addService(s1);
s1.persist();
@@ -2105,24 +2103,29 @@ public class AmbariManagementControllerTest {
}
+ private Cluster setupClusterWithHosts(String clusterName, String stackId, List<String> hosts,
+ String osType) throws AmbariException {
+ clusters.addCluster(clusterName);
+ Cluster c1 = clusters.getCluster(clusterName);
+ c1.setDesiredStackVersion(new StackId(stackId));
+ for (String host : hosts) {
+ clusters.addHost(host);
+ clusters.getHost(host).setOsType(osType);
+ clusters.getHost(host).persist();
+ clusters.mapHostToCluster(host, clusterName);
+ }
+ return c1;
+ }
+
@Test
public void testGetServiceComponentHostsWithFilters() throws AmbariException {
- clusters.addCluster("c1");
- Cluster c1 = clusters.getCluster("c1");
- c1.setDesiredStackVersion(new StackId("HDP-0.2"));
-
- clusters.addHost("h1");
- clusters.addHost("h2");
- clusters.addHost("h3");
- clusters.getHost("h1").setOsType("centos5");
- clusters.getHost("h2").setOsType("centos5");
- clusters.getHost("h3").setOsType("centos5");
- clusters.getHost("h1").persist();
- clusters.getHost("h2").persist();
- clusters.getHost("h3").persist();
- clusters.mapHostToCluster("h1", "c1");
- clusters.mapHostToCluster("h2", "c1");
- clusters.mapHostToCluster("h3", "c1");
+ Cluster c1 = setupClusterWithHosts("c1", "HDP-0.2",
+ new ArrayList<String>() {{
+ add("h1");
+ add("h2");
+ add("h3");
+ }},
+ "centos5");
Service s1 = serviceFactory.createNew(c1, "HDFS");
Service s2 = serviceFactory.createNew(c1, "MAPREDUCE");
@@ -2279,25 +2282,21 @@ public class AmbariManagementControllerTest {
@Test
public void testGetHosts() throws AmbariException {
- clusters.addCluster("c1");
- clusters.addCluster("c2");
- clusters.getCluster("c1").setDesiredStackVersion(new StackId("HDP-0.2"));
- clusters.getCluster("c2").setDesiredStackVersion(new StackId("HDP-0.2"));
- clusters.addHost("h1");
- clusters.addHost("h2");
- clusters.addHost("h3");
+ setupClusterWithHosts("c1", "HDP-0.2",
+ new ArrayList<String>() {{
+ add("h1");
+ add("h2");
+ }},
+ "centos5");
+
+ setupClusterWithHosts("c2", "HDP-0.2",
+ new ArrayList<String>() {{
+ add("h3");
+ }},
+ "centos5");
clusters.addHost("h4");
- clusters.getHost("h1").setOsType("centos5");
- clusters.getHost("h2").setOsType("centos5");
- clusters.getHost("h3").setOsType("centos5");
clusters.getHost("h4").setOsType("centos5");
- clusters.getHost("h1").persist();
- clusters.getHost("h2").persist();
- clusters.getHost("h3").persist();
clusters.getHost("h4").persist();
- clusters.mapHostToCluster("h1", "c1");
- clusters.mapHostToCluster("h2", "c1");
- clusters.mapHostToCluster("h3", "c2");
Map<String, String> attrs = new HashMap<String, String>();
attrs.put("a1", "b1");
@@ -3105,6 +3104,7 @@ public class AmbariManagementControllerTest {
Assert.assertNull(trackAction);
}
+ @Ignore
@Test
public void testServiceComponentHostUpdateStackId() throws AmbariException {
String clusterName = "foo1";
@@ -3264,6 +3264,7 @@ public class AmbariManagementControllerTest {
}
}
+ @Ignore
@Test
public void testServiceComponentHostUpdateStackIdError() throws AmbariException {
String clusterName = "foo1";
@@ -3442,19 +3443,124 @@ public class AmbariManagementControllerTest {
// start should fail
}
- @SuppressWarnings("serial")
@Test
- public void testCreateActionsFailures() throws Exception {
- clusters.addCluster("c1");
- clusters.getCluster("c1").setDesiredStackVersion(new StackId("HDP-0.1"));
- clusters.addHost("h1");
- clusters.getHost("h1").setOsType("centos5");
- clusters.getHost("h1").persist();
- Set<String> hostNames = new HashSet<String>(){{
- add("h1");
+ public void testCreateCustomActions() throws Exception {
+ setupClusterWithHosts("c1", "HDP-2.0.6",
+ new ArrayList<String>() {{
+ add("h1");
+ add("h2");
+ add("h3");
+ }},
+ "centos6");
+
+ Cluster cluster = clusters.getCluster("c1");
+ cluster.setDesiredStackVersion(new StackId("HDP-2.0.6"));
+ cluster.setCurrentStackVersion(new StackId("HDP-2.0.6"));
+
+ ConfigFactory cf = injector.getInstance(ConfigFactory.class);
+ Config config1 = cf.createNew(cluster, "global",
+ new HashMap<String, String>() {{
+ put("key1", "value1");
+ }});
+ config1.setVersionTag("version1");
+
+ Config config2 = cf.createNew(cluster, "core-site",
+ new HashMap<String, String>() {{
+ put("key1", "value1");
+ }});
+ config2.setVersionTag("version1");
+
+ cluster.addConfig(config1);
+ cluster.addConfig(config2);
+
+ Service hdfs = cluster.addService("HDFS");
+ hdfs.persist();
+
+ Service mapred = cluster.addService("YARN");
+ mapred.persist();
+
+ hdfs.addServiceComponent(Role.HDFS_CLIENT.name()).persist();
+ hdfs.addServiceComponent(Role.NAMENODE.name()).persist();
+ hdfs.addServiceComponent(Role.DATANODE.name()).persist();
+
+ mapred.addServiceComponent(Role.RESOURCEMANAGER.name()).persist();
+
+ hdfs.getServiceComponent(Role.HDFS_CLIENT.name()).addServiceComponentHost("h1").persist();
+ hdfs.getServiceComponent(Role.NAMENODE.name()).addServiceComponentHost("h1").persist();
+ hdfs.getServiceComponent(Role.DATANODE.name()).addServiceComponentHost("h1").persist();
+ hdfs.getServiceComponent(Role.DATANODE.name()).addServiceComponentHost("h2").persist();
+
+ controller.getActionManager().createActionDefinition(
+ "a1", ActionType.SYSTEM, "test", "Does file exist", "", "",
+ TargetHostType.SPECIFIC, Short.valueOf("100"));
+
+ controller.getActionManager().createActionDefinition(
+ "a2", ActionType.SYSTEM, "", "Does file exist", "HDFS", "DATANODE",
+ TargetHostType.ALL, Short.valueOf("100"));
+
+ Map<String, String> params = new HashMap<String, String>() {{
+ put("test", "test");
}};
- clusters.mapHostsToCluster(hostNames, "c1");
+ Map<String, String> requestProperties = new HashMap<String, String>();
+ requestProperties.put(REQUEST_CONTEXT_PROPERTY, "Called from a test");
+ long now = System.currentTimeMillis();
+
+ ArrayList<String> hosts = new ArrayList<String>() {{add("h1");}};
+
+ ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", null, "a1", "HDFS", "DATANODE", hosts, params);
+ RequestStatusResponse response = controller.createAction(actionRequest, requestProperties);
+ assertEquals(1, response.getTasks().size());
+ ShortTaskStatus taskStatus = response.getTasks().get(0);
+ Assert.assertEquals("h1", taskStatus.getHostName());
+
+ List<HostRoleCommand> storedTasks = actionDB.getRequestTasks(response.getRequestId());
+ Stage stage = actionDB.getAllStages(response.getRequestId()).get(0);
+ Assert.assertNotNull(stage);
+ Assert.assertEquals(1, storedTasks.size());
+ HostRoleCommand task = storedTasks.get(0);
+ Assert.assertEquals(RoleCommand.ACTIONEXECUTE, task.getRoleCommand());
+ Assert.assertEquals("a1", task.getRole().name());
+ Assert.assertEquals("h1", task.getHostName());
+ ExecutionCommand cmd = task.getExecutionCommandWrapper().getExecutionCommand();
+ Assert.assertTrue(cmd.getCommandParams().containsKey("test"));
+ Assert.assertEquals(cmd.getServiceName(), "HDFS");
+ Assert.assertEquals(cmd.getComponentName(), "DATANODE");
+
+ actionRequest = new ExecuteActionRequest("c1", null, "a2", "", "", null, params);
+ response = controller.createAction(actionRequest, requestProperties);
+ assertEquals(2, response.getTasks().size());
+
+ final List<HostRoleCommand> storedTasks2 = actionDB.getRequestTasks(response.getRequestId());
+ task = storedTasks2.get(1);
+ Assert.assertEquals(RoleCommand.ACTIONEXECUTE, task.getRoleCommand());
+ Assert.assertEquals("a2", task.getRole().name());
+ HashSet<String> expectedHosts = new HashSet<String>(){{add("h2"); add("h1");}};
+ HashSet<String> actualHosts = new HashSet<String>(){{add(storedTasks2.get(1).getHostName()); add(storedTasks2
+ .get(0).getHostName());}};
+ Assert.assertEquals(expectedHosts, actualHosts);
+
+ cmd = task.getExecutionCommandWrapper().getExecutionCommand();
+ Assert.assertTrue(cmd.getCommandParams().containsKey("test"));
+ Assert.assertEquals(cmd.getServiceName(), "HDFS");
+ Assert.assertEquals(cmd.getComponentName(), "DATANODE");
+
+ hosts = new ArrayList<String>() {{add("h3");}};
+ actionRequest = new ExecuteActionRequest("c1", null, "a1", "", "", hosts, params);
+ response = controller.createAction(actionRequest, requestProperties);
+ assertEquals(1, response.getTasks().size());
+ taskStatus = response.getTasks().get(0);
+ Assert.assertEquals("h3", taskStatus.getHostName());
+ }
+
+ @SuppressWarnings("serial")
+ @Test
+ public void testCreateActionsFailures() throws Exception {
+ setupClusterWithHosts("c1", "HDP-0.1",
+ new ArrayList<String>() {{
+ add("h1");
+ }},
+ "centos5");
Cluster cluster = clusters.getCluster("c1");
cluster.setDesiredStackVersion(new StackId("HDP-0.1"));
@@ -3462,11 +3568,15 @@ public class AmbariManagementControllerTest {
ConfigFactory cf = injector.getInstance(ConfigFactory.class);
Config config1 = cf.createNew(cluster, "global",
- new HashMap<String, String>(){{ put("key1", "value1"); }});
+ new HashMap<String, String>() {{
+ put("key1", "value1");
+ }});
config1.setVersionTag("version1");
Config config2 = cf.createNew(cluster, "core-site",
- new HashMap<String, String>(){{ put("key1", "value1"); }});
+ new HashMap<String, String>() {{
+ put("key1", "value1");
+ }});
config2.setVersionTag("version1");
cluster.addConfig(config1);
@@ -3477,10 +3587,15 @@ public class AmbariManagementControllerTest {
Service hdfs = cluster.addService("HDFS");
hdfs.persist();
+ Service mapred = cluster.addService("MAPREDUCE");
+ mapred.persist();
+
hdfs.addServiceComponent(Role.HDFS_CLIENT.name()).persist();
hdfs.addServiceComponent(Role.NAMENODE.name()).persist();
hdfs.addServiceComponent(Role.DATANODE.name()).persist();
+ mapred.addServiceComponent(Role.MAPREDUCE_CLIENT.name()).persist();
+
hdfs.getServiceComponent(Role.HDFS_CLIENT.name()).addServiceComponentHost("h1").persist();
hdfs.getServiceComponent(Role.NAMENODE.name()).addServiceComponentHost("h1").persist();
hdfs.getServiceComponent(Role.DATANODE.name()).addServiceComponentHost("h1").persist();
@@ -3493,74 +3608,114 @@ public class AmbariManagementControllerTest {
Map<String, String> requestProperties = new HashMap<String, String>();
requestProperties.put(REQUEST_CONTEXT_PROPERTY, "Called from a test");
- try {
- controller.createAction(actionRequest, requestProperties);
- Assert.fail("createAction should fail for NON_EXISTENT_CHECK");
- } catch (AmbariException ex) {
- Assert.assertTrue(ex.getMessage().contains("Unsupported action"));
- }
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Unsupported action");
actionRequest = new ExecuteActionRequest("c1", "NON_EXISTENT_SERVICE_CHECK", "HDFS", params);
- try {
- controller.createAction(actionRequest, requestProperties);
- Assert.fail("createAction should fail for NON_EXISTENT_SERVICE_CHECK");
- } catch (AmbariException ex) {
- Assert.assertTrue(ex.getMessage().contains("Unsupported action"));
- }
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "Unsupported action");
actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION_DATANODE", "MAPREDUCE", params);
- try {
- controller.createAction(actionRequest, requestProperties);
- Assert.fail("createAction should fail for DECOMMISSION_DATANODE on MAPREDUCE");
- } catch (AmbariException ex) {
- Assert.assertTrue(ex.getMessage().contains("Unsupported action DECOMMISSION_DATANODE for MAPREDUCE"));
- }
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "Unsupported action DECOMMISSION_DATANODE for MAPREDUCE");
actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION_DATANODE", "HDFS", params);
- try {
- controller.createAction(actionRequest, requestProperties);
- Assert.fail("createAction should fail for DECOMMISSION_DATANODE on HDFS - no excludeFileTag");
- } catch (IllegalArgumentException ex) {
- Assert.assertTrue(ex.getMessage().contains("No exclude file specified when decommissioning datanodes"));
- }
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "No exclude file specified when decommissioning datanodes");
params.put("excludeFileTag", "tag1");
actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION_DATANODE", "HDFS", params);
- try {
- controller.createAction(actionRequest, requestProperties);
- Assert.fail("createAction should fail for DECOMMISSION_DATANODE on HDFS - no config type hdfs-exclude-file");
- } catch (AmbariException ex) {
- Assert.assertTrue(ex.getMessage().contains("Decommissioning datanodes requires the cluster"));
- }
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "Decommissioning datanodes requires the cluster");
actionRequest = new ExecuteActionRequest("c1", null, "DECOMMISSION_DATANODE", "HDFS", null, null, params);
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "Action DECOMMISSION_DATANODE does not exist");
+
+ controller.getActionManager().createActionDefinition(
+ "a1", ActionType.SYSTEM, "test,dirName", "Does file exist", "", "",
+ TargetHostType.SPECIFIC, Short.valueOf("100"));
+
+ controller.getActionManager().createActionDefinition(
+ "a2", ActionType.SYSTEM, "", "Does file exist", "HDFS", "DATANODE",
+ TargetHostType.ANY, Short.valueOf("100"));
+
+ controller.getActionManager().createActionDefinition(
+ "a3", ActionType.SYSTEM, "", "Does file exist", "YARN", "NODEMANAGER",
+ TargetHostType.ANY, Short.valueOf("100"));
+
+ controller.getActionManager().createActionDefinition(
+ "a4", ActionType.SYSTEM, "", "Does file exist", "MAPREDUCE", "",
+ TargetHostType.ANY, Short.valueOf("100"));
+
+ actionRequest = new ExecuteActionRequest("c1", null, "a1", null, null, null, null);
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "Action a1 requires input 'test' that is not provided");
+
+ actionRequest = new ExecuteActionRequest("c1", null, "a1", null, null, null, params);
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "Action a1 requires input 'dirName' that is not provided");
+
+ params.put("dirName", "dirName");
+ actionRequest = new ExecuteActionRequest("c1", null, "a1", null, null, null, params);
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "Action a1 requires explicit target host(s)");
+
+ actionRequest = new ExecuteActionRequest("c1", null, "a2", "MAPREDUCE", null, null, params);
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "Action a2 targets service MAPREDUCE that does not match with expected HDFS");
+
+ actionRequest = new ExecuteActionRequest("c1", null, "a2", "HDFS", "HDFS_CLIENT", null, params);
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "Action a2 targets component HDFS_CLIENT that does not match with expected DATANODE");
+
+ actionRequest = new ExecuteActionRequest("c1", null, "a1", "HDFS2", "HDFS_CLIENT", null, params);
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "Action a1 targets service HDFS2 that does not exist");
+
+ actionRequest = new ExecuteActionRequest("c1", null, "a1", "HDFS", "HDFS_CLIENT2", null, params);
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "Action a1 targets component HDFS_CLIENT2 that does not exist");
+
+ actionRequest = new ExecuteActionRequest("c1", null, "a1", "", "HDFS_CLIENT2", null, params);
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "Action a1 targets component HDFS_CLIENT2 without specifying the target service");
+
+ actionRequest = new ExecuteActionRequest("c1", null, "a3", "", "", null, params);
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "Action a3 targets service YARN that does not exist");
+
+ List<String> hosts = new ArrayList<String>();
+ hosts.add("h6");
+ actionRequest = new ExecuteActionRequest("c1", null, "a2", "", "", hosts, params);
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "Request specifies host h6 but its not a valid host based on the target service=HDFS and component=DATANODE");
+
+ actionRequest = new ExecuteActionRequest("c1", null, "a4", "MAPREDUCE", "", null, params);
+ expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+ "Suitable hosts not found, component=, service=MAPREDUCE, cluster=c1, actionName=a4");
+
+ }
+
+ private void expectActionCreationErrorWithMessage(ExecuteActionRequest actionRequest,
+ Map<String, String> requestProperties, String message) {
try {
RequestStatusResponse response = controller.createAction(actionRequest, requestProperties);
- if (response != null) {
- Assert.fail("createAction should fail for action DECOMMISSION_DATANODE");
- }
+ Assert.fail("createAction should fail");
} catch (AmbariException ex) {
- Assert.assertTrue(ex.getMessage().contains("Invalid action request"));
+ LOG.info(ex.getMessage());
+ Assert.assertTrue(ex.getMessage().contains(message));
}
}
@SuppressWarnings("serial")
@Test
- public void testCreateActions() throws Exception {
- clusters.addCluster("c1");
- clusters.getCluster("c1").setDesiredStackVersion(new StackId("HDP-0.1"));
- clusters.addHost("h1");
- clusters.getHost("h1").setOsType("centos5");
- clusters.getHost("h1").persist();
- clusters.addHost("h2");
- clusters.getHost("h2").setOsType("centos5");
- clusters.getHost("h2").persist();
- Set<String> hostNames = new HashSet<String>(){{
- add("h1");
- add("h2");
- }};
-
- clusters.mapHostsToCluster(hostNames, "c1");
+ public void testCreateServiceCheckActions() throws Exception {
+ setupClusterWithHosts("c1", "HDP-0.1",
+ new ArrayList<String>() {{
+ add("h1");
+ add("h2");
+ }},
+ "centos5");
Cluster cluster = clusters.getCluster("c1");
cluster.setDesiredStackVersion(new StackId("HDP-0.1"));
@@ -8166,6 +8321,16 @@ public class AmbariManagementControllerTest {
}
createRequest =
+ new ActionRequest("a1", "SYSTEM", "", "", "", "SS", "ANY", "10");
+ try {
+ ActionResourceProviderTest.createAction(controller, createRequest);
+ Assert.fail("Exception must be thrown");
+ } catch (Exception ex) {
+ LOG.info(ex.getMessage());
+ Assert.assertTrue(ex.getMessage().contains("Default timeout should be between 60 and 600"));
+ }
+
+ createRequest =
new ActionRequest("a1", "SYSTEM", "", "HDFS", "", "SS", "ANY", "100");
try {
ActionResourceProviderTest.createAction(controller, createRequest);
@@ -8185,6 +8350,17 @@ public class AmbariManagementControllerTest {
LOG.info(ex.getMessage());
Assert.assertTrue(ex.getMessage().contains("No enum const class"));
}
+
+ createRequest =
+ new ActionRequest("a1", "SYSTEM", "", "", "DATANODE", "SS", "SPECIFIC", "100");
+ try {
+ ActionResourceProviderTest.createAction(controller, createRequest);
+ Assert.fail("Exception must be thrown");
+ } catch (Exception ex) {
+ LOG.info(ex.getMessage());
+ Assert.assertTrue(ex.getMessage().contains("Target component cannot be specified unless target service is " +
+ "specified"));
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/test/java/org/apache/ambari/server/orm/TestOrmImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/TestOrmImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/TestOrmImpl.java
index 586478d..96daeea 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/TestOrmImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/TestOrmImpl.java
@@ -187,7 +187,9 @@ public class TestOrmImpl extends Assert {
public void testAbortHostRoleCommands() {
injector.getInstance(OrmTestHelper.class).createStageCommands();
HostRoleCommandDAO hostRoleCommandDAO = injector.getInstance(HostRoleCommandDAO.class);
- int result = hostRoleCommandDAO.updateStatusByRequestId(0L, HostRoleStatus.ABORTED, Arrays.asList(HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS, HostRoleStatus.PENDING));
+ int result = hostRoleCommandDAO.updateStatusByRequestId(
+ 0L, HostRoleStatus.ABORTED, Arrays.asList(HostRoleStatus.QUEUED,
+ HostRoleStatus.IN_PROGRESS, HostRoleStatus.PENDING));
//result always 1 in batch mode
List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByRequest(0L);
int count = 0;
@@ -203,7 +205,7 @@ public class TestOrmImpl extends Assert {
public void testFindStageByHostRole() {
injector.getInstance(OrmTestHelper.class).createStageCommands();
HostRoleCommandDAO hostRoleCommandDAO = injector.getInstance(HostRoleCommandDAO.class);
- List<HostRoleCommandEntity> list = hostRoleCommandDAO.findByHostRole("test_host1", 0L, 0L, Role.DATANODE);
+ List<HostRoleCommandEntity> list = hostRoleCommandDAO.findByHostRole("test_host1", 0L, 0L, Role.DATANODE.toString());
assertEquals(1, list.size());
}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java
index 516069a..fbb382f 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java
@@ -211,7 +211,8 @@ public class TestStageUtils {
addHbaseService(fsm.getCluster("c1"), hostList, injector);
addMapreduceService(fsm.getCluster("c1"), hostList, injector);
Map<String, List<String>> info = StageUtils.getClusterHostInfo(fsm.getHostsForCluster("c1"),
- fsm.getCluster("c1"), new HostsMap(injector.getInstance(Configuration.class)), injector);
+ fsm.getCluster("c1"), new HostsMap(injector.getInstance(Configuration.class)),
+ injector.getInstance(Configuration.class));
assertEquals(2, info.get("slave_hosts").size());
assertEquals(2, info.get("mapred_tt_hosts").size());
assertEquals(2, info.get("hbase_rs_hosts").size());
[2/2] git commit: AMBARI 3731. Custom Action: Add support for custom
action execution
Posted by sm...@apache.org.
AMBARI 3731. Custom Action: Add support for custom action execution
Project: http://git-wip-us.apache.org/repos/asf/incubator-ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ambari/commit/22f5fdfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ambari/tree/22f5fdfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ambari/diff/22f5fdfb
Branch: refs/heads/trunk
Commit: 22f5fdfb70916fb5ffe486c6bcb50f36bc0de1b4
Parents: 708e59d
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Wed Nov 13 22:30:26 2013 -0800
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Wed Nov 13 22:30:26 2013 -0800
----------------------------------------------------------------------
.../org/apache/ambari/server/RoleCommand.java | 3 +-
.../server/actionmanager/ActionDBAccessor.java | 73 +++-
.../actionmanager/ActionDBAccessorImpl.java | 61 ++-
.../actionmanager/ActionDBInMemoryImpl.java | 49 +--
.../server/actionmanager/ActionManager.java | 9 +-
.../server/actionmanager/ActionScheduler.java | 6 +-
.../CustomActionDBAccessorImpl.java | 12 +-
.../server/actionmanager/HostRoleCommand.java | 9 -
.../ambari/server/agent/ExecutionCommand.java | 13 +-
.../ambari/server/agent/HeartBeatHandler.java | 13 +-
.../controller/ActionExecutionContext.java | 91 +++++
.../controller/AmbariActionExecutionHelper.java | 290 ++++++++++++++
.../AmbariCustomCommandExecutionHelper.java | 239 ++++++++++++
.../AmbariManagementControllerImpl.java | 278 +++-----------
.../server/controller/ExecuteActionRequest.java | 38 +-
.../server/orm/dao/HostRoleCommandDAO.java | 4 +-
.../apache/ambari/server/orm/dao/StageDAO.java | 4 -
.../server/orm/entities/ActionEntity.java | 14 +-
.../svccomphost/ServiceComponentHostImpl.java | 2 -
.../apache/ambari/server/utils/StageUtils.java | 5 +-
.../ddl/Ambari-DDL-Postgres-UPGRADE-1.3.0.sql | 4 +
.../actionmanager/TestActionDBAccessorImpl.java | 76 +++-
.../server/actionmanager/TestActionManager.java | 2 +-
.../AmbariManagementControllerTest.java | 376 ++++++++++++++-----
.../apache/ambari/server/orm/TestOrmImpl.java | 6 +-
.../ambari/server/utils/TestStageUtils.java | 3 +-
26 files changed, 1214 insertions(+), 466 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
index 33370bf..ad006ec 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
@@ -24,5 +24,6 @@ public enum RoleCommand {
STOP,
EXECUTE,
ABORT,
- UPGRADE
+ UPGRADE,
+ ACTIONEXECUTE
}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index 2c79edf..11605bb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -17,23 +17,35 @@
*/
package org.apache.ambari.server.actionmanager;
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
+
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.ambari.server.Role;
-import org.apache.ambari.server.agent.CommandReport;
-
public interface ActionDBAccessor {
- public Stage getAction(String actionId);
+ /**
+ * Given an action id of the form requestId-stageId, retrieve the Stage
+ */
+ public Stage getStage(String actionId);
+ /**
+ * Get all stages associated with a single request id
+ */
public List<Stage> getAllStages(long requestId);
+ /**
+ * Abort all outstanding operations associated with the given request
+ */
public void abortOperation(long requestId);
- public void timeoutHostRole(String host, long requestId, long stageId, Role role);
+ /**
+ * Mark the task as having timed out
+ */
+ public void timeoutHostRole(String host, long requestId, long stageId, String role);
/**
* Returns all the pending stages, including queued and not-queued.
@@ -41,47 +53,86 @@ public interface ActionDBAccessor {
*/
public List<Stage> getStagesInProgress();
+ /**
+ * Persists all tasks for a given request
+ *
+ * @param stages Stages belonging to the request
+ */
public void persistActions(List<Stage> stages);
+ /**
+ * For the given host, update all the tasks based on the command report
+ */
public void updateHostRoleState(String hostname, long requestId,
- long stageId, String role, CommandReport report);
+ long stageId, String role, CommandReport report);
- public void abortHostRole(String host, long requestId, long stageId,
- Role role);
+ /**
+ * Mark the task as having been aborted
+ */
+ public void abortHostRole(String host, long requestId, long stageId, String role);
/**
* Return the last persisted Request ID as seen when the DBAccessor object
* was initialized.
* Value should remain unchanged through the lifetime of the object instance.
+ *
* @return Request Id seen at init time
*/
public long getLastPersistedRequestIdWhenInitialized();
/**
* Updates scheduled stage.
- * @param s
- * @param hostname
- * @param roleStr
*/
public void hostRoleScheduled(Stage s, String hostname, String roleStr);
+ /**
+ * Given a request id, get all the tasks that belong to this request
+ */
public List<HostRoleCommand> getRequestTasks(long requestId);
+ /**
+ * Given a list of request ids, get all the tasks that belong to these requests
+ */
public List<HostRoleCommand> getAllTasksByRequestIds(Collection<Long> requestIds);
+ /**
+ * Get a list of host role commands where the request id belongs to the input requestIds and
+ * the task id belongs to the input taskIds
+ */
public List<HostRoleCommand> getTasksByRequestAndTaskIds(Collection<Long> requestIds, Collection<Long> taskIds);
+ /**
+ * Given a list of task ids, get all the host role commands
+ */
public Collection<HostRoleCommand> getTasks(Collection<Long> taskIds);
+ /**
+ * Get all stages that contain tasks with specified host role statuses
+ */
public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses);
+ /**
+ * Get all requests
+ */
public List<Long> getRequests();
+ /**
+ * Gets the host role command corresponding to the task id
+ */
public HostRoleCommand getTask(long taskId);
+ /**
+ * Gets the ids of the requests that are in the specified status
+ */
public List<Long> getRequestsByStatus(RequestStatus status);
+ /**
+ * Gets the request contexts associated with the given list of request ids
+ */
public Map<Long, String> getRequestContext(List<Long> requestIds);
+ /**
+ * Gets the request context associated with the request id
+ */
public String getRequestContext(long requestId);
}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 75ebfef..d0cba5e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -17,16 +17,17 @@
*/
package org.apache.ambari.server.actionmanager;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
import com.google.inject.name.Named;
+import com.google.inject.persist.Transactional;
import org.apache.ambari.server.AmbariException;
-import org.apache.ambari.server.Role;
import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
+import org.apache.ambari.server.orm.dao.ActionDefinitionDAO;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
import org.apache.ambari.server.orm.dao.HostDAO;
@@ -44,15 +45,20 @@ import org.apache.ambari.server.state.Clusters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Singleton;
-import com.google.inject.persist.Transactional;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
@Singleton
public class ActionDBAccessorImpl implements ActionDBAccessor {
private static final Logger LOG = LoggerFactory.getLogger(ActionDBAccessorImpl.class);
-
+ private final long requestId;
@Inject
private ClusterDAO clusterDAO;
@Inject
@@ -71,12 +77,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
private HostRoleCommandFactory hostRoleCommandFactory;
@Inject
private Clusters clusters;
-
private Cache<Long, HostRoleCommand> hostRoleCommandCache;
private long cacheLimit; //may be exceeded to store tasks from one request
- private final long requestId;
-
@Inject
public ActionDBAccessorImpl(Injector injector, @Named("executionCommandCacheSize") long cacheLimit) {
injector.injectMembers(this);
@@ -90,10 +93,10 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
}
/* (non-Javadoc)
- * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getAction(java.lang.String)
+ * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getStage(java.lang.String)
*/
@Override
- public Stage getAction(String actionId) {
+ public Stage getStage(String actionId) {
return stageFactory.createExisting(actionId);
}
@@ -137,7 +140,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
@Override
@Transactional
public void timeoutHostRole(String host, long requestId, long stageId,
- Role role) {
+ String role) {
List<HostRoleCommandEntity> commands =
hostRoleCommandDAO.findByHostRole(host, requestId, stageId, role);
for (HostRoleCommandEntity command : commands) {
@@ -170,6 +173,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding stages to DB, stageCount=" + stages.size());
}
+
for (Stage stage : stages) {
StageEntity stageEntity = stage.constructNewPersistenceEntity();
Cluster cluster;
@@ -213,7 +217,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
for (RoleSuccessCriteriaEntity roleSuccessCriteriaEntity : stageEntity.getRoleSuccessCriterias()) {
roleSuccessCriteriaDAO.create(roleSuccessCriteriaEntity);
}
-
}
}
@@ -227,7 +230,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
+ stageId + " role " + role + " report " + report);
}
List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByHostRole(
- hostname, requestId, stageId, Role.valueOf(role));
+ hostname, requestId, stageId, role);
for (HostRoleCommandEntity command : commands) {
command.setStatus(HostRoleStatus.valueOf(report.getStatus()));
command.setStdOut(report.getStdOut().getBytes());
@@ -238,13 +241,13 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
}
@Override
- public void abortHostRole(String host, long requestId, long stageId, Role role) {
+ public void abortHostRole(String host, long requestId, long stageId, String role) {
CommandReport report = new CommandReport();
report.setExitCode(999);
report.setStdErr("Host Role in invalid state");
report.setStdOut("");
report.setStatus("ABORTED");
- updateHostRoleState(host, requestId, stageId, role.toString(), report);
+ updateHostRoleState(host, requestId, stageId, role, report);
}
@Override
@@ -266,7 +269,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
} else {
throw new RuntimeException("HostRoleCommand is not persisted, cannot update:\n" + hostRoleCommand);
}
-
}
@Override
@@ -275,11 +277,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
return getTasks(
hostRoleCommandDAO.findTaskIdsByRequest(requestId)
);
-
-// for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequest(requestId)) {
-// tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
-// }
-// return tasks;
}
@Override
@@ -291,12 +288,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
return getTasks(
hostRoleCommandDAO.findTaskIdsByRequestIds(requestIds)
);
-
-// List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
-// for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequestIds(requestIds)) {
-// tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
-// }
-// return tasks;
}
@Override
@@ -304,11 +295,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
if (!requestIds.isEmpty() && !taskIds.isEmpty()) {
return getTasks(hostRoleCommandDAO.findTaskIdsByRequestAndTaskIds(requestIds, taskIds));
-// List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
-// for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequestAndTaskIds(requestIds, taskIds)) {
-// tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
-// }
-// return tasks;
} else if (requestIds.isEmpty()) {
return getTasks(taskIds);
} else if (taskIds.isEmpty()) {
@@ -335,7 +321,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
if (!absent.isEmpty()) {
boolean allowStore = hostRoleCommandCache.size() <= cacheLimit;
-// LOG.info("Cache size {}, enable = {}", hostRoleCommandCache.size(), allowStore);
for (HostRoleCommandEntity commandEntity : hostRoleCommandDAO.findByPKs(absent)) {
HostRoleCommand hostRoleCommand = hostRoleCommandFactory.createExisting(commandEntity);
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
index 1fccf12..8c36366 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
@@ -17,38 +17,42 @@
*/
package org.apache.ambari.server.actionmanager;
-import java.util.*;
-
-import org.apache.ambari.server.Role;
+import com.google.inject.Singleton;
import org.apache.ambari.server.agent.CommandReport;
-import org.apache.ambari.server.agent.ExecutionCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.inject.Singleton;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
@Singleton
public class ActionDBInMemoryImpl implements ActionDBAccessor {
+ private static Logger LOG = LoggerFactory.getLogger(ActionDBInMemoryImpl.class);
// for a persisted DB, this will be initialized in the ctor
// with the highest persisted requestId value in the DB
private final long lastRequestId = 0;
- private static Logger LOG = LoggerFactory.getLogger(ActionDBInMemoryImpl.class);
List<Stage> stageList = new ArrayList<Stage>();
@Override
- public synchronized Stage getAction(String actionId) {
- for (Stage s: stageList) {
+ public synchronized Stage getStage(String actionId) {
+ for (Stage s : stageList) {
if (s.getActionId().equals(actionId)) {
return s;
}
}
return null;
}
+
@Override
public synchronized List<Stage> getAllStages(long requestId) {
List<Stage> l = new ArrayList<Stage>();
- for (Stage s: stageList) {
+ for (Stage s : stageList) {
if (s.getRequestId() == requestId) {
l.add(s);
}
@@ -80,7 +84,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
@Override
public synchronized void timeoutHostRole(String host, long requestId,
- long stageId, Role role) {
+ long stageId, String role) {
for (Stage s : stageList) {
s.setHostRoleStatus(host, role.toString(), HostRoleStatus.TIMEDOUT);
}
@@ -89,7 +93,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
@Override
public synchronized List<Stage> getStagesInProgress() {
List<Stage> l = new ArrayList<Stage>();
- for (Stage s: stageList) {
+ for (Stage s : stageList) {
if (s.isStageInProgress()) {
l.add(s);
}
@@ -99,15 +103,16 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
@Override
public synchronized void persistActions(List<Stage> stages) {
- for (Stage s: stages) {
+ for (Stage s : stages) {
stageList.add(s);
}
}
+
@Override
public synchronized void updateHostRoleState(String hostname, long requestId,
- long stageId, String role, CommandReport report) {
- LOG.info("DEBUG stages to iterate: "+stageList.size());
- if(null == report.getStatus()
+ long stageId, String role, CommandReport report) {
+ LOG.info("DEBUG stages to iterate: " + stageList.size());
+ if (null == report.getStatus()
|| null == report.getStdOut()
|| null == report.getStdErr()) {
throw new RuntimeException("Badly formed command report.");
@@ -124,13 +129,13 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
}
@Override
- public void abortHostRole(String host, long requestId, long stageId, Role role) {
+ public void abortHostRole(String host, long requestId, long stageId, String role) {
CommandReport report = new CommandReport();
report.setExitCode(999);
report.setStdErr("Host Role in invalid state");
report.setStdOut("");
report.setStatus("ABORTED");
- updateHostRoleState(host, requestId, stageId, role.toString(), report);
+ updateHostRoleState(host, requestId, stageId, role, report);
}
@Override
@@ -168,17 +173,18 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
@Override
public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses) {
List<Stage> l = new ArrayList<Stage>();
- for (Stage s: stageList) {
+ for (Stage s : stageList) {
if (s.doesStageHaveHostRoleStatus(statuses)) {
l.add(s);
}
}
return l;
}
+
@Override
public synchronized List<Long> getRequests() {
Set<Long> requestIds = new HashSet<Long>();
- for (Stage s: stageList) {
+ for (Stage s : stageList) {
requestIds.add(s.getRequestId());
}
List<Long> ids = new ArrayList<Long>();
@@ -199,6 +205,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
}
return null;
}
+
@Override
public List<Long> getRequestsByStatus(RequestStatus status) {
// TODO
@@ -211,7 +218,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
for (Long requestId : requestIds) {
List<Stage> stages = getAllStages(requestId);
result.put(requestId, stages != null && !stages.isEmpty() ? stages.get
- (0).getRequestContext() : "");
+ (0).getRequestContext() : "");
}
return result;
}
@@ -220,6 +227,6 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
public String getRequestContext(long requestId) {
List<Stage> stages = getAllStages(requestId);
return stages != null && !stages.isEmpty() ? stages.get(0)
- .getRequestContext() : "";
+ .getRequestContext() : "";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
index 6b32e73..aa553c4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
@@ -24,6 +24,7 @@ import com.google.inject.persist.UnitOfWork;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.ActionQueue;
import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.serveraction.ServerActionManager;
import org.apache.ambari.server.state.Clusters;
@@ -73,12 +74,16 @@ public class ActionManager {
scheduler.stop();
}
- public void sendActions(List<Stage> stages) {
+ public void sendActions(List<Stage> stages, ExecuteActionRequest request) {
if (LOG.isDebugEnabled()) {
for (Stage s : stages) {
LOG.debug("Persisting stage into db: " + s.toString());
}
+
+ if (request != null) {
+ LOG.debug("In response to request: " + request.toString());
+ }
}
db.persistActions(stages);
@@ -91,7 +96,7 @@ public class ActionManager {
}
public Stage getAction(long requestId, long stageId) {
- return db.getAction(StageUtils.getActionId(requestId, stageId));
+ return db.getStage(StageUtils.getActionId(requestId, stageId));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 22ed44e..a5c5dc0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -205,8 +205,7 @@ class ActionScheduler implements Runnable {
scheduleHostRole(s, cmd);
} catch (InvalidStateTransitionException e) {
LOG.warn("Could not schedule host role " + cmd.toString(), e);
- db.abortHostRole(cmd.getHostname(), s.getRequestId(), s.getStageId(),
- Role.valueOf(cmd.getRole()));
+ db.abortHostRole(cmd.getHostname(), s.getRequestId(), s.getStageId(), cmd.getRole());
}
}
}
@@ -326,8 +325,7 @@ class ActionScheduler implements Runnable {
if (s.getAttemptCount(host, roleStr) >= maxAttempts) {
LOG.warn("Host:" + host + ", role:" + roleStr + ", actionId:"
+ s.getActionId() + " expired");
- db.timeoutHostRole(host, s.getRequestId(), s.getStageId(),
- Role.valueOf(c.getRole()));
+ db.timeoutHostRole(host, s.getRequestId(), s.getStageId(), c.getRole());
//Reinitialize status
status = s.getHostRoleStatus(host, roleStr);
ServiceComponentHostOpFailedEvent timeoutEvent =
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
index eb0cfa9..38bc371 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
@@ -98,7 +98,8 @@ public class CustomActionDBAccessorImpl implements CustomActionDBAccessor {
TargetHostType targetType, String serviceType, String componentType,
Short defaultTimeout)
throws AmbariException {
- validateCreateInput(actionName, actionType, inputs, description, defaultTimeout);
+ validateCreateInput(actionName, actionType, inputs, description, defaultTimeout,
+ targetType, serviceType, componentType);
ActionEntity entity =
actionDefinitionDAO.findByPK(actionName);
if (entity == null) {
@@ -178,7 +179,8 @@ public class CustomActionDBAccessorImpl implements CustomActionDBAccessor {
}
private void validateCreateInput(String actionName, ActionType actionType, String inputs,
- String description, Short defaultTimeout)
+ String description, Short defaultTimeout,
+ TargetHostType targetType, String serviceType, String componentType)
throws AmbariException {
validateActionName(actionName);
@@ -199,6 +201,12 @@ public class CustomActionDBAccessorImpl implements CustomActionDBAccessor {
throw new AmbariException("Action type cannot be " + actionType);
}
+ if (serviceType == null || serviceType.isEmpty()) {
+ if (componentType != null && !componentType.isEmpty()) {
+ throw new AmbariException("Target component cannot be specified unless target service is specified");
+ }
+ }
+
if (inputs != null && !inputs.isEmpty()) {
String[] parameters = inputs.split(",");
for (String parameter : parameters) {
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
index e3bed0c..21ec077 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
@@ -86,10 +86,6 @@ public class HostRoleCommand {
//make use of lazy loading
executionCommandDAO = injector.getInstance(ExecutionCommandDAO.class);
-// executionCommandWrapper = new ExecutionCommandWrapper(new String(
-// hostRoleCommandEntity
-// .getExecutionCommand().getCommand()
-// ));
}
HostRoleCommandEntity constructNewPersistenceEntity() {
@@ -106,10 +102,6 @@ public class HostRoleCommand {
hostRoleCommandEntity.setRoleCommand(roleCommand);
hostRoleCommandEntity.setEvent(event.getEventJson());
-// ExecutionCommandEntity executionCommandEntity = new ExecutionCommandEntity();
-// executionCommandEntity.setCommand(executionCommandWrapper.getJson().getBytes());
-// executionCommandEntity.setHostRoleCommand(hostRoleCommandEntity);
-// hostRoleCommandEntity.setExecutionCommand(executionCommandEntity);
return hostRoleCommandEntity;
}
@@ -120,7 +112,6 @@ public class HostRoleCommand {
return executionCommandEntity;
}
-
public long getTaskId() {
return taskId;
}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
index 1b4ecdf..c72c14b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
@@ -54,7 +54,8 @@ public class ExecutionCommand extends AgentCommand {
private Map<String, Map<String, String>> configurationTags;
private Map<String, String> commandParams;
private String serviceName;
-
+ private String componentName;
+
@JsonProperty("commandId")
public String getCommandId() {
return this.commandId;
@@ -202,6 +203,16 @@ public class ExecutionCommand extends AgentCommand {
this.serviceName = serviceName;
}
+ /**
+ * @return the component name carried by this command; exposed as the
+ * "componentName" JSON property
+ */
+ @JsonProperty("componentName")
+ public String getComponentName() {
+ return componentName;
+ }
+
+ /**
+ * @param componentName the component name to carry in this command; bound to the
+ * "componentName" JSON property
+ */
+ @JsonProperty("componentName")
+ public void setComponentName(String componentName) {
+ this.componentName = componentName;
+ }
+
/**
* @param configTags the config tag map
*/
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 60aede9..9da4d45 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -187,16 +187,19 @@ public class HeartBeatHandler {
return response;
}
- protected void processCommandReports(HeartBeat heartbeat,
- String hostname,
- Clusters clusterFsm, long now)
+ protected void processCommandReports(
+ HeartBeat heartbeat, String hostname, Clusters clusterFsm, long now)
throws AmbariException {
List<CommandReport> reports = heartbeat.getReports();
for (CommandReport report : reports) {
LOG.debug("Received command report: " + report);
+ if (RoleCommand.ACTIONEXECUTE.equals(report.getRoleCommand())) {
+ continue;
+ }
+
Cluster cl = clusterFsm.getCluster(report.getClusterName());
String service = report.getServiceName();
- if (service == null || "".equals(service)) {
+ if (service == null || service.isEmpty()) {
throw new AmbariException("Invalid command report, service: " + service);
}
if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
@@ -217,7 +220,7 @@ public class HeartBeatHandler {
&& null != report.getConfigurationTags()
&& !report.getConfigurationTags().isEmpty()) {
LOG.info("Updating applied config on service " + scHost.getServiceName() +
- ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
+ ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
scHost.updateActualConfigs(report.getConfigurationTags());
}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java
new file mode 100644
index 0000000..f1bea70
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.ambari.server.controller;
+
+import org.apache.ambari.server.actionmanager.TargetHostType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Carries the inputs needed to create the tasks and stages of one custom action run.
+ */
+public class ActionExecutionContext {
+ private final String clusterName;
+ private final String actionName;
+ private final String serviceName;
+ private final String componentName;
+ private final List<String> hosts;
+ private final Map<String, String> parameters;
+ private final TargetHostType targetType;
+ private final Short timeout;
+
+ /**
+ * Builds the context for executing an action on behalf of a request.
+ *
+ * @param clusterName name of the cluster the action runs in
+ * @param actionName name of the action to execute
+ * @param serviceName name of the targeted service
+ * @param componentName name of the targeted component
+ * @param hosts explicitly requested hosts; copied into a new list, {@code null} yields an empty list
+ * @param parameters action parameters; stored by reference, not copied
+ * @param targetType how target hosts are to be selected
+ * @param timeout timeout associated with the action
+ */
+ public ActionExecutionContext(String clusterName, String actionName, String serviceName,
+ String componentName, List<String> hosts, Map<String, String> parameters,
+ TargetHostType targetType, Short timeout) {
+ this.clusterName = clusterName;
+ this.actionName = actionName;
+ this.serviceName = serviceName;
+ this.componentName = componentName;
+ // Keep a private list so hosts added later never end up in the caller's list.
+ this.hosts = (hosts == null) ? new ArrayList<String>() : new ArrayList<String>(hosts);
+ this.parameters = parameters;
+ this.targetType = targetType;
+ this.timeout = timeout;
+ }
+
+ /** @return the cluster name given at construction */
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ /** @return the action name given at construction */
+ public String getActionName() {
+ return actionName;
+ }
+
+ /** @return the service name given at construction */
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ /** @return the component name given at construction */
+ public String getComponentName() {
+ return componentName;
+ }
+
+ /** @return the parameter map given at construction (same instance) */
+ public Map<String, String> getParameters() {
+ return parameters;
+ }
+
+ /**
+ * @return this context's own host list; it is returned directly, so changes made
+ * through the returned reference are visible to later readers of the context
+ */
+ public List<String> getHosts() {
+ return hosts;
+ }
+
+ /** @return the target host type given at construction */
+ public TargetHostType getTargetType() {
+ return targetType;
+ }
+
+ /** @return the timeout given at construction */
+ public Short getTimeout() {
+ return timeout;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
new file mode 100644
index 0000000..632b11d
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.StackAccessException;
+import org.apache.ambari.server.actionmanager.ActionDefinition;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.actionmanager.TargetHostType;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.metadata.ActionMetadata;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.ComponentInfo;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceInfo;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgressEvent;
+import org.apache.ambari.server.utils.StageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Helper class containing logic to process custom action execution requests
+ */
+public class AmbariActionExecutionHelper {
+ // Logger is keyed to this class so its output is attributed to the action helper
+ // rather than to the custom command helper.
+ private final static Logger LOG =
+ LoggerFactory.getLogger(AmbariActionExecutionHelper.class);
+ private ActionMetadata actionMetadata;
+ private Clusters clusters;
+ private AmbariManagementControllerImpl amcImpl;
+ private ActionManager actionManager;
+ private AmbariMetaInfo ambariMetaInfo;
+
+ /**
+ * Creates a helper that works against the given metadata, cluster registry and controller.
+ *
+ * @param actionMetadata action metadata kept for later use
+ * @param clusters cluster registry used to resolve clusters and their hosts
+ * @param amcImpl controller; its action manager and meta info are read here, once
+ */
+ public AmbariActionExecutionHelper(ActionMetadata actionMetadata, Clusters clusters,
+ AmbariManagementControllerImpl amcImpl) {
+ this.actionMetadata = actionMetadata;
+ this.clusters = clusters;
+ this.amcImpl = amcImpl;
+ // Collaborators owned by the controller are captured at construction time.
+ this.actionManager = amcImpl.getActionManager();
+ this.ambariMetaInfo = amcImpl.getAmbariMetaInfo();
+ }
+
+ /**
+ * Validates the request to execute an action
+ *
+ * @param actionRequest
+ * @param cluster
+ * @return
+ * @throws AmbariException
+ */
+ public ActionExecutionContext validateCustomAction(ExecuteActionRequest actionRequest, Cluster cluster)
+ throws AmbariException {
+ if (actionRequest.getActionName() == null || actionRequest.getActionName().isEmpty()) {
+ throw new AmbariException("Action name must be specified");
+ }
+
+ ActionDefinition actionDef = actionManager.getActionDefinition(actionRequest.getActionName());
+ if (actionDef == null) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " does not exist");
+ }
+
+ StackId stackId = cluster.getCurrentStackVersion();
+ String expectedService = actionDef.getTargetService() == null ? "" : actionDef.getTargetService();
+ String actualService = actionRequest.getServiceName() == null ? "" : actionRequest.getServiceName();
+ if (!expectedService.isEmpty() && !actualService.isEmpty() && !expectedService.equals(actualService)) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " targets service " + actualService +
+ " that does not match with expected " + expectedService);
+ }
+
+ String targetService = expectedService;
+ if (targetService == null || targetService.isEmpty()) {
+ targetService = actualService;
+ }
+
+ if (targetService != null && !targetService.isEmpty()) {
+ ServiceInfo serviceInfo;
+ try {
+ serviceInfo = ambariMetaInfo.getService(stackId.getStackName(), stackId.getStackVersion(),
+ targetService);
+ } catch (StackAccessException se) {
+ serviceInfo = null;
+ }
+
+ if (serviceInfo == null) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " targets service " + targetService +
+ " that does not exist.");
+ }
+ }
+
+ String expectedComponent = actionDef.getTargetComponent() == null ? "" : actionDef.getTargetComponent();
+ String actualComponent = actionRequest.getComponentName() == null ? "" : actionRequest.getComponentName();
+ if (!expectedComponent.isEmpty() && !actualComponent.isEmpty() && !expectedComponent.equals(actualComponent)) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " targets component " + actualComponent +
+ " that does not match with expected " + expectedComponent);
+ }
+
+ String targetComponent = expectedComponent;
+ if (targetComponent == null || targetComponent.isEmpty()) {
+ targetComponent = actualComponent;
+ }
+
+ if (!targetComponent.isEmpty() && targetService.isEmpty()) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " targets component " + targetComponent +
+ " without specifying the target service.");
+ }
+
+ if (targetComponent != null && !targetComponent.isEmpty()) {
+ ComponentInfo compInfo;
+ try {
+ compInfo = ambariMetaInfo.getComponent(stackId.getStackName(), stackId.getStackVersion(),
+ targetService, targetComponent);
+ } catch (StackAccessException se) {
+ compInfo = null;
+ }
+
+ if (compInfo == null) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " targets component " + targetComponent +
+ " that does not exist.");
+ }
+ }
+
+ if (actionDef.getInputs() != null) {
+ String[] inputs = actionDef.getInputs().split(",");
+ for (String input : inputs) {
+ if (!input.trim().isEmpty() && !actionRequest.getParameters().containsKey(input.trim())) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " requires input '" +
+ input.trim() + "' that is not provided.");
+ }
+ }
+ }
+
+ if (actionDef.getTargetType() == TargetHostType.SPECIFIC
+ || (targetService.isEmpty() && targetService.isEmpty())) {
+ if (actionRequest.getHosts().size() == 0) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " requires explicit target host(s)" +
+ " that is not provided.");
+ }
+ }
+
+ LOG.info("Received action execution request"
+ + ", clusterName=" + actionRequest.getClusterName()
+ + ", request=" + actionRequest.toString());
+
+ ActionExecutionContext actionExecutionContext = new ActionExecutionContext(
+ actionRequest.getClusterName(), actionRequest.getActionName(), targetService, targetComponent,
+ actionRequest.getHosts(), actionRequest.getParameters(), actionDef.getTargetType(),
+ actionDef.getDefaultTimeout());
+
+ return actionExecutionContext;
+ }
+
+ /**
+ * Adds one task per target host to the stage for the requested action execution.
+ *
+ * Candidate hosts are the hosts of the targeted component, or of every component of the
+ * targeted service, or every host of the cluster when no service is targeted. Hosts
+ * named in the context must all be candidates. When the context names no hosts, they
+ * are chosen from the candidates according to the context's target type and added to
+ * the context's host list.
+ *
+ * @param actionContext the context associated with the action
+ * @param stage stage into which tasks must be inserted
+ * @param configuration server configuration used to generate the cluster host info
+ * @param hostsMap host mapping used to generate the cluster host info
+ * @param hostLevelParams host level parameters set on every generated command
+ * @throws AmbariException if no suitable host exists, a requested host is not a
+ * candidate, or the target type is not supported
+ */
+ public void addAction(ActionExecutionContext actionContext, Stage stage,
+ Configuration configuration, HostsMap hostsMap, Map<String, String> hostLevelParams)
+ throws AmbariException {
+ String actionName = actionContext.getActionName();
+ String clusterName = actionContext.getClusterName();
+ String serviceName = actionContext.getServiceName();
+ String componentName = actionContext.getComponentName();
+
+ // List of host to select from
+ Set<String> candidateHosts = new HashSet<String>();
+ if (!serviceName.isEmpty()) {
+ if (!componentName.isEmpty()) {
+ Map<String, ServiceComponentHost> componentHosts =
+ clusters.getCluster(clusterName).getService(serviceName)
+ .getServiceComponent(componentName).getServiceComponentHosts();
+ candidateHosts.addAll(componentHosts.keySet());
+ } else {
+ for (String component : clusters.getCluster(clusterName).getService(serviceName)
+ .getServiceComponents().keySet()) {
+ Map<String, ServiceComponentHost> componentHosts =
+ clusters.getCluster(clusterName).getService(serviceName)
+ .getServiceComponent(component).getServiceComponentHosts();
+ candidateHosts.addAll(componentHosts.keySet());
+ }
+ }
+ } else {
+ // All hosts are valid target host
+ candidateHosts.addAll(amcImpl.getClusters().getHostsForCluster(clusterName).keySet());
+ }
+
+ // If request did not specify hosts and there exists no host
+ if (actionContext.getHosts().isEmpty() && candidateHosts.isEmpty()) {
+ throw new AmbariException("Suitable hosts not found, component="
+ + componentName + ", service=" + serviceName
+ + ", cluster=" + clusterName + ", actionName=" + actionName);
+ }
+
+ // Compare specified hosts to available hosts
+ if (!actionContext.getHosts().isEmpty() && !candidateHosts.isEmpty()) {
+ for (String hostname : actionContext.getHosts()) {
+ if (!candidateHosts.contains(hostname)) {
+ throw new AmbariException("Request specifies host " + hostname + " but its not a valid host based on the " +
+ "target service=" + serviceName + " and component=" + componentName);
+ }
+ }
+ }
+
+ //Find target hosts to execute
+ if (actionContext.getHosts().isEmpty()) {
+ TargetHostType hostType = actionContext.getTargetType();
+ switch (hostType) {
+ case ALL:
+ actionContext.getHosts().addAll(candidateHosts);
+ break;
+ case ANY:
+ actionContext.getHosts().add(amcImpl.getHealthyHost(candidateHosts));
+ break;
+ case MAJORITY:
+ // Fix the quota before looping: picked hosts are removed from candidateHosts, so
+ // re-reading its size in the loop condition would stop short of a majority.
+ int majority = (candidateHosts.size() / 2) + 1;
+ for (int i = 0; i < majority; i++) {
+ String hostname = amcImpl.getHealthyHost(candidateHosts);
+ actionContext.getHosts().add(hostname);
+ // Removing the pick keeps the same host from being selected twice.
+ candidateHosts.remove(hostname);
+ }
+ break;
+ default:
+ throw new AmbariException("Unsupported target type=" + hostType);
+ }
+ }
+
+ //create tasks for each host
+ for (String hostName : actionContext.getHosts()) {
+ stage.addHostRoleExecutionCommand(hostName, Role.valueOf(actionName), RoleCommand.ACTIONEXECUTE,
+ new ServiceComponentHostOpInProgressEvent(actionName, hostName,
+ System.currentTimeMillis()), clusterName, serviceName);
+
+ Cluster cluster = clusters.getCluster(clusterName);
+
+ Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
+ // Configuration tags are only looked up when the action targets a service.
+ Map<String, Map<String, String>> configTags = null;
+ if (!serviceName.isEmpty()) {
+ configTags = amcImpl.findConfigurationTagsWithOverrides(cluster, hostName);
+ }
+
+ // The command was registered on the stage by addHostRoleExecutionCommand above.
+ ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
+ actionName).getExecutionCommand();
+
+ execCmd.setRoleParams(actionContext.getParameters());
+ execCmd.setConfigurations(configurations);
+ execCmd.setConfigurationTags(configTags);
+ execCmd.setHostLevelParams(hostLevelParams);
+ execCmd.setCommandParams(actionContext.getParameters());
+ execCmd.setServiceName(serviceName);
+ execCmd.setComponentName(componentName);
+
+ // Generate cluster host info
+ execCmd.setClusterHostInfo(
+ StageUtils.getClusterHostInfo(clusters.getHostsForCluster(clusterName), cluster, hostsMap, configuration));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
new file mode 100644
index 0000000..fa7522b
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.metadata.ActionMetadata;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgressEvent;
+import org.apache.ambari.server.utils.StageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Helper class containing logic to process custom command execution requests
+ */
+public class AmbariCustomCommandExecutionHelper {
+ private final static Logger LOG =
+ LoggerFactory.getLogger(AmbariCustomCommandExecutionHelper.class);
+ private ActionMetadata actionMetadata;
+ private Clusters clusters;
+ private AmbariManagementControllerImpl amcImpl;
+
+ /**
+ * Creates a helper that works against the given metadata, cluster registry and controller.
+ *
+ * @param actionMetadata metadata consulted for the actions and client of a service
+ * @param clusters cluster registry used to resolve clusters and their hosts
+ * @param amcImpl controller used for host selection and configuration tag lookup
+ */
+ public AmbariCustomCommandExecutionHelper(ActionMetadata actionMetadata, Clusters clusters,
+ AmbariManagementControllerImpl amcImpl) {
+ this.actionMetadata = actionMetadata;
+ this.clusters = clusters;
+ this.amcImpl = amcImpl;
+ }
+
+ public void validateCustomCommand(ExecuteActionRequest actionRequest) throws AmbariException {
+ if (actionRequest.getServiceName() == null
+ || actionRequest.getServiceName().isEmpty()
+ || actionRequest.getCommandName() == null
+ || actionRequest.getCommandName().isEmpty()) {
+ throw new AmbariException("Invalid request : " + "cluster="
+ + actionRequest.getClusterName() + ", service="
+ + actionRequest.getServiceName() + ", command="
+ + actionRequest.getCommandName());
+ }
+
+ LOG.info("Received a command execution request"
+ + ", clusterName=" + actionRequest.getClusterName()
+ + ", serviceName=" + actionRequest.getServiceName()
+ + ", request=" + actionRequest.toString());
+
+ if (!isValidCommand(actionRequest.getCommandName(), actionRequest.getServiceName())) {
+ throw new AmbariException(
+ "Unsupported action " + actionRequest.getCommandName() + " for " + actionRequest.getServiceName());
+ }
+ }
+
+ /**
+ * Tells whether the command is listed among the actions registered for the service.
+ *
+ * @param command command name to look for
+ * @param service service whose action list is consulted
+ * @return true when the service has a non-empty action list containing the command
+ */
+ private Boolean isValidCommand(String command, String service) {
+ List<String> supported = actionMetadata.getActions(service);
+ return supported != null && supported.size() != 0 && supported.contains(command);
+ }
+
+ /**
+ * Adds the tasks for a custom command to the stage, dispatching on the command name.
+ *
+ * @param actionRequest the custom command request
+ * @param stage stage into which tasks must be inserted
+ * @param configuration server configuration, used by the service check path
+ * @param hostsMap host mapping, used by the service check path
+ * @param hostLevelParams host level parameters set on the generated command
+ * @throws AmbariException if the command is neither a service check nor a datanode
+ * decommission, or the selected path fails
+ */
+ public void addAction(ExecuteActionRequest actionRequest, Stage stage,
+ Configuration configuration, HostsMap hostsMap, Map<String, String> hostLevelParams)
+ throws AmbariException {
+ String command = actionRequest.getCommandName();
+
+ // Any command whose name contains SERVICE_CHECK is handled as a service check.
+ if (command.contains("SERVICE_CHECK")) {
+ addServiceCheckAction(actionRequest, stage, configuration, hostsMap, hostLevelParams);
+ return;
+ }
+
+ if (command.equals("DECOMMISSION_DATANODE")) {
+ addDecommissionDatanodeAction(actionRequest, stage, hostLevelParams);
+ return;
+ }
+
+ throw new AmbariException("Unsupported action " + command);
+ }
+
+ /**
+ * Adds a service check task for the requested service to the stage.
+ *
+ * The task runs on a single host: a host of the service's client component when
+ * the action metadata names one, otherwise the first host of the first component
+ * returned for the service.
+ *
+ * @param actionRequest request naming the cluster, service and command
+ * @param stage stage into which the task is inserted
+ * @param configuration server configuration used to generate the cluster host info
+ * @param hostsMap host mapping used to generate the cluster host info
+ * @param hostLevelParams host level parameters set on the generated command
+ * @throws AmbariException if the service has no components or the chosen component has no hosts
+ */
+ private void addServiceCheckAction(ExecuteActionRequest actionRequest, Stage stage,
+ Configuration configuration, HostsMap hostsMap,
+ Map<String, String> hostLevelParams)
+ throws AmbariException {
+ String clusterName = actionRequest.getClusterName();
+ // Client component registered for the service, or null when there is none.
+ String componentName = actionMetadata.getClient(actionRequest
+ .getServiceName());
+
+ String hostName;
+ if (componentName != null) {
+ // Client component known: choose among the hosts that carry it.
+ Map<String, ServiceComponentHost> components = clusters
+ .getCluster(clusterName).getService(actionRequest.getServiceName())
+ .getServiceComponent(componentName).getServiceComponentHosts();
+
+ if (components.isEmpty()) {
+ throw new AmbariException("Hosts not found, component="
+ + componentName + ", service=" + actionRequest.getServiceName()
+ + ", cluster=" + clusterName);
+ }
+ hostName = amcImpl.getHealthyHost(components.keySet());
+ } else {
+ // No client component: fall back to the first component the service returns.
+ Map<String, ServiceComponent> components = clusters
+ .getCluster(clusterName).getService(actionRequest.getServiceName())
+ .getServiceComponents();
+
+ if (components.isEmpty()) {
+ throw new AmbariException("Components not found, service="
+ + actionRequest.getServiceName() + ", cluster=" + clusterName);
+ }
+
+ ServiceComponent serviceComponent = components.values().iterator()
+ .next();
+
+ if (serviceComponent.getServiceComponentHosts().isEmpty()) {
+ throw new AmbariException("Hosts not found, component="
+ + serviceComponent.getName() + ", service="
+ + actionRequest.getServiceName() + ", cluster=" + clusterName);
+ }
+
+ // Takes the first host of that component; getHealthyHost is not used on this path.
+ hostName = serviceComponent.getServiceComponentHosts().keySet()
+ .iterator().next();
+ }
+
+ // Register the command on the stage; componentName is null here when the
+ // fallback branch above was taken.
+ stage.addHostRoleExecutionCommand(hostName, Role.valueOf(actionRequest
+ .getCommandName()), RoleCommand.EXECUTE,
+ new ServiceComponentHostOpInProgressEvent(componentName, hostName,
+ System.currentTimeMillis()), clusterName, actionRequest
+ .getServiceName());
+
+ stage.getExecutionCommandWrapper(hostName, actionRequest.getCommandName()).getExecutionCommand()
+ .setRoleParams(actionRequest.getParameters());
+
+ Cluster cluster = clusters.getCluster(clusterName);
+
+ // [ type -> [ key, value ] ]
+ // The configurations map is sent empty; only the configuration tags are filled in.
+ Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
+ Map<String, Map<String, String>> configTags = amcImpl.findConfigurationTagsWithOverrides(cluster, hostName);
+
+ ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
+ actionRequest.getCommandName()).getExecutionCommand();
+
+ execCmd.setConfigurations(configurations);
+ execCmd.setConfigurationTags(configTags);
+ execCmd.setHostLevelParams(hostLevelParams);
+
+ // Generate cluster host info
+ execCmd.setClusterHostInfo(
+ StageUtils.getClusterHostInfo(clusters.getHostsForCluster(clusterName), cluster, hostsMap, configuration));
+ }
+
+ /**
+ * Adds a DECOMMISSION_DATANODE task to the stage, executed on a NAMENODE host.
+ *
+ * The request must carry an "excludeFileTag" parameter naming the version tag of the
+ * cluster's "hdfs-exclude-file" config; that tag is added to the configuration tags
+ * sent with the command.
+ *
+ * @param decommissionRequest request naming the cluster, service and parameters
+ * @param stage stage into which the task is inserted
+ * @param hostLevelParams host level parameters set on the generated command
+ * @throws AmbariException if no NAMENODE host exists, the "excludeFileTag" parameter is
+ * missing, or the cluster has no "hdfs-exclude-file" config with that tag
+ */
+ private void addDecommissionDatanodeAction(ExecuteActionRequest decommissionRequest, Stage stage,
+ Map<String, String> hostLevelParams)
+ throws AmbariException {
+ String hdfsExcludeFileType = "hdfs-exclude-file";
+ // Find hdfs admin host, just decommission from namenode.
+ String clusterName = decommissionRequest.getClusterName();
+ Cluster cluster = clusters.getCluster(clusterName);
+ String serviceName = decommissionRequest.getServiceName();
+ Map<String, ServiceComponentHost> namenodeHosts = cluster
+ .getService(serviceName).getServiceComponent(Role.NAMENODE.toString())
+ .getServiceComponentHosts();
+ // Report a missing namenode host the same way the service check path reports missing
+ // hosts, instead of failing with NoSuchElementException on the iterator below.
+ if (namenodeHosts.isEmpty()) {
+ throw new AmbariException("Hosts not found, component="
+ + Role.NAMENODE.toString() + ", service=" + serviceName
+ + ", cluster=" + clusterName);
+ }
+ String namenodeHost = namenodeHosts.keySet().iterator().next();
+
+ String excludeFileTag = null;
+ if (decommissionRequest.getParameters() != null) {
+ excludeFileTag = decommissionRequest.getParameters().get("excludeFileTag");
+ }
+
+ if (excludeFileTag == null) {
+ throw new AmbariException("No exclude file specified"
+ + " when decommissioning datanodes. Provide parameter excludeFileTag with the tag for config type "
+ + hdfsExcludeFileType);
+ }
+
+ Config config = cluster.getConfig(hdfsExcludeFileType, excludeFileTag);
+ if (config == null) {
+ throw new AmbariException("Decommissioning datanodes requires the cluster to be associated with config type " +
+ hdfsExcludeFileType + " with a list of datanodes to be decommissioned (\"datanodes\" : list).");
+ }
+
+ LOG.info("Decommissioning data nodes: " + config.getProperties().get("datanodes") +
+ " " + hdfsExcludeFileType + " tag: " + excludeFileTag);
+
+ // The configurations map is sent empty; only the configuration tags are filled in.
+ Map<String, Map<String, String>> configurations =
+ new TreeMap<String, Map<String, String>>();
+
+ Map<String, Map<String, String>> configTags = amcImpl.findConfigurationTagsWithOverrides(cluster, namenodeHost);
+
+ // Add the tag for hdfs-exclude-file
+ Map<String, String> excludeTags = new HashMap<String, String>();
+ excludeTags.put(ConfigHelper.CLUSTER_DEFAULT_TAG, config.getVersionTag());
+ configTags.put(hdfsExcludeFileType, excludeTags);
+
+ stage.addHostRoleExecutionCommand(
+ namenodeHost,
+ Role.DECOMMISSION_DATANODE,
+ RoleCommand.EXECUTE,
+ new ServiceComponentHostOpInProgressEvent(Role.DECOMMISSION_DATANODE
+ .toString(), namenodeHost, System.currentTimeMillis()),
+ clusterName, serviceName);
+
+ // The command was registered on the stage by addHostRoleExecutionCommand above.
+ ExecutionCommand execCmd = stage.getExecutionCommandWrapper(namenodeHost,
+ Role.DECOMMISSION_DATANODE.toString()).getExecutionCommand();
+
+ execCmd.setConfigurations(configurations);
+ execCmd.setConfigurationTags(configTags);
+ execCmd.setHostLevelParams(hostLevelParams);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index ef5c372..890cb81 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -165,7 +165,10 @@ public class AmbariManagementControllerImpl implements
final private String ojdbcUrl;
final private String serverDB;
final private String mysqljdbcUrl;
-
+
+ final private AmbariCustomCommandExecutionHelper customCommandExecutionHelper;
+ final private AmbariActionExecutionHelper actionExecutionHelper;
+
@Inject
public AmbariManagementControllerImpl(ActionManager actionManager,
Clusters clusters, Injector injector) throws Exception {
@@ -200,6 +203,11 @@ public class AmbariManagementControllerImpl implements
this.mysqljdbcUrl = null;
this.serverDB = null;
}
+
+ this.customCommandExecutionHelper = new AmbariCustomCommandExecutionHelper(
+ this.actionMetadata, this.clusters, this);
+ this.actionExecutionHelper = new AmbariActionExecutionHelper(
+ this.actionMetadata, this.clusters, this);
}
public String getAmbariServerURI(String path) {
@@ -1090,7 +1098,7 @@ public class AmbariManagementControllerImpl implements
* @return
* @throws AmbariException
*/
- private Map<String, Map<String,String>> findConfigurationTagsWithOverrides(
+ protected Map<String, Map<String,String>> findConfigurationTagsWithOverrides(
Cluster cluster, String hostName) throws AmbariException {
Map<String, Map<String,String>> configTags =
@@ -1099,7 +1107,6 @@ public class AmbariManagementControllerImpl implements
return configTags;
}
-
private List<Stage> doStageCreation(Cluster cluster,
Map<State, List<Service>> changedServices,
Map<State, List<ServiceComponent>> changedComps,
@@ -1142,7 +1149,7 @@ public class AmbariManagementControllerImpl implements
// multiple stages may be needed for reconfigure
long stageId = 0;
Map<String, List<String>> clusterHostInfo = StageUtils.getClusterHostInfo(
- clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector);
+ clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector.getInstance(Configuration.class));
String clusterHostInfoJson = StageUtils.getGson().toJson(clusterHostInfo);
@@ -1353,7 +1360,8 @@ public class AmbariManagementControllerImpl implements
stage.getExecutionCommandWrapper(clientHost, smokeTestRole)
.getExecutionCommand()
.setClusterHostInfo(StageUtils.getClusterHostInfo(
- clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector));
+ clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap,
+ injector.getInstance(Configuration.class)));
Map<String,String> hostParams = new HashMap<String, String>();
hostParams.put("stack_version", cluster.getDesiredStackVersion().getStackVersion());
@@ -1381,7 +1389,7 @@ public class AmbariManagementControllerImpl implements
+ ", requestId=" + stages.get(0).getRequestId()
+ ", stagesCount=" + stages.size());
}
- actionManager.sendActions(stages);
+ actionManager.sendActions(stages, null);
}
}
@@ -1472,7 +1480,6 @@ public class AmbariManagementControllerImpl implements
Map<String, Map<String, Map<String, Set<String>>>> hostComponentNames =
new HashMap<String, Map<String, Map<String, Set<String>>>>();
Set<State> seenNewStates = new HashSet<State>();
- boolean processingUpgradeRequest = false;
int numberOfRequestsProcessed = 0;
StackId fromStackVersion = new StackId();
Map<ServiceComponentHost, State> directTransitionScHosts = new HashMap<ServiceComponentHost, State>();
@@ -1541,9 +1548,6 @@ public class AmbariManagementControllerImpl implements
}
}
- // If upgrade request comes without state information then its an error
- boolean upgradeRequest = checkIfUpgradeRequestAndValidate(request, cluster, s, sc, sch);
-
if (newState == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Nothing to do for new updateServiceComponentHost request"
@@ -1565,25 +1569,6 @@ public class AmbariManagementControllerImpl implements
seenNewStates.add(newState);
- if (!processingUpgradeRequest && upgradeRequest) {
- processingUpgradeRequest = true;
- // this needs to be the first request
- if (numberOfRequestsProcessed > 1) {
- throw new AmbariException("An upgrade request cannot be combined with " +
- "other non-upgrade requests.");
- }
- fromStackVersion = sch.getStackVersion();
- }
-
- if (processingUpgradeRequest) {
- if (!upgradeRequest) {
- throw new AmbariException("An upgrade request cannot be combined with " +
- "other non-upgrade requests.");
- }
- sch.setState(State.UPGRADING);
- sch.setDesiredStackVersion(cluster.getCurrentStackVersion());
- }
-
State oldSchState = sch.getState();
// Client component reinstall allowed
if (newState == oldSchState && !sc.isClientComponent()) {
@@ -1613,20 +1598,6 @@ public class AmbariManagementControllerImpl implements
}
if (isDirectTransition(oldSchState, newState)) {
-
-// if (newState == State.DELETED) {
-// if (!sch.canBeRemoved()) {
-// throw new AmbariException("Servicecomponenthost cannot be removed"
-// + ", clusterName=" + cluster.getClusterName()
-// + ", clusterId=" + cluster.getClusterId()
-// + ", serviceName=" + sch.getServiceName()
-// + ", componentName=" + sch.getServiceComponentName()
-// + ", hostname=" + sch.getHostName()
-// + ", currentState=" + oldSchState
-// + ", newDesiredState=" + newState);
-// }
-// }
-
if (LOG.isDebugEnabled()) {
LOG.debug("Handling direct transition update to ServiceComponentHost"
+ ", clusterName=" + request.getClusterName()
@@ -1698,13 +1669,8 @@ public class AmbariManagementControllerImpl implements
Cluster cluster = clusters.getCluster(clusterNames.iterator().next());
- Map<String, String> requestParameters = null;
- if (processingUpgradeRequest) {
- requestParameters = new HashMap<String, String>();
- requestParameters.put(Configuration.UPGRADE_TO_STACK, gson.toJson(cluster.getCurrentStackVersion()));
- requestParameters.put(Configuration.UPGRADE_FROM_STACK, gson.toJson(fromStackVersion));
- }
- return createStages(cluster, requestProperties, requestParameters, null, null, changedScHosts, ignoredScHosts, runSmokeTest, false);
+ return createStages(cluster, requestProperties, null, null, null, changedScHosts, ignoredScHosts, runSmokeTest,
+ false);
}
private void validateServiceComponentHostRequest(ServiceComponentHostRequest request) {
@@ -2187,7 +2153,7 @@ public class AmbariManagementControllerImpl implements
return null;
}
- private String getHealthyHost(Set<String> hostList) throws AmbariException {
+ protected String getHealthyHost(Set<String> hostList) throws AmbariException {
// Return a healthy host if found otherwise any random host
String hostName = null;
for (String candidateHostName : hostList) {
@@ -2200,149 +2166,11 @@ public class AmbariManagementControllerImpl implements
return hostName;
}
- private void addServiceCheckAction(ExecuteActionRequest actionRequest, Stage stage)
- throws AmbariException {
- String clusterName = actionRequest.getClusterName();
- String componentName = actionMetadata.getClient(actionRequest
- .getServiceName());
-
- String hostName;
- if (componentName != null) {
- Map<String, ServiceComponentHost> components = clusters
- .getCluster(clusterName).getService(actionRequest.getServiceName())
- .getServiceComponent(componentName).getServiceComponentHosts();
-
- if (components.isEmpty()) {
- throw new AmbariException("Hosts not found, component="
- + componentName + ", service=" + actionRequest.getServiceName()
- + ", cluster=" + clusterName);
- }
- hostName = getHealthyHost(components.keySet());
- } else {
- Map<String, ServiceComponent> components = clusters
- .getCluster(clusterName).getService(actionRequest.getServiceName())
- .getServiceComponents();
-
- if (components.isEmpty()) {
- throw new AmbariException("Components not found, service="
- + actionRequest.getServiceName() + ", cluster=" + clusterName);
- }
-
- ServiceComponent serviceComponent = components.values().iterator()
- .next();
-
- if (serviceComponent.getServiceComponentHosts().isEmpty()) {
- throw new AmbariException("Hosts not found, component="
- + serviceComponent.getName() + ", service="
- + actionRequest.getServiceName() + ", cluster=" + clusterName);
- }
-
- hostName = serviceComponent.getServiceComponentHosts().keySet()
- .iterator().next();
- }
-
- stage.addHostRoleExecutionCommand(hostName, Role.valueOf(actionRequest
- .getCommandName()), RoleCommand.EXECUTE,
- new ServiceComponentHostOpInProgressEvent(componentName, hostName,
- System.currentTimeMillis()), clusterName, actionRequest
- .getServiceName());
-
- stage.getExecutionCommandWrapper(hostName, actionRequest.getCommandName()).getExecutionCommand()
- .setRoleParams(actionRequest.getParameters());
-
- Cluster cluster = clusters.getCluster(clusterName);
-
- // [ type -> [ key, value ] ]
- Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String,String>>();
- Map<String, Map<String, String>> configTags =
- findConfigurationTagsWithOverrides(cluster, hostName);
-
- ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
- actionRequest.getCommandName()).getExecutionCommand();
-
- execCmd.setConfigurations(configurations);
- execCmd.setConfigurationTags(configTags);
-
- Map<String, String> params = new TreeMap<String, String>();
- params.put("jdk_location", this.jdkResourceUrl);
- params.put("stack_version", cluster.getDesiredStackVersion().getStackVersion());
- execCmd.setHostLevelParams(params);
- }
-
- private void addDecommissionDatanodeAction(
- ExecuteActionRequest decommissionRequest, Stage stage)
- throws AmbariException {
- String hdfsExcludeFileType = "hdfs-exclude-file";
- // Find hdfs admin host, just decommission from namenode.
- String clusterName = decommissionRequest.getClusterName();
- Cluster cluster = clusters.getCluster(clusterName);
- String serviceName = decommissionRequest.getServiceName();
- String namenodeHost = clusters.getCluster(clusterName)
- .getService(serviceName).getServiceComponent(Role.NAMENODE.toString())
- .getServiceComponentHosts().keySet().iterator().next();
-
- String excludeFileTag = null;
- if (decommissionRequest.getParameters() != null
- && (decommissionRequest.getParameters().get("excludeFileTag") != null)) {
- excludeFileTag = decommissionRequest.getParameters()
- .get("excludeFileTag");
- }
-
- if (excludeFileTag == null) {
- throw new IllegalArgumentException("No exclude file specified"
- + " when decommissioning datanodes. Provide parameter excludeFileTag with the tag for config type "
- + hdfsExcludeFileType);
- }
-
- Config config = clusters.getCluster(clusterName).getConfig(
- hdfsExcludeFileType, excludeFileTag);
- if(config == null){
- throw new AmbariException("Decommissioning datanodes requires the cluster to be associated with config type " +
- hdfsExcludeFileType + " with a list of datanodes to be decommissioned (\"datanodes\" : list).");
- }
-
- LOG.info("Decommissioning data nodes: " + config.getProperties().get("datanodes") +
- " " + hdfsExcludeFileType + " tag: " + excludeFileTag);
-
- Map<String, Map<String, String>> configurations =
- new TreeMap<String, Map<String, String>>();
-
-
- Map<String, Map<String, String>> configTags =
- findConfigurationTagsWithOverrides(cluster, namenodeHost);
-
- // Add the tag for hdfs-exclude-file
- Map<String, String> excludeTags = new HashMap<String, String>();
- excludeTags.put(ConfigHelper.CLUSTER_DEFAULT_TAG, config.getVersionTag());
- configTags.put(hdfsExcludeFileType, excludeTags);
-
- stage.addHostRoleExecutionCommand(
- namenodeHost,
- Role.DECOMMISSION_DATANODE,
- RoleCommand.EXECUTE,
- new ServiceComponentHostOpInProgressEvent(Role.DECOMMISSION_DATANODE
- .toString(), namenodeHost, System.currentTimeMillis()),
- clusterName, serviceName);
-
- ExecutionCommand execCmd = stage.getExecutionCommandWrapper(namenodeHost,
- Role.DECOMMISSION_DATANODE.toString()).getExecutionCommand();
-
- execCmd.setConfigurations(configurations);
- execCmd.setConfigurationTags(configTags);
-
- Map<String, String> params = new TreeMap<String, String>();
- params.put("jdk_location", this.jdkResourceUrl);
- params.put("stack_version", cluster.getDesiredStackVersion()
- .getStackVersion());
- execCmd.setHostLevelParams(params);
-
- }
-
@Override
public RequestStatusResponse createAction(ExecuteActionRequest actionRequest, Map<String, String> requestProperties)
throws AmbariException {
- String clusterName = null;
-
+ String clusterName;
+ Configuration configuration = injector.getInstance(Configuration.class);
String requestContext = "";
if (requestProperties != null) {
@@ -2353,48 +2181,38 @@ public class AmbariManagementControllerImpl implements
}
}
- String logDir = ""; //TODO empty for now
-
if (actionRequest.getClusterName() == null
- || actionRequest.getClusterName().isEmpty()
- || actionRequest.getServiceName() == null
- || actionRequest.getServiceName().isEmpty()
- || actionRequest.getCommandName() == null
- || actionRequest.getCommandName().isEmpty()) {
- throw new AmbariException("Invalid action request : " + "cluster="
- + actionRequest.getClusterName() + ", service="
- + actionRequest.getServiceName() + ", command="
- + actionRequest.getCommandName());
+ || actionRequest.getClusterName().isEmpty()) {
+ throw new AmbariException("Invalid request, cluster name must be specified");
}
-
clusterName = actionRequest.getClusterName();
-
+
Cluster cluster = clusters.getCluster(clusterName);
-
- Map<String, List<String>> clusterHostInfoMap = StageUtils.getClusterHostInfo(clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector);
- String clusterHostInfo = StageUtils.getGson().toJson(clusterHostInfoMap);
-
- Stage stage = stageFactory.createNew(actionManager.getNextRequestId(),
- logDir, clusterName, requestContext, clusterHostInfo);
+ ActionExecutionContext actionExecContext = null;
+ if (actionRequest.isCommand()) {
+ customCommandExecutionHelper.validateCustomCommand(actionRequest);
+ } else {
+ actionExecContext = actionExecutionHelper.validateCustomAction(actionRequest, cluster);
+ }
+
+ Map<String, List<String>> clusterHostInfo = StageUtils.getClusterHostInfo(
+ clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap,
+ injector.getInstance(Configuration.class));
+
+ String clusterHostInfoJson = StageUtils.getGson().toJson(clusterHostInfo);
+ Stage stage = createNewStage(cluster, actionManager.getNextRequestId(), requestContext, clusterHostInfoJson);
stage.setStageId(0);
- LOG.info("Received a createAction request"
- + ", clusterName=" + actionRequest.getClusterName()
- + ", serviceName=" + actionRequest.getServiceName()
- + ", request=" + actionRequest.toString());
- if (!isValidCommand(actionRequest.getCommandName(), actionRequest.getServiceName())) {
- throw new AmbariException(
- "Unsupported action " + actionRequest.getCommandName() + " for " + actionRequest.getServiceName());
- }
+ Map<String, String> params = new TreeMap<String, String>();
+ params.put("jdk_location", this.jdkResourceUrl);
+ params.put("stack_version", cluster.getDesiredStackVersion().getStackVersion());
- if (actionRequest.getCommandName().contains("SERVICE_CHECK")) {
- addServiceCheckAction(actionRequest, stage);
- } else if (actionRequest.getCommandName().equals("DECOMMISSION_DATANODE")) {
- addDecommissionDatanodeAction(actionRequest, stage);
+ if (actionRequest.isCommand()) {
+ customCommandExecutionHelper.addAction(actionRequest, stage, configuration, hostsMap, params);
} else {
- throw new AmbariException("Unsupported action " + actionRequest.getCommandName());
+ actionExecutionHelper.addAction(actionExecContext, stage, configuration, hostsMap, params);
}
RoleCommandOrder rco = this.getRoleCommandOrder(cluster);
@@ -2402,7 +2220,7 @@ public class AmbariManagementControllerImpl implements
rg.build(stage);
List<Stage> stages = rg.getStages();
if (stages != null && !stages.isEmpty()) {
- actionManager.sendActions(stages);
+ actionManager.sendActions(stages, actionRequest);
return getRequestStatusResponse(stage.getRequestId());
} else {
throw new AmbariException("Stage was not created");
@@ -2428,18 +2246,6 @@ public class AmbariManagementControllerImpl implements
}
- private Boolean isValidCommand(String command, String service) {
- List<String> actions = actionMetadata.getActions(service);
- if (actions == null || actions.size() == 0) {
- return false;
- }
-
- if (!actions.contains(command)) {
- return false;
- }
-
- return true;
- }
private Set<StackResponse> getStacks(StackRequest request)
throws AmbariException {
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
index fd1cd1f..f8dd908 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
@@ -17,6 +17,10 @@
*/
package org.apache.ambari.server.controller;
+import org.apache.ambari.server.utils.StageUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,26 +37,29 @@ public class ExecuteActionRequest {
private Map<String, String> parameters;
public ExecuteActionRequest(String clusterName, String commandName,
- String actionName, String serviceName, String componentName,
- List<String> hosts, Map<String, String> parameters) {
- this.clusterName = clusterName;
- this.commandName = commandName;
+ String actionName, String serviceName, String componentName,
+ List<String> hosts, Map<String, String> parameters) {
+ this(clusterName, commandName, serviceName, parameters);
this.actionName = actionName;
- this.serviceName = serviceName;
this.componentName = componentName;
- this.parameters = parameters;
- this.hosts = hosts;
+ if (hosts != null) {
+ this.hosts.addAll(hosts);
+ }
}
/**
* Create an ExecuteActionRequest to execute a command
*/
public ExecuteActionRequest(String clusterName, String commandName, String serviceName,
- Map<String, String> parameters) {
+ Map<String, String> parameters) {
this.clusterName = clusterName;
this.commandName = commandName;
this.serviceName = serviceName;
- this.parameters = parameters;
+ this.parameters = new HashMap<String, String>();
+ if (parameters != null) {
+ this.parameters.putAll(parameters);
+ }
+ this.hosts = new ArrayList<String>();
}
public String getClusterName() {
@@ -86,4 +93,17 @@ public class ExecuteActionRequest {
public Boolean isCommand() {
return actionName == null || actionName.isEmpty();
}
+
+  /** Returns a one-line summary of every field; null fields print as "null". */
+  @Override
+  public synchronized String toString() {
+    return "isCommand :" + isCommand()
+        + ", action :" + actionName
+        + ", command :" + commandName
+        + ", inputs :" + parameters
+        + ", targetService :" + serviceName
+        + ", targetComponent :" + componentName
+        + ", targetHosts :" + hosts
+        + ", clusterName :" + clusterName;
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index f77c8a4..5678887 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -129,14 +129,14 @@ public class HostRoleCommandDAO {
}
@Transactional
- public List<HostRoleCommandEntity> findByHostRole(String hostName, long requestId, long stageId, Role role) {
+ public List<HostRoleCommandEntity> findByHostRole(String hostName, long requestId, long stageId, String role) {
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT command " +
"FROM HostRoleCommandEntity command " +
"WHERE command.hostName=?1 AND command.requestId=?2 " +
"AND command.stageId=?3 AND command.role=?4 " +
"ORDER BY command.taskId", HostRoleCommandEntity.class);
- return daoUtils.selectList(query, hostName, requestId, stageId, role.name());
+ return daoUtils.selectList(query, hostName, requestId, stageId, role);
}
@Transactional
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
index 8271151..79c001a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
@@ -77,10 +77,6 @@ public class StageDAO {
@Transactional
public List<StageEntity> findByCommandStatuses(Collection<HostRoleStatus> statuses) {
-// TypedQuery<StageEntity> query = entityManagerProvider.get().createQuery("SELECT stage " +
-// "FROM StageEntity stage JOIN stage.hostRoleCommands command " +
-// "WHERE command.status IN ?1 " +
-// "ORDER BY stage.requestId, stage.stageId", StageEntity.class);
TypedQuery<StageEntity> query = entityManagerProvider.get().createQuery("SELECT stage " +
"FROM StageEntity stage WHERE stage.stageId IN (SELECT hrce.stageId FROM " +
"HostRoleCommandEntity hrce WHERE stage.requestId = hrce.requestId and hrce.status IN ?1 ) " +
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
index 9e23516..7f1d031 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
@@ -21,7 +21,15 @@ package org.apache.ambari.server.orm.entities;
import org.apache.ambari.server.actionmanager.ActionType;
import org.apache.ambari.server.actionmanager.TargetHostType;
-import javax.persistence.*;
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
@NamedQueries({
@NamedQuery(name = "allActions", query =
@@ -52,11 +60,11 @@ public class ActionEntity {
@Basic
private String targetComponent;
- @Column(name = "description")
+ @Column(name = "description", nullable = false)
@Basic
private String description = "";
- @Column(name = "target_type")
+ @Column(name = "target_type", nullable = false)
@Enumerated(EnumType.STRING)
private TargetHostType targetType = TargetHostType.ANY;
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
index b74a685..b922293 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
@@ -363,9 +363,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
State.INSTALLED,
ServiceComponentHostEventType.HOST_SVCCOMP_OP_SUCCEEDED,
new ServiceComponentHostOpCompletedTransition())
-
-
.addTransition(State.INSTALLING,
State.INSTALLING,
ServiceComponentHostEventType.HOST_SVCCOMP_INSTALL,