Posted to commits@ambari.apache.org by sm...@apache.org on 2013/11/14 07:30:52 UTC

[1/2] AMBARI-3731. Custom Action: Add support for custom action execution

Updated Branches:
  refs/heads/trunk 708e59d9b -> 22f5fdfb7


http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
index d6523b0..a9c73bd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
@@ -188,7 +188,7 @@ public class StageUtils {
 
   public static Map<String, List<String>> getClusterHostInfo(
       Map<String, Host> allHosts, Cluster cluster, HostsMap hostsMap,
-      Injector injector) throws AmbariException {
+      Configuration configuration) throws AmbariException {
     Map<String, List<String>> info = new HashMap<String, List<String>>();
     if (cluster.getServices() != null) {
       String hostName = getHostName();
@@ -213,8 +213,7 @@ public class StageUtils {
               info.put(clusterInfoKey, hostList);
             }
             //Set up ambari-rca connection properties, is this a hack?
-//            info.put("ambari_db_server_host", Arrays.asList(hostsMap.getHostMap(getHostName())));
-            Configuration configuration = injector.getInstance(Configuration.class);
+            //info.put("ambari_db_server_host", Arrays.asList(hostsMap.getHostMap(getHostName())));
             String url = configuration.getRcaDatabaseUrl();
             if (url.contains(Configuration.HOSTNAME_MACRO)) {
               url = url.replace(Configuration.HOSTNAME_MACRO, hostsMap.getHostMap(hostName));

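The hunk above narrows getClusterHostInfo's dependency from a Guice Injector to the Configuration it actually needs. A minimal sketch of an updated call site, mirroring the TestStageUtils change later in this commit (the injector and the Clusters handle, fsm, are assumed to exist as in that test):

    Configuration configuration = injector.getInstance(Configuration.class);
    Map<String, List<String>> info = StageUtils.getClusterHostInfo(
        fsm.getHostsForCluster("c1"), fsm.getCluster("c1"),
        new HostsMap(configuration), configuration);
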
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/resources/upgrade/ddl/Ambari-DDL-Postgres-UPGRADE-1.3.0.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/upgrade/ddl/Ambari-DDL-Postgres-UPGRADE-1.3.0.sql b/ambari-server/src/main/resources/upgrade/ddl/Ambari-DDL-Postgres-UPGRADE-1.3.0.sql
index c016d96..96d5956 100644
--- a/ambari-server/src/main/resources/upgrade/ddl/Ambari-DDL-Postgres-UPGRADE-1.3.0.sql
+++ b/ambari-server/src/main/resources/upgrade/ddl/Ambari-DDL-Postgres-UPGRADE-1.3.0.sql
@@ -132,6 +132,10 @@ ALTER TABLE ambari.confgroupclusterconfigmapping ADD CONSTRAINT FK_confgroupclus
 ALTER TABLE ambari.configgrouphostmapping ADD CONSTRAINT FK_configgrouphostmapping_configgroup_id FOREIGN KEY (config_group_id) REFERENCES ambari.configgroup (group_id);
 ALTER TABLE ambari.configgrouphostmapping ADD CONSTRAINT FK_configgrouphostmapping_host_name FOREIGN KEY (host_name) REFERENCES ambari.hosts (host_name);
 
+-- required for custom action
+CREATE TABLE ambari.action (action_name VARCHAR(255) NOT NULL, action_type VARCHAR(32) NOT NULL, inputs VARCHAR(1000),
+target_service VARCHAR(255), target_component VARCHAR(255), default_timeout SMALLINT NOT NULL, description VARCHAR(1000), target_type VARCHAR(32), PRIMARY KEY (action_name));
+GRANT ALL PRIVILEGES ON TABLE ambari.action TO :username;
 
 --Move cluster host info for old execution commands to stage table
 UPDATE ambari.stage sd

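The new ambari.action table backs custom action definitions. A hedged sketch of how a definition maps onto those columns, using the createActionDefinition signature exercised by the tests later in this commit (the action name "check_file_exists" and the input "fileName" are illustrative, not part of this patch):

    controller.getActionManager().createActionDefinition(
        "check_file_exists",       // action_name (illustrative)
        ActionType.SYSTEM,         // action_type
        "fileName",                // comma-separated required inputs (illustrative)
        "Does file exist",         // description
        "HDFS",                    // target_service
        "DATANODE",                // target_component
        TargetHostType.ANY,        // target_type
        Short.valueOf("100"));     // default_timeout, validated (per the controller tests
                                   // later in this commit) to be between 60 and 600
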
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
index b0bd728..8c7d839 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
@@ -28,6 +28,7 @@ import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -57,9 +58,11 @@ public class TestActionDBAccessorImpl {
   private long stageId = 31;
   private String hostName = "host1";
   private String clusterName = "cluster1";
+  private String actionName = "validate_kerberos";
   private Injector injector;
   ActionDBAccessor db;
   ActionManager am;
+  CustomActionDBAccessor cdb;
 
   @Inject
   private Clusters clusters;
@@ -77,9 +80,10 @@ public class TestActionDBAccessorImpl {
     clusters.getHost(hostName).persist();
     clusters.addCluster(clusterName);
     db = injector.getInstance(ActionDBAccessorImpl.class);
-    
+    cdb = injector.getInstance(CustomActionDBAccessor.class);
+
     am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
-        new HostsMap((String) null), null, injector.getInstance(UnitOfWork.class), null);
+        new HostsMap((String) null), null, injector.getInstance(UnitOfWork.class), cdb);
   }
 
   @After
@@ -150,26 +154,26 @@ public class TestActionDBAccessorImpl {
   @Test
   public void testHostRoleScheduled() throws InterruptedException {
     populateActionDB(db, hostName, requestId, stageId);
-    Stage stage = db.getAction(StageUtils.getActionId(requestId, stageId));
+    Stage stage = db.getStage(StageUtils.getActionId(requestId, stageId));
     assertEquals(HostRoleStatus.PENDING, stage.getHostRoleStatus(hostName, Role.HBASE_MASTER.toString()));
     List<HostRoleCommandEntity> entities=
-        hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER);
+        hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER.toString());
 
     assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus());
     stage.setHostRoleStatus(hostName, Role.HBASE_MASTER.toString(), HostRoleStatus.QUEUED);
 
-    entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER);
+    entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER.toString());
     assertEquals(HostRoleStatus.QUEUED, stage.getHostRoleStatus(hostName, Role.HBASE_MASTER.toString()));
     assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus());
     db.hostRoleScheduled(stage, hostName, Role.HBASE_MASTER.toString());
 
-    entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER);
+    entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER.toString());
     assertEquals(HostRoleStatus.QUEUED, entities.get(0).getStatus());
 
     Thread thread = new Thread(){
       @Override
       public void run() {
-        Stage stage1 = db.getAction("23-31");
+        Stage stage1 = db.getStage("23-31");
         stage1.setHostRoleStatus(hostName, Role.HBASE_MASTER.toString(), HostRoleStatus.COMPLETED);
         db.hostRoleScheduled(stage1, hostName, Role.HBASE_MASTER.toString());
       }
@@ -178,9 +182,47 @@ public class TestActionDBAccessorImpl {
     thread.start();
     thread.join();
 
-    entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER);
+    entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER.toString());
     assertEquals("Concurrent update failed", HostRoleStatus.COMPLETED, entities.get(0).getStatus());
+  }
+
+  @Test
+  public void testCustomActionScheduled() throws InterruptedException {
+    populateActionDBWithCustomAction(db, hostName, requestId, stageId);
+    Stage stage = db.getStage(StageUtils.getActionId(requestId, stageId));
+    assertEquals(HostRoleStatus.PENDING, stage.getHostRoleStatus(hostName, actionName));
+    List<HostRoleCommandEntity> entities =
+        hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, actionName);
+
+    assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus());
+    stage.setHostRoleStatus(hostName, actionName, HostRoleStatus.QUEUED);
+
+    entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, actionName);
+    assertEquals(HostRoleStatus.QUEUED, stage.getHostRoleStatus(hostName, actionName));
+    assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus());
+
+    long now = System.currentTimeMillis();
+    db.hostRoleScheduled(stage, hostName, actionName);
+
+    entities = hostRoleCommandDAO.findByHostRole(
+        hostName, requestId, stageId, actionName);
+    assertEquals(HostRoleStatus.QUEUED, entities.get(0).getStatus());
+
+
+    Thread thread = new Thread() {
+      @Override
+      public void run() {
+        Stage stage1 = db.getStage("23-31");
+        stage1.setHostRoleStatus(hostName, actionName, HostRoleStatus.COMPLETED);
+        db.hostRoleScheduled(stage1, hostName, actionName);
+      }
+    };
+
+    thread.start();
+    thread.join();
 
+    entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, actionName);
+    assertEquals("Concurrent update failed", HostRoleStatus.COMPLETED, entities.get(0).getStatus());
   }
 
   @Test
@@ -199,7 +241,8 @@ public class TestActionDBAccessorImpl {
     commandReport.setExitCode(123);
     db.updateHostRoleState(hostName, requestId, stageId, Role.HBASE_MASTER.toString(), commandReport);
 
-    List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER);
+    List<HostRoleCommandEntity> commandEntities =
+        hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER.toString());
     assertEquals(1, commandEntities.size());
     HostRoleCommandEntity commandEntity = commandEntities.get(0);
     HostRoleCommand command = db.getTask(commandEntity.getTaskId());
@@ -297,4 +340,19 @@ public class TestActionDBAccessorImpl {
     stages.add(s);
     db.persistActions(stages);
   }
+
+  private void populateActionDBWithCustomAction(ActionDBAccessor db, String hostname,
+                                long requestId, long stageId) {
+    Stage s = new Stage(requestId, "/a/b", "cluster1", "action db accessor test", "");
+    s.setStageId(stageId);
+    s.addHostRoleExecutionCommand(hostname, Role.valueOf(actionName),
+        RoleCommand.ACTIONEXECUTE,
+        new ServiceComponentHostStartEvent(Role.HBASE_MASTER.toString(),
+            hostname, System.currentTimeMillis()), "cluster1", "HBASE");
+    List<Stage> stages = new ArrayList<Stage>();
+    stages.add(s);
+    ExecuteActionRequest request = new ExecuteActionRequest("cluster1", null, actionName, "HBASE",
+        "HBASE_MASTER", null, null);
+    db.persistActions(stages);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
index d465b59..f5ffe8b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
@@ -175,7 +175,7 @@ public class TestActionManager {
     populateActionDB(db, hostname);
     assertEquals(1, clusters.getClusters().size());
 
-    Cluster cluster = clusters.getCluster(clusterName);
+    clusters.getCluster(clusterName);
 
     assertEquals(1, am.getRequests().size());
 

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
index 3e8448c..2af7eef 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
@@ -37,10 +37,12 @@ import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.ServiceNotFoundException;
 import org.apache.ambari.server.StackAccessException;
 import org.apache.ambari.server.actionmanager.ActionDBAccessor;
+import org.apache.ambari.server.actionmanager.ActionType;
 import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.actionmanager.TargetHostType;
 import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
@@ -2052,13 +2054,9 @@ public class AmbariManagementControllerTest {
 
   @Test
   public void testGetServiceComponentHosts() throws AmbariException {
-    clusters.addCluster("c1");
-    Cluster c1 = clusters.getCluster("c1");
-    c1.setDesiredStackVersion(new StackId("HDP-0.1"));
-    clusters.addHost("h1");
-    clusters.getHost("h1").setOsType("centos5");
-    clusters.getHost("h1").persist();
-    clusters.mapHostToCluster("h1", "c1");
+    Cluster c1 = setupClusterWithHosts("c1", "HDP-0.1", new ArrayList<String>() {{
+      add("h1");
+    }}, "centos5");
     Service s1 = serviceFactory.createNew(c1, "HDFS");
     c1.addService(s1);
     s1.persist();
@@ -2105,24 +2103,29 @@ public class AmbariManagementControllerTest {
 
   }
 
+  private Cluster setupClusterWithHosts(String clusterName, String stackId, List<String> hosts,
+                             String osType) throws AmbariException {
+    clusters.addCluster(clusterName);
+    Cluster c1 = clusters.getCluster(clusterName);
+    c1.setDesiredStackVersion(new StackId(stackId));
+    for (String host : hosts) {
+      clusters.addHost(host);
+      clusters.getHost(host).setOsType(osType);
+      clusters.getHost(host).persist();
+      clusters.mapHostToCluster(host, clusterName);
+    }
+    return c1;
+  }
+
   @Test
   public void testGetServiceComponentHostsWithFilters() throws AmbariException {
-    clusters.addCluster("c1");
-    Cluster c1 = clusters.getCluster("c1");
-    c1.setDesiredStackVersion(new StackId("HDP-0.2"));
-
-    clusters.addHost("h1");
-    clusters.addHost("h2");
-    clusters.addHost("h3");
-    clusters.getHost("h1").setOsType("centos5");
-    clusters.getHost("h2").setOsType("centos5");
-    clusters.getHost("h3").setOsType("centos5");
-    clusters.getHost("h1").persist();
-    clusters.getHost("h2").persist();
-    clusters.getHost("h3").persist();
-    clusters.mapHostToCluster("h1", "c1");
-    clusters.mapHostToCluster("h2", "c1");
-    clusters.mapHostToCluster("h3", "c1");
+    Cluster c1 = setupClusterWithHosts("c1", "HDP-0.2",
+        new ArrayList<String>() {{
+          add("h1");
+          add("h2");
+          add("h3");
+        }},
+        "centos5");
 
     Service s1 = serviceFactory.createNew(c1, "HDFS");
     Service s2 = serviceFactory.createNew(c1, "MAPREDUCE");
@@ -2279,25 +2282,21 @@ public class AmbariManagementControllerTest {
 
   @Test
   public void testGetHosts() throws AmbariException {
-    clusters.addCluster("c1");
-    clusters.addCluster("c2");
-    clusters.getCluster("c1").setDesiredStackVersion(new StackId("HDP-0.2"));
-    clusters.getCluster("c2").setDesiredStackVersion(new StackId("HDP-0.2"));
-    clusters.addHost("h1");
-    clusters.addHost("h2");
-    clusters.addHost("h3");
+    setupClusterWithHosts("c1", "HDP-0.2",
+        new ArrayList<String>() {{
+          add("h1");
+          add("h2");
+        }},
+        "centos5");
+
+    setupClusterWithHosts("c2", "HDP-0.2",
+        new ArrayList<String>() {{
+          add("h3");
+        }},
+        "centos5");
     clusters.addHost("h4");
-    clusters.getHost("h1").setOsType("centos5");
-    clusters.getHost("h2").setOsType("centos5");
-    clusters.getHost("h3").setOsType("centos5");
     clusters.getHost("h4").setOsType("centos5");
-    clusters.getHost("h1").persist();
-    clusters.getHost("h2").persist();
-    clusters.getHost("h3").persist();
     clusters.getHost("h4").persist();
-    clusters.mapHostToCluster("h1", "c1");
-    clusters.mapHostToCluster("h2", "c1");
-    clusters.mapHostToCluster("h3", "c2");
 
     Map<String, String> attrs = new HashMap<String, String>();
     attrs.put("a1", "b1");
@@ -3105,6 +3104,7 @@ public class AmbariManagementControllerTest {
     Assert.assertNull(trackAction);
   }
 
+  @Ignore
   @Test
   public void testServiceComponentHostUpdateStackId() throws AmbariException {
     String clusterName = "foo1";
@@ -3264,6 +3264,7 @@ public class AmbariManagementControllerTest {
     }
   }
 
+  @Ignore
   @Test
   public void testServiceComponentHostUpdateStackIdError() throws AmbariException {
     String clusterName = "foo1";
@@ -3442,19 +3443,124 @@ public class AmbariManagementControllerTest {
     // start should fail
   }
 
-  @SuppressWarnings("serial")
   @Test
-  public void testCreateActionsFailures() throws Exception {
-    clusters.addCluster("c1");
-    clusters.getCluster("c1").setDesiredStackVersion(new StackId("HDP-0.1"));
-    clusters.addHost("h1");
-    clusters.getHost("h1").setOsType("centos5");
-    clusters.getHost("h1").persist();
-    Set<String> hostNames = new HashSet<String>(){{
-      add("h1");
+  public void testCreateCustomActions() throws Exception {
+    setupClusterWithHosts("c1", "HDP-2.0.6",
+        new ArrayList<String>() {{
+          add("h1");
+          add("h2");
+          add("h3");
+        }},
+        "centos6");
+
+    Cluster cluster = clusters.getCluster("c1");
+    cluster.setDesiredStackVersion(new StackId("HDP-2.0.6"));
+    cluster.setCurrentStackVersion(new StackId("HDP-2.0.6"));
+
+    ConfigFactory cf = injector.getInstance(ConfigFactory.class);
+    Config config1 = cf.createNew(cluster, "global",
+        new HashMap<String, String>() {{
+          put("key1", "value1");
+        }});
+    config1.setVersionTag("version1");
+
+    Config config2 = cf.createNew(cluster, "core-site",
+        new HashMap<String, String>() {{
+          put("key1", "value1");
+        }});
+    config2.setVersionTag("version1");
+
+    cluster.addConfig(config1);
+    cluster.addConfig(config2);
+
+    Service hdfs = cluster.addService("HDFS");
+    hdfs.persist();
+
+    Service mapred = cluster.addService("YARN");
+    mapred.persist();
+
+    hdfs.addServiceComponent(Role.HDFS_CLIENT.name()).persist();
+    hdfs.addServiceComponent(Role.NAMENODE.name()).persist();
+    hdfs.addServiceComponent(Role.DATANODE.name()).persist();
+
+    mapred.addServiceComponent(Role.RESOURCEMANAGER.name()).persist();
+
+    hdfs.getServiceComponent(Role.HDFS_CLIENT.name()).addServiceComponentHost("h1").persist();
+    hdfs.getServiceComponent(Role.NAMENODE.name()).addServiceComponentHost("h1").persist();
+    hdfs.getServiceComponent(Role.DATANODE.name()).addServiceComponentHost("h1").persist();
+    hdfs.getServiceComponent(Role.DATANODE.name()).addServiceComponentHost("h2").persist();
+
+    controller.getActionManager().createActionDefinition(
+        "a1", ActionType.SYSTEM, "test", "Does file exist", "", "",
+        TargetHostType.SPECIFIC, Short.valueOf("100"));
+
+    controller.getActionManager().createActionDefinition(
+        "a2", ActionType.SYSTEM, "", "Does file exist", "HDFS", "DATANODE",
+        TargetHostType.ALL, Short.valueOf("100"));
+
+    Map<String, String> params = new HashMap<String, String>() {{
+      put("test", "test");
     }};
 
-    clusters.mapHostsToCluster(hostNames, "c1");
+    Map<String, String> requestProperties = new HashMap<String, String>();
+    requestProperties.put(REQUEST_CONTEXT_PROPERTY, "Called from a test");
+    long now = System.currentTimeMillis();
+
+    ArrayList<String> hosts = new ArrayList<String>() {{add("h1");}};
+
+    ExecuteActionRequest actionRequest = new ExecuteActionRequest("c1", null, "a1", "HDFS", "DATANODE", hosts, params);
+    RequestStatusResponse response = controller.createAction(actionRequest, requestProperties);
+    assertEquals(1, response.getTasks().size());
+    ShortTaskStatus taskStatus = response.getTasks().get(0);
+    Assert.assertEquals("h1", taskStatus.getHostName());
+
+    List<HostRoleCommand> storedTasks = actionDB.getRequestTasks(response.getRequestId());
+    Stage stage = actionDB.getAllStages(response.getRequestId()).get(0);
+    Assert.assertNotNull(stage);
+    Assert.assertEquals(1, storedTasks.size());
+    HostRoleCommand task = storedTasks.get(0);
+    Assert.assertEquals(RoleCommand.ACTIONEXECUTE, task.getRoleCommand());
+    Assert.assertEquals("a1", task.getRole().name());
+    Assert.assertEquals("h1", task.getHostName());
+    ExecutionCommand cmd = task.getExecutionCommandWrapper().getExecutionCommand();
+    Assert.assertTrue(cmd.getCommandParams().containsKey("test"));
+    Assert.assertEquals(cmd.getServiceName(), "HDFS");
+    Assert.assertEquals(cmd.getComponentName(), "DATANODE");
+
+    actionRequest = new ExecuteActionRequest("c1", null, "a2", "", "", null, params);
+    response = controller.createAction(actionRequest, requestProperties);
+    assertEquals(2, response.getTasks().size());
+
+    final List<HostRoleCommand> storedTasks2 = actionDB.getRequestTasks(response.getRequestId());
+    task = storedTasks2.get(1);
+    Assert.assertEquals(RoleCommand.ACTIONEXECUTE, task.getRoleCommand());
+    Assert.assertEquals("a2", task.getRole().name());
+    HashSet<String> expectedHosts = new HashSet<String>(){{add("h2"); add("h1");}};
+    HashSet<String> actualHosts = new HashSet<String>(){{add(storedTasks2.get(1).getHostName()); add(storedTasks2
+        .get(0).getHostName());}};
+    Assert.assertEquals(expectedHosts, actualHosts);
+
+    cmd = task.getExecutionCommandWrapper().getExecutionCommand();
+    Assert.assertTrue(cmd.getCommandParams().containsKey("test"));
+    Assert.assertEquals(cmd.getServiceName(), "HDFS");
+    Assert.assertEquals(cmd.getComponentName(), "DATANODE");
+
+    hosts = new ArrayList<String>() {{add("h3");}};
+    actionRequest = new ExecuteActionRequest("c1", null, "a1", "", "", hosts, params);
+    response = controller.createAction(actionRequest, requestProperties);
+    assertEquals(1, response.getTasks().size());
+    taskStatus = response.getTasks().get(0);
+    Assert.assertEquals("h3", taskStatus.getHostName());
+  }
+
+  @SuppressWarnings("serial")
+  @Test
+  public void testCreateActionsFailures() throws Exception {
+    setupClusterWithHosts("c1", "HDP-0.1",
+        new ArrayList<String>() {{
+          add("h1");
+        }},
+        "centos5");
 
     Cluster cluster = clusters.getCluster("c1");
     cluster.setDesiredStackVersion(new StackId("HDP-0.1"));
@@ -3462,11 +3568,15 @@ public class AmbariManagementControllerTest {
 
     ConfigFactory cf = injector.getInstance(ConfigFactory.class);
     Config config1 = cf.createNew(cluster, "global",
-        new HashMap<String, String>(){{ put("key1", "value1"); }});
+        new HashMap<String, String>() {{
+          put("key1", "value1");
+        }});
     config1.setVersionTag("version1");
 
     Config config2 = cf.createNew(cluster, "core-site",
-        new HashMap<String, String>(){{ put("key1", "value1"); }});
+        new HashMap<String, String>() {{
+          put("key1", "value1");
+        }});
     config2.setVersionTag("version1");
 
     cluster.addConfig(config1);
@@ -3477,10 +3587,15 @@ public class AmbariManagementControllerTest {
     Service hdfs = cluster.addService("HDFS");
     hdfs.persist();
 
+    Service mapred = cluster.addService("MAPREDUCE");
+    mapred.persist();
+
     hdfs.addServiceComponent(Role.HDFS_CLIENT.name()).persist();
     hdfs.addServiceComponent(Role.NAMENODE.name()).persist();
     hdfs.addServiceComponent(Role.DATANODE.name()).persist();
 
+    mapred.addServiceComponent(Role.MAPREDUCE_CLIENT.name()).persist();
+
     hdfs.getServiceComponent(Role.HDFS_CLIENT.name()).addServiceComponentHost("h1").persist();
     hdfs.getServiceComponent(Role.NAMENODE.name()).addServiceComponentHost("h1").persist();
     hdfs.getServiceComponent(Role.DATANODE.name()).addServiceComponentHost("h1").persist();
@@ -3493,74 +3608,114 @@ public class AmbariManagementControllerTest {
     Map<String, String> requestProperties = new HashMap<String, String>();
     requestProperties.put(REQUEST_CONTEXT_PROPERTY, "Called from a test");
 
-    try {
-      controller.createAction(actionRequest, requestProperties);
-      Assert.fail("createAction should fail for NON_EXISTENT_CHECK");
-    } catch (AmbariException ex) {
-      Assert.assertTrue(ex.getMessage().contains("Unsupported action"));
-    }
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties, "Unsupported action");
 
     actionRequest = new ExecuteActionRequest("c1", "NON_EXISTENT_SERVICE_CHECK", "HDFS", params);
-    try {
-      controller.createAction(actionRequest, requestProperties);
-      Assert.fail("createAction should fail for NON_EXISTENT_SERVICE_CHECK");
-    } catch (AmbariException ex) {
-      Assert.assertTrue(ex.getMessage().contains("Unsupported action"));
-    }
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "Unsupported action");
 
     actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION_DATANODE", "MAPREDUCE", params);
-    try {
-      controller.createAction(actionRequest, requestProperties);
-      Assert.fail("createAction should fail for DECOMMISSION_DATANODE on MAPREDUCE");
-    } catch (AmbariException ex) {
-      Assert.assertTrue(ex.getMessage().contains("Unsupported action DECOMMISSION_DATANODE for MAPREDUCE"));
-    }
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "Unsupported action DECOMMISSION_DATANODE for MAPREDUCE");
 
     actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION_DATANODE", "HDFS", params);
-    try {
-      controller.createAction(actionRequest, requestProperties);
-      Assert.fail("createAction should fail for DECOMMISSION_DATANODE on HDFS - no excludeFileTag");
-    } catch (IllegalArgumentException ex) {
-      Assert.assertTrue(ex.getMessage().contains("No exclude file specified when decommissioning datanodes"));
-    }
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "No exclude file specified when decommissioning datanodes");
 
     params.put("excludeFileTag", "tag1");
     actionRequest = new ExecuteActionRequest("c1", "DECOMMISSION_DATANODE", "HDFS", params);
-    try {
-      controller.createAction(actionRequest, requestProperties);
-      Assert.fail("createAction should fail for DECOMMISSION_DATANODE on HDFS - no config type hdfs-exclude-file");
-    } catch (AmbariException ex) {
-      Assert.assertTrue(ex.getMessage().contains("Decommissioning datanodes requires the cluster"));
-    }
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "Decommissioning datanodes requires the cluster");
 
     actionRequest = new ExecuteActionRequest("c1", null, "DECOMMISSION_DATANODE", "HDFS", null, null, params);
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "Action DECOMMISSION_DATANODE does not exist");
+
+    controller.getActionManager().createActionDefinition(
+        "a1", ActionType.SYSTEM, "test,dirName", "Does file exist", "", "",
+        TargetHostType.SPECIFIC, Short.valueOf("100"));
+
+    controller.getActionManager().createActionDefinition(
+        "a2", ActionType.SYSTEM, "", "Does file exist", "HDFS", "DATANODE",
+        TargetHostType.ANY, Short.valueOf("100"));
+
+    controller.getActionManager().createActionDefinition(
+        "a3", ActionType.SYSTEM, "", "Does file exist", "YARN", "NODEMANAGER",
+        TargetHostType.ANY, Short.valueOf("100"));
+
+    controller.getActionManager().createActionDefinition(
+        "a4", ActionType.SYSTEM, "", "Does file exist", "MAPREDUCE", "",
+        TargetHostType.ANY, Short.valueOf("100"));
+
+    actionRequest = new ExecuteActionRequest("c1", null, "a1", null, null, null, null);
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "Action a1 requires input 'test' that is not provided");
+
+    actionRequest = new ExecuteActionRequest("c1", null, "a1", null, null, null, params);
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "Action a1 requires input 'dirName' that is not provided");
+
+    params.put("dirName", "dirName");
+    actionRequest = new ExecuteActionRequest("c1", null, "a1", null, null, null, params);
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "Action a1 requires explicit target host(s)");
+
+    actionRequest = new ExecuteActionRequest("c1", null, "a2", "MAPREDUCE", null, null, params);
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "Action a2 targets service MAPREDUCE that does not match with expected HDFS");
+
+    actionRequest = new ExecuteActionRequest("c1", null, "a2", "HDFS", "HDFS_CLIENT", null, params);
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "Action a2 targets component HDFS_CLIENT that does not match with expected DATANODE");
+
+    actionRequest = new ExecuteActionRequest("c1", null, "a1", "HDFS2", "HDFS_CLIENT", null, params);
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "Action a1 targets service HDFS2 that does not exist");
+
+    actionRequest = new ExecuteActionRequest("c1", null, "a1", "HDFS", "HDFS_CLIENT2", null, params);
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "Action a1 targets component HDFS_CLIENT2 that does not exist");
+
+    actionRequest = new ExecuteActionRequest("c1", null, "a1", "", "HDFS_CLIENT2", null, params);
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "Action a1 targets component HDFS_CLIENT2 without specifying the target service");
+
+    actionRequest = new ExecuteActionRequest("c1", null, "a3", "", "", null, params);
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "Action a3 targets service YARN that does not exist");
+
+    List<String> hosts = new ArrayList<String>();
+    hosts.add("h6");
+    actionRequest = new ExecuteActionRequest("c1", null, "a2", "", "", hosts, params);
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "Request specifies host h6 but its not a valid host based on the target service=HDFS and component=DATANODE");
+
+    actionRequest = new ExecuteActionRequest("c1", null, "a4", "MAPREDUCE", "", null, params);
+    expectActionCreationErrorWithMessage(actionRequest, requestProperties,
+        "Suitable hosts not found, component=, service=MAPREDUCE, cluster=c1, actionName=a4");
+
+  }
+
+  private void expectActionCreationErrorWithMessage(ExecuteActionRequest actionRequest,
+                                                    Map<String, String> requestProperties, String message) {
     try {
       RequestStatusResponse response = controller.createAction(actionRequest, requestProperties);
-      if (response != null) {
-        Assert.fail("createAction should fail for action DECOMMISSION_DATANODE");
-      }
+      Assert.fail("createAction should fail");
     } catch (AmbariException ex) {
-      Assert.assertTrue(ex.getMessage().contains("Invalid action request"));
+      LOG.info(ex.getMessage());
+      Assert.assertTrue(ex.getMessage().contains(message));
     }
   }
 
   @SuppressWarnings("serial")
   @Test
-  public void testCreateActions() throws Exception {
-    clusters.addCluster("c1");
-    clusters.getCluster("c1").setDesiredStackVersion(new StackId("HDP-0.1"));
-    clusters.addHost("h1");
-    clusters.getHost("h1").setOsType("centos5");
-    clusters.getHost("h1").persist();
-    clusters.addHost("h2");
-    clusters.getHost("h2").setOsType("centos5");
-    clusters.getHost("h2").persist();
-    Set<String> hostNames = new HashSet<String>(){{
-      add("h1");
-      add("h2");
-    }};
-
-    clusters.mapHostsToCluster(hostNames, "c1");
+  public void testCreateServiceCheckActions() throws Exception {
+    setupClusterWithHosts("c1", "HDP-0.1",
+        new ArrayList<String>() {{
+          add("h1");
+          add("h2");
+        }},
+        "centos5");
 
     Cluster cluster = clusters.getCluster("c1");
     cluster.setDesiredStackVersion(new StackId("HDP-0.1"));
@@ -8166,6 +8321,16 @@ public class AmbariManagementControllerTest {
     }
 
     createRequest =
+        new ActionRequest("a1", "SYSTEM", "", "", "", "SS", "ANY", "10");
+    try {
+      ActionResourceProviderTest.createAction(controller, createRequest);
+      Assert.fail("Exception must be thrown");
+    } catch (Exception ex) {
+      LOG.info(ex.getMessage());
+      Assert.assertTrue(ex.getMessage().contains("Default timeout should be between 60 and 600"));
+    }
+
+    createRequest =
         new ActionRequest("a1", "SYSTEM", "", "HDFS", "", "SS", "ANY", "100");
     try {
       ActionResourceProviderTest.createAction(controller, createRequest);
@@ -8185,6 +8350,17 @@ public class AmbariManagementControllerTest {
       LOG.info(ex.getMessage());
       Assert.assertTrue(ex.getMessage().contains("No enum const class"));
     }
+
+    createRequest =
+        new ActionRequest("a1", "SYSTEM", "", "", "DATANODE", "SS", "SPECIFIC", "100");
+    try {
+      ActionResourceProviderTest.createAction(controller, createRequest);
+      Assert.fail("Exception must be thrown");
+    } catch (Exception ex) {
+      LOG.info(ex.getMessage());
+      Assert.assertTrue(ex.getMessage().contains("Target component cannot be specified unless target service is " +
+          "specified"));
+    }
   }
 
   @Test

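Beyond definition-time validation, the tests above also cover execution. A condensed sketch of triggering a defined action, following the same flow as testCreateCustomActions (controller, requestProperties, and cluster "c1" are assumed to be set up as in that test):

    Map<String, String> params = new HashMap<String, String>();
    params.put("test", "test");
    ArrayList<String> targetHosts = new ArrayList<String>() {{ add("h1"); }};

    ExecuteActionRequest actionRequest =
        new ExecuteActionRequest("c1", null, "a1", "HDFS", "DATANODE", targetHosts, params);
    RequestStatusResponse response = controller.createAction(actionRequest, requestProperties);
    // One ACTIONEXECUTE task is created per targeted host; see the assertions in
    // testCreateCustomActions above.
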
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/test/java/org/apache/ambari/server/orm/TestOrmImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/TestOrmImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/TestOrmImpl.java
index 586478d..96daeea 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/TestOrmImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/TestOrmImpl.java
@@ -187,7 +187,9 @@ public class TestOrmImpl extends Assert {
   public void testAbortHostRoleCommands() {
     injector.getInstance(OrmTestHelper.class).createStageCommands();
     HostRoleCommandDAO hostRoleCommandDAO = injector.getInstance(HostRoleCommandDAO.class);
-    int result = hostRoleCommandDAO.updateStatusByRequestId(0L, HostRoleStatus.ABORTED, Arrays.asList(HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS, HostRoleStatus.PENDING));
+    int result = hostRoleCommandDAO.updateStatusByRequestId(
+        0L, HostRoleStatus.ABORTED, Arrays.asList(HostRoleStatus.QUEUED,
+        HostRoleStatus.IN_PROGRESS, HostRoleStatus.PENDING));
     //result always 1 in batch mode
     List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByRequest(0L);
     int count = 0;
@@ -203,7 +205,7 @@ public class TestOrmImpl extends Assert {
   public void testFindStageByHostRole() {
     injector.getInstance(OrmTestHelper.class).createStageCommands();
     HostRoleCommandDAO hostRoleCommandDAO = injector.getInstance(HostRoleCommandDAO.class);
-    List<HostRoleCommandEntity> list = hostRoleCommandDAO.findByHostRole("test_host1", 0L, 0L, Role.DATANODE);
+    List<HostRoleCommandEntity> list = hostRoleCommandDAO.findByHostRole("test_host1", 0L, 0L, Role.DATANODE.toString());
     assertEquals(1, list.size());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java
index 516069a..fbb382f 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java
@@ -211,7 +211,8 @@ public class TestStageUtils {
     addHbaseService(fsm.getCluster("c1"), hostList, injector);
     addMapreduceService(fsm.getCluster("c1"), hostList, injector);
     Map<String, List<String>> info = StageUtils.getClusterHostInfo(fsm.getHostsForCluster("c1"),
-        fsm.getCluster("c1"), new HostsMap(injector.getInstance(Configuration.class)), injector);
+        fsm.getCluster("c1"), new HostsMap(injector.getInstance(Configuration.class)),
+        injector.getInstance(Configuration.class));
     assertEquals(2, info.get("slave_hosts").size());
     assertEquals(2, info.get("mapred_tt_hosts").size());
     assertEquals(2, info.get("hbase_rs_hosts").size());


[2/2] git commit: AMBARI-3731. Custom Action: Add support for custom action execution

Posted by sm...@apache.org.
AMBARI-3731. Custom Action: Add support for custom action execution


Project: http://git-wip-us.apache.org/repos/asf/incubator-ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ambari/commit/22f5fdfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ambari/tree/22f5fdfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ambari/diff/22f5fdfb

Branch: refs/heads/trunk
Commit: 22f5fdfb70916fb5ffe486c6bcb50f36bc0de1b4
Parents: 708e59d
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Wed Nov 13 22:30:26 2013 -0800
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Wed Nov 13 22:30:26 2013 -0800

----------------------------------------------------------------------
 .../org/apache/ambari/server/RoleCommand.java   |   3 +-
 .../server/actionmanager/ActionDBAccessor.java  |  73 +++-
 .../actionmanager/ActionDBAccessorImpl.java     |  61 ++-
 .../actionmanager/ActionDBInMemoryImpl.java     |  49 +--
 .../server/actionmanager/ActionManager.java     |   9 +-
 .../server/actionmanager/ActionScheduler.java   |   6 +-
 .../CustomActionDBAccessorImpl.java             |  12 +-
 .../server/actionmanager/HostRoleCommand.java   |   9 -
 .../ambari/server/agent/ExecutionCommand.java   |  13 +-
 .../ambari/server/agent/HeartBeatHandler.java   |  13 +-
 .../controller/ActionExecutionContext.java      |  91 +++++
 .../controller/AmbariActionExecutionHelper.java | 290 ++++++++++++++
 .../AmbariCustomCommandExecutionHelper.java     | 239 ++++++++++++
 .../AmbariManagementControllerImpl.java         | 278 +++-----------
 .../server/controller/ExecuteActionRequest.java |  38 +-
 .../server/orm/dao/HostRoleCommandDAO.java      |   4 +-
 .../apache/ambari/server/orm/dao/StageDAO.java  |   4 -
 .../server/orm/entities/ActionEntity.java       |  14 +-
 .../svccomphost/ServiceComponentHostImpl.java   |   2 -
 .../apache/ambari/server/utils/StageUtils.java  |   5 +-
 .../ddl/Ambari-DDL-Postgres-UPGRADE-1.3.0.sql   |   4 +
 .../actionmanager/TestActionDBAccessorImpl.java |  76 +++-
 .../server/actionmanager/TestActionManager.java |   2 +-
 .../AmbariManagementControllerTest.java         | 376 ++++++++++++++-----
 .../apache/ambari/server/orm/TestOrmImpl.java   |   6 +-
 .../ambari/server/utils/TestStageUtils.java     |   3 +-
 26 files changed, 1214 insertions(+), 466 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
index 33370bf..ad006ec 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
@@ -24,5 +24,6 @@ public enum RoleCommand {
   STOP,
   EXECUTE,
   ABORT,
-  UPGRADE
+  UPGRADE,
+  ACTIONEXECUTE
 }

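The new ACTIONEXECUTE value marks custom action tasks, keeping them distinct from the existing EXECUTE service checks. A minimal sketch of scheduling one, mirroring populateActionDBWithCustomAction in the test changes above:

    stage.addHostRoleExecutionCommand(hostname, Role.valueOf(actionName),
        RoleCommand.ACTIONEXECUTE,
        new ServiceComponentHostStartEvent(Role.HBASE_MASTER.toString(), hostname,
            System.currentTimeMillis()),
        "cluster1", "HBASE");
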
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index 2c79edf..11605bb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -17,23 +17,35 @@
  */
 package org.apache.ambari.server.actionmanager;
 
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
+
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.ambari.server.Role;
-import org.apache.ambari.server.agent.CommandReport;
-
 public interface ActionDBAccessor {
 
-  public Stage getAction(String actionId);
+  /**
+   * Given an action id of the form requestId-stageId, retrieve the Stage
+   */
+  public Stage getStage(String actionId);
 
+  /**
+   * Get all stages associated with a single request id
+   */
   public List<Stage> getAllStages(long requestId);
 
+  /**
+   * Abort all outstanding operations associated with the given request
+   */
   public void abortOperation(long requestId);
 
-  public void timeoutHostRole(String host, long requestId, long stageId, Role role);
+  /**
+   * Mark the task as having timed out
+   */
+  public void timeoutHostRole(String host, long requestId, long stageId, String role);
 
   /**
    * Returns all the pending stages, including queued and not-queued.
@@ -41,47 +53,86 @@ public interface ActionDBAccessor {
    */
   public List<Stage> getStagesInProgress();
 
+  /**
+   * Persists all tasks for a given request
+   *
+   * @param stages  Stages belonging to the request
+   */
   public void persistActions(List<Stage> stages);
 
+  /**
+   * For the given host, update all the tasks based on the command report
+   */
   public void updateHostRoleState(String hostname, long requestId,
-      long stageId, String role, CommandReport report);
+                                  long stageId, String role, CommandReport report);
 
-  public void abortHostRole(String host, long requestId, long stageId,
-      Role role);
+  /**
+   * Mark the task as having been aborted
+   */
+  public void abortHostRole(String host, long requestId, long stageId, String role);
 
   /**
    * Return the last persisted Request ID as seen when the DBAccessor object
    * was initialized.
    * Value should remain unchanged through the lifetime of the object instance.
+   *
    * @return Request Id seen at init time
    */
   public long getLastPersistedRequestIdWhenInitialized();
 
   /**
    * Updates scheduled stage.
-   * @param s
-   * @param hostname
-   * @param roleStr
    */
   public void hostRoleScheduled(Stage s, String hostname, String roleStr);
 
+  /**
+   * Given a request id, get all the tasks that belong to this request
+   */
   public List<HostRoleCommand> getRequestTasks(long requestId);
 
+  /**
+   * Given a list of request ids, get all the tasks that belong to these requests
+   */
   public List<HostRoleCommand> getAllTasksByRequestIds(Collection<Long> requestIds);
 
+  /**
+   * Get a list of host role commands where the request id belongs to the input requestIds and
+   * the task id belongs to the input taskIds
+   */
   public List<HostRoleCommand> getTasksByRequestAndTaskIds(Collection<Long> requestIds, Collection<Long> taskIds);
 
+  /**
+   * Given a list of task ids, get all the host role commands
+   */
   public Collection<HostRoleCommand> getTasks(Collection<Long> taskIds);
 
+  /**
+   * Get all stages that contain tasks with specified host role statuses
+   */
   public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses);
 
+  /**
+   * Get all requests
+   */
   public List<Long> getRequests();
 
+  /**
+   * Gets the host role command corresponding to the task id
+   */
   public HostRoleCommand getTask(long taskId);
 
+  /**
+   * Gets request ids of requests that are in the specified status
+   */
   public List<Long> getRequestsByStatus(RequestStatus status);
 
+  /**
+   * Gets request contexts associated with the list of request ids
+   */
   public Map<Long, String> getRequestContext(List<Long> requestIds);
 
+  /**
+   * Gets the request context associated with the request id
+   */
   public String getRequestContext(long requestId);
 }

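Because the role parameters are now plain Strings, built-in roles and custom action names flow through the same task APIs. A hedged sketch (db is an ActionDBAccessor, as in the tests above):

    db.abortHostRole("host1", requestId, stageId, Role.DATANODE.toString());   // built-in role
    db.abortHostRole("host1", requestId, stageId, "validate_kerberos");        // custom action name
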
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 75ebfef..d0cba5e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -17,16 +17,17 @@
  */
 package org.apache.ambari.server.actionmanager;
 
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
 import com.google.inject.name.Named;
+import com.google.inject.persist.Transactional;
 import org.apache.ambari.server.AmbariException;
-import org.apache.ambari.server.Role;
 import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
+import org.apache.ambari.server.orm.dao.ActionDefinitionDAO;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
 import org.apache.ambari.server.orm.dao.HostDAO;
@@ -44,15 +45,20 @@ import org.apache.ambari.server.state.Clusters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Singleton;
-import com.google.inject.persist.Transactional;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 @Singleton
 public class ActionDBAccessorImpl implements ActionDBAccessor {
   private static final Logger LOG = LoggerFactory.getLogger(ActionDBAccessorImpl.class);
-
+  private final long requestId;
   @Inject
   private ClusterDAO clusterDAO;
   @Inject
@@ -71,12 +77,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   private HostRoleCommandFactory hostRoleCommandFactory;
   @Inject
   private Clusters clusters;
-
   private Cache<Long, HostRoleCommand> hostRoleCommandCache;
   private long cacheLimit; //may be exceeded to store tasks from one request
 
-  private final long requestId;
-
   @Inject
   public ActionDBAccessorImpl(Injector injector, @Named("executionCommandCacheSize") long cacheLimit) {
     injector.injectMembers(this);
@@ -90,10 +93,10 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   }
 
   /* (non-Javadoc)
-   * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getAction(java.lang.String)
+   * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getStage(java.lang.String)
    */
   @Override
-  public Stage getAction(String actionId) {
+  public Stage getStage(String actionId) {
     return stageFactory.createExisting(actionId);
   }
 
@@ -137,7 +140,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   @Override
   @Transactional
   public void timeoutHostRole(String host, long requestId, long stageId,
-                              Role role) {
+                              String role) {
     List<HostRoleCommandEntity> commands =
         hostRoleCommandDAO.findByHostRole(host, requestId, stageId, role);
     for (HostRoleCommandEntity command : commands) {
@@ -170,6 +173,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Adding stages to DB, stageCount=" + stages.size());
     }
+
     for (Stage stage : stages) {
       StageEntity stageEntity = stage.constructNewPersistenceEntity();
       Cluster cluster;
@@ -213,7 +217,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
       for (RoleSuccessCriteriaEntity roleSuccessCriteriaEntity : stageEntity.getRoleSuccessCriterias()) {
         roleSuccessCriteriaDAO.create(roleSuccessCriteriaEntity);
       }
-
     }
   }
 
@@ -227,7 +230,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
           + stageId + " role " + role + " report " + report);
     }
     List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByHostRole(
-        hostname, requestId, stageId, Role.valueOf(role));
+        hostname, requestId, stageId, role);
     for (HostRoleCommandEntity command : commands) {
       command.setStatus(HostRoleStatus.valueOf(report.getStatus()));
       command.setStdOut(report.getStdOut().getBytes());
@@ -238,13 +241,13 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   }
 
   @Override
-  public void abortHostRole(String host, long requestId, long stageId, Role role) {
+  public void abortHostRole(String host, long requestId, long stageId, String role) {
     CommandReport report = new CommandReport();
     report.setExitCode(999);
     report.setStdErr("Host Role in invalid state");
     report.setStdOut("");
     report.setStatus("ABORTED");
-    updateHostRoleState(host, requestId, stageId, role.toString(), report);
+    updateHostRoleState(host, requestId, stageId, role, report);
   }
 
   @Override
@@ -266,7 +269,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     } else {
       throw new RuntimeException("HostRoleCommand is not persisted, cannot update:\n" + hostRoleCommand);
     }
-
   }
 
   @Override
@@ -275,11 +277,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     return getTasks(
         hostRoleCommandDAO.findTaskIdsByRequest(requestId)
     );
-
-//    for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequest(requestId)) {
-//      tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
-//    }
-//    return tasks;
   }
 
   @Override
@@ -291,12 +288,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     return getTasks(
         hostRoleCommandDAO.findTaskIdsByRequestIds(requestIds)
     );
-
-//    List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
-//    for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequestIds(requestIds)) {
-//      tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
-//    }
-//    return tasks;
   }
 
   @Override
@@ -304,11 +295,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     if (!requestIds.isEmpty() && !taskIds.isEmpty()) {
       return getTasks(hostRoleCommandDAO.findTaskIdsByRequestAndTaskIds(requestIds, taskIds));
 
-//      List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
-//      for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequestAndTaskIds(requestIds, taskIds)) {
-//        tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
-//      }
-//      return tasks;
     } else if (requestIds.isEmpty()) {
       return getTasks(taskIds);
     } else if (taskIds.isEmpty()) {
@@ -335,7 +321,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
 
     if (!absent.isEmpty()) {
       boolean allowStore = hostRoleCommandCache.size() <= cacheLimit;
-//      LOG.info("Cache size {}, enable = {}", hostRoleCommandCache.size(), allowStore);
 
       for (HostRoleCommandEntity commandEntity : hostRoleCommandDAO.findByPKs(absent)) {
         HostRoleCommand hostRoleCommand = hostRoleCommandFactory.createExisting(commandEntity);

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
index 1fccf12..8c36366 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
@@ -17,38 +17,42 @@
  */
 package org.apache.ambari.server.actionmanager;
 
-import java.util.*;
-
-import org.apache.ambari.server.Role;
+import com.google.inject.Singleton;
 import org.apache.ambari.server.agent.CommandReport;
-import org.apache.ambari.server.agent.ExecutionCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.inject.Singleton;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 @Singleton
 public class ActionDBInMemoryImpl implements ActionDBAccessor {
 
+  private static Logger LOG = LoggerFactory.getLogger(ActionDBInMemoryImpl.class);
   // for a persisted DB, this will be initialized in the ctor
   // with the highest persisted requestId value in the DB
   private final long lastRequestId = 0;
-  private static Logger LOG = LoggerFactory.getLogger(ActionDBInMemoryImpl.class);
   List<Stage> stageList = new ArrayList<Stage>();
 
   @Override
-  public synchronized Stage getAction(String actionId) {
-    for (Stage s: stageList) {
+  public synchronized Stage getStage(String actionId) {
+    for (Stage s : stageList) {
       if (s.getActionId().equals(actionId)) {
         return s;
       }
     }
     return null;
   }
+
   @Override
   public synchronized List<Stage> getAllStages(long requestId) {
     List<Stage> l = new ArrayList<Stage>();
-    for (Stage s: stageList) {
+    for (Stage s : stageList) {
       if (s.getRequestId() == requestId) {
         l.add(s);
       }
@@ -80,7 +84,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
 
   @Override
   public synchronized void timeoutHostRole(String host, long requestId,
-      long stageId, Role role) {
+                                           long stageId, String role) {
     for (Stage s : stageList) {
       s.setHostRoleStatus(host, role.toString(), HostRoleStatus.TIMEDOUT);
     }
@@ -89,7 +93,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
   @Override
   public synchronized List<Stage> getStagesInProgress() {
     List<Stage> l = new ArrayList<Stage>();
-    for (Stage s: stageList) {
+    for (Stage s : stageList) {
       if (s.isStageInProgress()) {
         l.add(s);
       }
@@ -99,15 +103,16 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
 
   @Override
   public synchronized void persistActions(List<Stage> stages) {
-    for (Stage s: stages) {
+    for (Stage s : stages) {
       stageList.add(s);
     }
   }
+
   @Override
   public synchronized void updateHostRoleState(String hostname, long requestId,
-      long stageId, String role, CommandReport report) {
-    LOG.info("DEBUG stages to iterate: "+stageList.size());
-    if(null == report.getStatus()
+                                               long stageId, String role, CommandReport report) {
+    LOG.info("DEBUG stages to iterate: " + stageList.size());
+    if (null == report.getStatus()
         || null == report.getStdOut()
         || null == report.getStdErr()) {
       throw new RuntimeException("Badly formed command report.");
@@ -124,13 +129,13 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
   }
 
   @Override
-  public void abortHostRole(String host, long requestId, long stageId, Role role) {
+  public void abortHostRole(String host, long requestId, long stageId, String role) {
     CommandReport report = new CommandReport();
     report.setExitCode(999);
     report.setStdErr("Host Role in invalid state");
     report.setStdOut("");
     report.setStatus("ABORTED");
-    updateHostRoleState(host, requestId, stageId, role.toString(), report);
+    updateHostRoleState(host, requestId, stageId, role, report);
   }
 
   @Override
@@ -168,17 +173,18 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
   @Override
   public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses) {
     List<Stage> l = new ArrayList<Stage>();
-    for (Stage s: stageList) {
+    for (Stage s : stageList) {
       if (s.doesStageHaveHostRoleStatus(statuses)) {
         l.add(s);
       }
     }
     return l;
   }
+
   @Override
   public synchronized List<Long> getRequests() {
     Set<Long> requestIds = new HashSet<Long>();
-    for (Stage s: stageList) {
+    for (Stage s : stageList) {
       requestIds.add(s.getRequestId());
     }
     List<Long> ids = new ArrayList<Long>();
@@ -199,6 +205,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
     }
     return null;
   }
+
   @Override
   public List<Long> getRequestsByStatus(RequestStatus status) {
     // TODO
@@ -211,7 +218,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
     for (Long requestId : requestIds) {
       List<Stage> stages = getAllStages(requestId);
       result.put(requestId, stages != null && !stages.isEmpty() ? stages.get
-        (0).getRequestContext() : "");
+          (0).getRequestContext() : "");
     }
     return result;
   }
@@ -220,6 +227,6 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
   public String getRequestContext(long requestId) {
     List<Stage> stages = getAllStages(requestId);
     return stages != null && !stages.isEmpty() ? stages.get(0)
-      .getRequestContext() : "";
+        .getRequestContext() : "";
   }
 }
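
Note that ActionDBAccessor callers now pass role names as plain strings and look up stages via getStage. A minimal illustrative sketch, not part of this patch, assuming an ActionDBAccessor instance named db and hypothetical requestId, stageId, and hostName values:

    // Look up a stage by its composite action id
    Stage stage = db.getStage(StageUtils.getActionId(requestId, stageId));

    // Roles are plain strings now, so a custom action name (not a Role enum member) works directly
    db.abortHostRole(hostName, requestId, stageId, "a_custom_action");
    db.timeoutHostRole(hostName, requestId, stageId, "a_custom_action");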

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
index 6b32e73..aa553c4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
@@ -24,6 +24,7 @@ import com.google.inject.persist.UnitOfWork;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.serveraction.ServerActionManager;
 import org.apache.ambari.server.state.Clusters;
@@ -73,12 +74,16 @@ public class ActionManager {
     scheduler.stop();
   }
 
-  public void sendActions(List<Stage> stages) {
+  public void sendActions(List<Stage> stages, ExecuteActionRequest request) {
 
     if (LOG.isDebugEnabled()) {
       for (Stage s : stages) {
         LOG.debug("Persisting stage into db: " + s.toString());
       }
+
+      if (request != null) {
+        LOG.debug("In response to request: " + request.toString());
+      }
     }
     db.persistActions(stages);
 
@@ -91,7 +96,7 @@ public class ActionManager {
   }
 
   public Stage getAction(long requestId, long stageId) {
-    return db.getAction(StageUtils.getActionId(requestId, stageId));
+    return db.getStage(StageUtils.getActionId(requestId, stageId));
   }
 
   /**
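
The extra ExecuteActionRequest argument to sendActions is consulted only for debug logging; code paths that are not servicing an action request pass null, as the controller change later in this patch does. A hedged usage sketch (the variable names are assumptions):

    // Action/command execution path: pass the originating request along for debug logging
    actionManager.sendActions(stages, executeActionRequest);

    // Internal stage creation (install/start/smoke-test flows): no originating request
    actionManager.sendActions(stages, null);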

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 22ed44e..a5c5dc0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -205,8 +205,7 @@ class ActionScheduler implements Runnable {
               scheduleHostRole(s, cmd);
             } catch (InvalidStateTransitionException e) {
               LOG.warn("Could not schedule host role " + cmd.toString(), e);
-              db.abortHostRole(cmd.getHostname(), s.getRequestId(), s.getStageId(),
-                  Role.valueOf(cmd.getRole()));
+              db.abortHostRole(cmd.getHostname(), s.getRequestId(), s.getStageId(), cmd.getRole());
             }
           }
         }
@@ -326,8 +325,7 @@ class ActionScheduler implements Runnable {
           if (s.getAttemptCount(host, roleStr) >= maxAttempts) {
             LOG.warn("Host:" + host + ", role:" + roleStr + ", actionId:"
                 + s.getActionId() + " expired");
-            db.timeoutHostRole(host, s.getRequestId(), s.getStageId(),
-                Role.valueOf(c.getRole()));
+            db.timeoutHostRole(host, s.getRequestId(), s.getStageId(), c.getRole());
             //Reinitialize status
             status = s.getHostRoleStatus(host, roleStr);
             ServiceComponentHostOpFailedEvent timeoutEvent =

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
index eb0cfa9..38bc371 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
@@ -98,7 +98,8 @@ public class CustomActionDBAccessorImpl implements CustomActionDBAccessor {
                                      TargetHostType targetType, String serviceType, String componentType,
                                      Short defaultTimeout)
       throws AmbariException {
-    validateCreateInput(actionName, actionType, inputs, description, defaultTimeout);
+    validateCreateInput(actionName, actionType, inputs, description, defaultTimeout,
+        targetType, serviceType, componentType);
     ActionEntity entity =
         actionDefinitionDAO.findByPK(actionName);
     if (entity == null) {
@@ -178,7 +179,8 @@ public class CustomActionDBAccessorImpl implements CustomActionDBAccessor {
   }
 
   private void validateCreateInput(String actionName, ActionType actionType, String inputs,
-                                   String description, Short defaultTimeout)
+                                   String description, Short defaultTimeout,
+                                   TargetHostType targetType, String serviceType, String componentType)
       throws AmbariException {
 
     validateActionName(actionName);
@@ -199,6 +201,12 @@ public class CustomActionDBAccessorImpl implements CustomActionDBAccessor {
       throw new AmbariException("Action type cannot be " + actionType);
     }
 
+    if (serviceType == null || serviceType.isEmpty()) {
+      if (componentType != null && !componentType.isEmpty()) {
+        throw new AmbariException("Target component cannot be specified unless target service is specified");
+      }
+    }
+
     if (inputs != null && !inputs.isEmpty()) {
       String[] parameters = inputs.split(",");
       for (String parameter : parameters) {

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
index e3bed0c..21ec077 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
@@ -86,10 +86,6 @@ public class HostRoleCommand {
     //make use of lazy loading
 
     executionCommandDAO = injector.getInstance(ExecutionCommandDAO.class);
-//    executionCommandWrapper = new ExecutionCommandWrapper(new String(
-//        hostRoleCommandEntity
-//            .getExecutionCommand().getCommand()
-//    ));
   }
 
   HostRoleCommandEntity constructNewPersistenceEntity() {
@@ -106,10 +102,6 @@ public class HostRoleCommand {
     hostRoleCommandEntity.setRoleCommand(roleCommand);
 
     hostRoleCommandEntity.setEvent(event.getEventJson());
-//    ExecutionCommandEntity executionCommandEntity = new ExecutionCommandEntity();
-//    executionCommandEntity.setCommand(executionCommandWrapper.getJson().getBytes());
-//    executionCommandEntity.setHostRoleCommand(hostRoleCommandEntity);
-//    hostRoleCommandEntity.setExecutionCommand(executionCommandEntity);
 
     return hostRoleCommandEntity;
   }
@@ -120,7 +112,6 @@ public class HostRoleCommand {
     return executionCommandEntity;
   }
 
-
   public long getTaskId() {
     return taskId;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
index 1b4ecdf..c72c14b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
@@ -54,7 +54,8 @@ public class ExecutionCommand extends AgentCommand {
   private Map<String, Map<String, String>> configurationTags;
   private Map<String, String> commandParams;
   private String serviceName;
-  
+  private String componentName;
+
   @JsonProperty("commandId")
   public String getCommandId() {
     return this.commandId;
@@ -202,6 +203,16 @@ public class ExecutionCommand extends AgentCommand {
     this.serviceName = serviceName;
   }
 
+  @JsonProperty("componentName")
+  public String getComponentName() {
+    return componentName;
+  }
+
+  @JsonProperty("componentName")
+  public void setComponentName(String componentName) {
+    this.componentName = componentName;
+  }
+
   /**
    * @param configTags the config tag map
    */
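
With componentName serialized alongside serviceName, the execution command handed to the agent can name the exact component an action targets. Illustrative only, the values below are invented, assuming an ExecutionCommand obtained from a stage as in the helper code later in this patch:

    execCmd.setServiceName("HDFS");        // existing field
    execCmd.setComponentName("DATANODE");  // field added by this patch
    // both appear in the agent-facing JSON as "serviceName" and "componentName"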

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 60aede9..9da4d45 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -187,16 +187,19 @@ public class HeartBeatHandler {
     return response;
   }
 
-  protected void processCommandReports(HeartBeat heartbeat,
-                                       String hostname,
-                                       Clusters clusterFsm, long now)
+  protected void processCommandReports(
+      HeartBeat heartbeat, String hostname, Clusters clusterFsm, long now)
       throws AmbariException {
     List<CommandReport> reports = heartbeat.getReports();
     for (CommandReport report : reports) {
       LOG.debug("Received command report: " + report);
+      if (RoleCommand.ACTIONEXECUTE.equals(report.getRoleCommand())) {
+        continue;
+      }
+
       Cluster cl = clusterFsm.getCluster(report.getClusterName());
       String service = report.getServiceName();
-      if (service == null || "".equals(service)) {
+      if (service == null || service.isEmpty()) {
         throw new AmbariException("Invalid command report, service: " + service);
       }
       if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
@@ -217,7 +220,7 @@ public class HeartBeatHandler {
                 && null != report.getConfigurationTags()
                 && !report.getConfigurationTags().isEmpty()) {
               LOG.info("Updating applied config on service " + scHost.getServiceName() +
-              ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
+                  ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
               scHost.updateActualConfigs(report.getConfigurationTags());
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java
new file mode 100644
index 0000000..f1bea70
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.ambari.server.controller;
+
+import org.apache.ambari.server.actionmanager.TargetHostType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The context required to create tasks and stages for a custom action
+ */
+public class ActionExecutionContext {
+  private final String clusterName;
+  private final String actionName;
+  private final String serviceName;
+  private final String componentName;
+  private final List<String> hosts;
+  private final Map<String, String> parameters;
+  private final TargetHostType targetType;
+  private final Short timeout;
+
+  /**
+   * Create an ActionExecutionContext to execute an action from a request
+   */
+  public ActionExecutionContext(String clusterName, String actionName, String serviceName,
+                                String componentName, List<String> hosts, Map<String, String> parameters,
+                                TargetHostType targetType, Short timeout) {
+    this.clusterName = clusterName;
+    this.actionName = actionName;
+    this.serviceName = serviceName;
+    this.componentName = componentName;
+    this.parameters = parameters;
+    this.hosts = new ArrayList<String>();
+    if (hosts != null) {
+      this.hosts.addAll(hosts);
+    }
+    this.targetType = targetType;
+    this.timeout = timeout;
+  }
+
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public String getActionName() {
+    return actionName;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public String getComponentName() {
+    return componentName;
+  }
+
+  public Map<String, String> getParameters() {
+    return parameters;
+  }
+
+  public List<String> getHosts() {
+    return hosts;
+  }
+
+  public TargetHostType getTargetType() {
+    return targetType;
+  }
+
+  public Short getTimeout() {
+    return timeout;
+  }
+}
\ No newline at end of file
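
ActionExecutionContext is a plain value object: the constructor copies the host list defensively and everything else is exposed through getters. A hedged construction sketch, with all argument values invented for illustration and the usual java.util imports assumed:

    Map<String, String> params = new HashMap<String, String>();
    params.put("threshold", "80");

    ActionExecutionContext context = new ActionExecutionContext(
        "c1",                              // clusterName
        "a_custom_action",                 // actionName
        "HDFS",                            // serviceName
        "DATANODE",                        // componentName
        Arrays.asList("host1", "host2"),   // hosts (copied into an internal list)
        params,                            // parameters
        TargetHostType.SPECIFIC,           // targetType
        Short.valueOf((short) 60));        // timeout (units not defined by this patch)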

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
new file mode 100644
index 0000000..632b11d
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.StackAccessException;
+import org.apache.ambari.server.actionmanager.ActionDefinition;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.actionmanager.TargetHostType;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.metadata.ActionMetadata;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.ComponentInfo;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceInfo;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgressEvent;
+import org.apache.ambari.server.utils.StageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Helper class containing logic to process custom action execution requests
+ */
+public class AmbariActionExecutionHelper {
+  private final static Logger LOG =
+      LoggerFactory.getLogger(AmbariActionExecutionHelper.class);
+  private ActionMetadata actionMetadata;
+  private Clusters clusters;
+  private AmbariManagementControllerImpl amcImpl;
+  private ActionManager actionManager;
+  private AmbariMetaInfo ambariMetaInfo;
+
+  public AmbariActionExecutionHelper(ActionMetadata actionMetadata, Clusters clusters,
+                                     AmbariManagementControllerImpl amcImpl) {
+    this.amcImpl = amcImpl;
+    this.actionMetadata = actionMetadata;
+    this.clusters = clusters;
+    this.actionManager = amcImpl.getActionManager();
+    this.ambariMetaInfo = amcImpl.getAmbariMetaInfo();
+  }
+
+  /**
+   * Validates the request to execute an action
+   *
+   * @param actionRequest
+   * @param cluster
+   * @return
+   * @throws AmbariException
+   */
+  public ActionExecutionContext validateCustomAction(ExecuteActionRequest actionRequest, Cluster cluster)
+      throws AmbariException {
+    if (actionRequest.getActionName() == null || actionRequest.getActionName().isEmpty()) {
+      throw new AmbariException("Action name must be specified");
+    }
+
+    ActionDefinition actionDef = actionManager.getActionDefinition(actionRequest.getActionName());
+    if (actionDef == null) {
+      throw new AmbariException("Action " + actionRequest.getActionName() + " does not exist");
+    }
+
+    StackId stackId = cluster.getCurrentStackVersion();
+    String expectedService = actionDef.getTargetService() == null ? "" : actionDef.getTargetService();
+    String actualService = actionRequest.getServiceName() == null ? "" : actionRequest.getServiceName();
+    if (!expectedService.isEmpty() && !actualService.isEmpty() && !expectedService.equals(actualService)) {
+      throw new AmbariException("Action " + actionRequest.getActionName() + " targets service " + actualService +
+          " that does not match with expected " + expectedService);
+    }
+
+    String targetService = expectedService;
+    if (targetService == null || targetService.isEmpty()) {
+      targetService = actualService;
+    }
+
+    if (targetService != null && !targetService.isEmpty()) {
+      ServiceInfo serviceInfo;
+      try {
+        serviceInfo = ambariMetaInfo.getService(stackId.getStackName(), stackId.getStackVersion(),
+            targetService);
+      } catch (StackAccessException se) {
+        serviceInfo = null;
+      }
+
+      if (serviceInfo == null) {
+        throw new AmbariException("Action " + actionRequest.getActionName() + " targets service " + targetService +
+            " that does not exist.");
+      }
+    }
+
+    String expectedComponent = actionDef.getTargetComponent() == null ? "" : actionDef.getTargetComponent();
+    String actualComponent = actionRequest.getComponentName() == null ? "" : actionRequest.getComponentName();
+    if (!expectedComponent.isEmpty() && !actualComponent.isEmpty() && !expectedComponent.equals(actualComponent)) {
+      throw new AmbariException("Action " + actionRequest.getActionName() + " targets component " + actualComponent +
+          " that does not match with expected " + expectedComponent);
+    }
+
+    String targetComponent = expectedComponent;
+    if (targetComponent == null || targetComponent.isEmpty()) {
+      targetComponent = actualComponent;
+    }
+
+    if (!targetComponent.isEmpty() && targetService.isEmpty()) {
+      throw new AmbariException("Action " + actionRequest.getActionName() + " targets component " + targetComponent +
+          " without specifying the target service.");
+    }
+
+    if (targetComponent != null && !targetComponent.isEmpty()) {
+      ComponentInfo compInfo;
+      try {
+        compInfo = ambariMetaInfo.getComponent(stackId.getStackName(), stackId.getStackVersion(),
+            targetService, targetComponent);
+      } catch (StackAccessException se) {
+        compInfo = null;
+      }
+
+      if (compInfo == null) {
+        throw new AmbariException("Action " + actionRequest.getActionName() + " targets component " + targetComponent +
+            " that does not exist.");
+      }
+    }
+
+    if (actionDef.getInputs() != null) {
+      String[] inputs = actionDef.getInputs().split(",");
+      for (String input : inputs) {
+        if (!input.trim().isEmpty() && !actionRequest.getParameters().containsKey(input.trim())) {
+          throw new AmbariException("Action " + actionRequest.getActionName() + " requires input '" +
+              input.trim() + "' that is not provided.");
+        }
+      }
+    }
+
+    if (actionDef.getTargetType() == TargetHostType.SPECIFIC
+        || (targetService.isEmpty() && targetComponent.isEmpty())) {
+      if (actionRequest.getHosts().size() == 0) {
+        throw new AmbariException("Action " + actionRequest.getActionName() + " requires explicit target host(s)" +
+            " that is not provided.");
+      }
+    }
+
+    LOG.info("Received action execution request"
+        + ", clusterName=" + actionRequest.getClusterName()
+        + ", request=" + actionRequest.toString());
+
+    ActionExecutionContext actionExecutionContext = new ActionExecutionContext(
+        actionRequest.getClusterName(), actionRequest.getActionName(), targetService, targetComponent,
+        actionRequest.getHosts(), actionRequest.getParameters(), actionDef.getTargetType(),
+        actionDef.getDefaultTimeout());
+
+    return actionExecutionContext;
+  }
+
+  /**
+   * Add tasks to the stage based on the requested action execution
+   *
+   * @param actionContext   the context associated with the action
+   * @param stage           stage into which tasks must be inserted
+   * @param configuration
+   * @param hostsMap
+   * @param hostLevelParams
+   * @throws AmbariException
+   */
+  public void addAction(ActionExecutionContext actionContext, Stage stage,
+                        Configuration configuration, HostsMap hostsMap, Map<String, String> hostLevelParams)
+      throws AmbariException {
+    String actionName = actionContext.getActionName();
+    String clusterName = actionContext.getClusterName();
+    String serviceName = actionContext.getServiceName();
+    String componentName = actionContext.getComponentName();
+
+    // List of host to select from
+    Set<String> candidateHosts = new HashSet<String>();
+    if (!serviceName.isEmpty()) {
+      if (!componentName.isEmpty()) {
+        Map<String, ServiceComponentHost> componentHosts =
+            clusters.getCluster(clusterName).getService(serviceName)
+                .getServiceComponent(componentName).getServiceComponentHosts();
+        candidateHosts.addAll(componentHosts.keySet());
+      } else {
+        for (String component : clusters.getCluster(clusterName).getService(serviceName)
+            .getServiceComponents().keySet()) {
+          Map<String, ServiceComponentHost> componentHosts =
+              clusters.getCluster(clusterName).getService(serviceName)
+                  .getServiceComponent(component).getServiceComponentHosts();
+          candidateHosts.addAll(componentHosts.keySet());
+        }
+      }
+    } else {
+      // All hosts are valid target host
+      candidateHosts.addAll(amcImpl.getClusters().getHostsForCluster(clusterName).keySet());
+    }
+
+    // If request did not specify hosts and there exists no host
+    if (actionContext.getHosts().isEmpty() && candidateHosts.isEmpty()) {
+      throw new AmbariException("Suitable hosts not found, component="
+          + componentName + ", service=" + serviceName
+          + ", cluster=" + clusterName + ", actionName=" + actionName);
+    }
+
+    // Compare specified hosts to available hosts
+    if (!actionContext.getHosts().isEmpty() && !candidateHosts.isEmpty()) {
+      for (String hostname : actionContext.getHosts()) {
+        if (!candidateHosts.contains(hostname)) {
+          throw new AmbariException("Request specifies host " + hostname + " but its not a valid host based on the " +
+              "target service=" + serviceName + " and component=" + componentName);
+        }
+      }
+    }
+
+    //Find target hosts to execute
+    if (actionContext.getHosts().isEmpty()) {
+      TargetHostType hostType = actionContext.getTargetType();
+      switch (hostType) {
+        case ALL:
+          actionContext.getHosts().addAll(candidateHosts);
+          break;
+        case ANY:
+          actionContext.getHosts().add(amcImpl.getHealthyHost(candidateHosts));
+          break;
+        case MAJORITY:
+          int majorityCount = (candidateHosts.size() / 2) + 1;
+          for (int i = 0; i < majorityCount; i++) {
+            String hostname = amcImpl.getHealthyHost(candidateHosts);
+            actionContext.getHosts().add(hostname);
+            candidateHosts.remove(hostname);
+          }
+          break;
+        default:
+          throw new AmbariException("Unsupported target type=" + hostType);
+      }
+    }
+
+    //create tasks for each host
+    for (String hostName : actionContext.getHosts()) {
+      stage.addHostRoleExecutionCommand(hostName, Role.valueOf(actionContext.getActionName()), RoleCommand.ACTIONEXECUTE,
+          new ServiceComponentHostOpInProgressEvent(actionContext.getActionName(), hostName,
+              System.currentTimeMillis()), clusterName, actionContext.getServiceName());
+
+      stage.getExecutionCommandWrapper(hostName, actionContext.getActionName()).getExecutionCommand()
+          .setRoleParams(actionContext.getParameters());
+
+      Cluster cluster = clusters.getCluster(clusterName);
+
+      Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
+      Map<String, Map<String, String>> configTags = null;
+      if (!actionContext.getServiceName().isEmpty()) {
+        configTags = amcImpl.findConfigurationTagsWithOverrides(cluster, hostName);
+      }
+
+      ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
+          actionContext.getActionName()).getExecutionCommand();
+
+      execCmd.setConfigurations(configurations);
+      execCmd.setConfigurationTags(configTags);
+      execCmd.setHostLevelParams(hostLevelParams);
+      execCmd.setCommandParams(actionContext.getParameters());
+      execCmd.setServiceName(serviceName);
+      execCmd.setComponentName(componentName);
+
+      // Generate cluster host info
+      execCmd.setClusterHostInfo(
+          StageUtils.getClusterHostInfo(clusters.getHostsForCluster(clusterName), cluster, hostsMap, configuration));
+    }
+  }
+}
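
When the request does not list hosts explicitly, addAction resolves them from the candidate set according to TargetHostType: ALL takes every candidate, ANY takes one healthy host, and MAJORITY takes (n / 2) + 1 of them. A standalone plain-Java sketch of the MAJORITY arithmetic (independent of the Ambari classes; the host names are invented):

    Set<String> candidates = new HashSet<String>(Arrays.asList("h1", "h2", "h3", "h4", "h5"));
    // compute the target count once, up front, because the candidate set shrinks as hosts are picked
    int toPick = (candidates.size() / 2) + 1;            // 3 of 5
    List<String> chosen = new ArrayList<String>();
    Iterator<String> it = candidates.iterator();
    for (int i = 0; i < toPick && it.hasNext(); i++) {
      chosen.add(it.next());                             // the real code prefers healthy hosts
    }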

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
new file mode 100644
index 0000000..fa7522b
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.metadata.ActionMetadata;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgressEvent;
+import org.apache.ambari.server.utils.StageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Helper class containing logic to process custom command execution requests
+ */
+public class AmbariCustomCommandExecutionHelper {
+  private final static Logger LOG =
+      LoggerFactory.getLogger(AmbariCustomCommandExecutionHelper.class);
+  private ActionMetadata actionMetadata;
+  private Clusters clusters;
+  private AmbariManagementControllerImpl amcImpl;
+
+  public AmbariCustomCommandExecutionHelper(ActionMetadata actionMetadata, Clusters clusters,
+                                            AmbariManagementControllerImpl amcImpl) {
+    this.amcImpl = amcImpl;
+    this.actionMetadata = actionMetadata;
+    this.clusters = clusters;
+  }
+
+  public void validateCustomCommand(ExecuteActionRequest actionRequest) throws AmbariException {
+    if (actionRequest.getServiceName() == null
+        || actionRequest.getServiceName().isEmpty()
+        || actionRequest.getCommandName() == null
+        || actionRequest.getCommandName().isEmpty()) {
+      throw new AmbariException("Invalid request : " + "cluster="
+          + actionRequest.getClusterName() + ", service="
+          + actionRequest.getServiceName() + ", command="
+          + actionRequest.getCommandName());
+    }
+
+    LOG.info("Received a command execution request"
+        + ", clusterName=" + actionRequest.getClusterName()
+        + ", serviceName=" + actionRequest.getServiceName()
+        + ", request=" + actionRequest.toString());
+
+    if (!isValidCommand(actionRequest.getCommandName(), actionRequest.getServiceName())) {
+      throw new AmbariException(
+          "Unsupported action " + actionRequest.getCommandName() + " for " + actionRequest.getServiceName());
+    }
+  }
+
+  private Boolean isValidCommand(String command, String service) {
+    List<String> actions = actionMetadata.getActions(service);
+    if (actions == null || actions.size() == 0) {
+      return false;
+    }
+
+    if (!actions.contains(command)) {
+      return false;
+    }
+
+    return true;
+  }
+
+  public void addAction(ExecuteActionRequest actionRequest, Stage stage,
+                        Configuration configuration, HostsMap hostsMap, Map<String, String> hostLevelParams)
+      throws AmbariException {
+    if (actionRequest.getCommandName().contains("SERVICE_CHECK")) {
+      addServiceCheckAction(actionRequest, stage, configuration, hostsMap, hostLevelParams);
+    } else if (actionRequest.getCommandName().equals("DECOMMISSION_DATANODE")) {
+      addDecommissionDatanodeAction(actionRequest, stage, hostLevelParams);
+    } else {
+      throw new AmbariException("Unsupported action " + actionRequest.getCommandName());
+    }
+  }
+
+  private void addServiceCheckAction(ExecuteActionRequest actionRequest, Stage stage,
+                                     Configuration configuration, HostsMap hostsMap,
+                                     Map<String, String> hostLevelParams)
+      throws AmbariException {
+    String clusterName = actionRequest.getClusterName();
+    String componentName = actionMetadata.getClient(actionRequest
+        .getServiceName());
+
+    String hostName;
+    if (componentName != null) {
+      Map<String, ServiceComponentHost> components = clusters
+          .getCluster(clusterName).getService(actionRequest.getServiceName())
+          .getServiceComponent(componentName).getServiceComponentHosts();
+
+      if (components.isEmpty()) {
+        throw new AmbariException("Hosts not found, component="
+            + componentName + ", service=" + actionRequest.getServiceName()
+            + ", cluster=" + clusterName);
+      }
+      hostName = amcImpl.getHealthyHost(components.keySet());
+    } else {
+      Map<String, ServiceComponent> components = clusters
+          .getCluster(clusterName).getService(actionRequest.getServiceName())
+          .getServiceComponents();
+
+      if (components.isEmpty()) {
+        throw new AmbariException("Components not found, service="
+            + actionRequest.getServiceName() + ", cluster=" + clusterName);
+      }
+
+      ServiceComponent serviceComponent = components.values().iterator()
+          .next();
+
+      if (serviceComponent.getServiceComponentHosts().isEmpty()) {
+        throw new AmbariException("Hosts not found, component="
+            + serviceComponent.getName() + ", service="
+            + actionRequest.getServiceName() + ", cluster=" + clusterName);
+      }
+
+      hostName = serviceComponent.getServiceComponentHosts().keySet()
+          .iterator().next();
+    }
+
+    stage.addHostRoleExecutionCommand(hostName, Role.valueOf(actionRequest
+        .getCommandName()), RoleCommand.EXECUTE,
+        new ServiceComponentHostOpInProgressEvent(componentName, hostName,
+            System.currentTimeMillis()), clusterName, actionRequest
+        .getServiceName());
+
+    stage.getExecutionCommandWrapper(hostName, actionRequest.getCommandName()).getExecutionCommand()
+        .setRoleParams(actionRequest.getParameters());
+
+    Cluster cluster = clusters.getCluster(clusterName);
+
+    // [ type -> [ key, value ] ]
+    Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
+    Map<String, Map<String, String>> configTags = amcImpl.findConfigurationTagsWithOverrides(cluster, hostName);
+
+    ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
+        actionRequest.getCommandName()).getExecutionCommand();
+
+    execCmd.setConfigurations(configurations);
+    execCmd.setConfigurationTags(configTags);
+    execCmd.setHostLevelParams(hostLevelParams);
+
+    // Generate cluster host info
+    execCmd.setClusterHostInfo(
+        StageUtils.getClusterHostInfo(clusters.getHostsForCluster(clusterName), cluster, hostsMap, configuration));
+  }
+
+  private void addDecommissionDatanodeAction(ExecuteActionRequest decommissionRequest, Stage stage,
+                                             Map<String, String> hostLevelParams)
+      throws AmbariException {
+    String hdfsExcludeFileType = "hdfs-exclude-file";
+    // Find hdfs admin host, just decommission from namenode.
+    String clusterName = decommissionRequest.getClusterName();
+    Cluster cluster = clusters.getCluster(clusterName);
+    String serviceName = decommissionRequest.getServiceName();
+    String namenodeHost = clusters.getCluster(clusterName)
+        .getService(serviceName).getServiceComponent(Role.NAMENODE.toString())
+        .getServiceComponentHosts().keySet().iterator().next();
+
+    String excludeFileTag = null;
+    if (decommissionRequest.getParameters() != null
+        && (decommissionRequest.getParameters().get("excludeFileTag") != null)) {
+      excludeFileTag = decommissionRequest.getParameters()
+          .get("excludeFileTag");
+    }
+
+    if (excludeFileTag == null) {
+      throw new AmbariException("No exclude file specified"
+          + " when decommissioning datanodes. Provide parameter excludeFileTag with the tag for config type "
+          + hdfsExcludeFileType);
+    }
+
+    Config config = clusters.getCluster(clusterName).getConfig(
+        hdfsExcludeFileType, excludeFileTag);
+    if (config == null) {
+      throw new AmbariException("Decommissioning datanodes requires the cluster to be associated with config type " +
+          hdfsExcludeFileType + " with a list of datanodes to be decommissioned (\"datanodes\" : list).");
+    }
+
+    LOG.info("Decommissioning data nodes: " + config.getProperties().get("datanodes") +
+        " " + hdfsExcludeFileType + " tag: " + excludeFileTag);
+
+    Map<String, Map<String, String>> configurations =
+        new TreeMap<String, Map<String, String>>();
+
+
+    Map<String, Map<String, String>> configTags = amcImpl.findConfigurationTagsWithOverrides(cluster, namenodeHost);
+
+    // Add the tag for hdfs-exclude-file
+    Map<String, String> excludeTags = new HashMap<String, String>();
+    excludeTags.put(ConfigHelper.CLUSTER_DEFAULT_TAG, config.getVersionTag());
+    configTags.put(hdfsExcludeFileType, excludeTags);
+
+    stage.addHostRoleExecutionCommand(
+        namenodeHost,
+        Role.DECOMMISSION_DATANODE,
+        RoleCommand.EXECUTE,
+        new ServiceComponentHostOpInProgressEvent(Role.DECOMMISSION_DATANODE
+            .toString(), namenodeHost, System.currentTimeMillis()),
+        clusterName, serviceName);
+
+    ExecutionCommand execCmd = stage.getExecutionCommandWrapper(namenodeHost,
+        Role.DECOMMISSION_DATANODE.toString()).getExecutionCommand();
+
+    execCmd.setConfigurations(configurations);
+    execCmd.setConfigurationTags(configTags);
+    execCmd.setHostLevelParams(hostLevelParams);
+  }
+}
\ No newline at end of file
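
addDecommissionDatanodeAction refuses to proceed unless the request carries an excludeFileTag parameter naming an existing config of type hdfs-exclude-file. A hedged sketch of the expected request parameters (the tag value is invented):

    // Parameters attached to a DECOMMISSION_DATANODE request
    Map<String, String> params = new HashMap<String, String>();
    params.put("excludeFileTag", "exclude-20131114");
    // the named config's properties must include a "datanodes" list of hosts to decommission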

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index ef5c372..890cb81 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -165,7 +165,10 @@ public class AmbariManagementControllerImpl implements
   final private String ojdbcUrl;
   final private String serverDB;
   final private String mysqljdbcUrl;
-  
+
+  final private AmbariCustomCommandExecutionHelper customCommandExecutionHelper;
+  final private AmbariActionExecutionHelper actionExecutionHelper;
+
   @Inject
   public AmbariManagementControllerImpl(ActionManager actionManager,
       Clusters clusters, Injector injector) throws Exception {
@@ -200,6 +203,11 @@ public class AmbariManagementControllerImpl implements
       this.mysqljdbcUrl = null;
       this.serverDB = null;
     }
+
+    this.customCommandExecutionHelper = new AmbariCustomCommandExecutionHelper(
+        this.actionMetadata, this.clusters, this);
+    this.actionExecutionHelper = new AmbariActionExecutionHelper(
+        this.actionMetadata, this.clusters, this);
   }
   
   public String getAmbariServerURI(String path) {
@@ -1090,7 +1098,7 @@ public class AmbariManagementControllerImpl implements
    * @return
    * @throws AmbariException
    */
-  private Map<String, Map<String,String>> findConfigurationTagsWithOverrides(
+  protected Map<String, Map<String,String>> findConfigurationTagsWithOverrides(
     Cluster cluster, String hostName) throws AmbariException {
 
     Map<String, Map<String,String>> configTags =
@@ -1099,7 +1107,6 @@ public class AmbariManagementControllerImpl implements
     return configTags;
   }
 
-
   private List<Stage> doStageCreation(Cluster cluster,
       Map<State, List<Service>> changedServices,
       Map<State, List<ServiceComponent>> changedComps,
@@ -1142,7 +1149,7 @@ public class AmbariManagementControllerImpl implements
       // multiple stages may be needed for reconfigure
       long stageId = 0;
       Map<String, List<String>> clusterHostInfo = StageUtils.getClusterHostInfo(
-          clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector);
+          clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector.getInstance(Configuration.class));
       
       
       String clusterHostInfoJson = StageUtils.getGson().toJson(clusterHostInfo);
@@ -1353,7 +1360,8 @@ public class AmbariManagementControllerImpl implements
         stage.getExecutionCommandWrapper(clientHost, smokeTestRole)
             .getExecutionCommand()
             .setClusterHostInfo(StageUtils.getClusterHostInfo(
-                clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector));
+                clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap,
+                injector.getInstance(Configuration.class)));
 
         Map<String,String> hostParams = new HashMap<String, String>();
         hostParams.put("stack_version", cluster.getDesiredStackVersion().getStackVersion());
@@ -1381,7 +1389,7 @@ public class AmbariManagementControllerImpl implements
             + ", requestId=" + stages.get(0).getRequestId()
             + ", stagesCount=" + stages.size());
       }
-      actionManager.sendActions(stages);
+      actionManager.sendActions(stages, null);
     }
   }
 
@@ -1472,7 +1480,6 @@ public class AmbariManagementControllerImpl implements
     Map<String, Map<String, Map<String, Set<String>>>> hostComponentNames =
         new HashMap<String, Map<String, Map<String, Set<String>>>>();
     Set<State> seenNewStates = new HashSet<State>();
-    boolean processingUpgradeRequest = false;
     int numberOfRequestsProcessed = 0;
     StackId fromStackVersion = new StackId();
     Map<ServiceComponentHost, State> directTransitionScHosts = new HashMap<ServiceComponentHost, State>();
@@ -1541,9 +1548,6 @@ public class AmbariManagementControllerImpl implements
         }
       }
 
-      // If upgrade request comes without state information then its an error
-      boolean upgradeRequest = checkIfUpgradeRequestAndValidate(request, cluster, s, sc, sch);
-
       if (newState == null) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Nothing to do for new updateServiceComponentHost request"
@@ -1565,25 +1569,6 @@ public class AmbariManagementControllerImpl implements
 
       seenNewStates.add(newState);
 
-      if (!processingUpgradeRequest && upgradeRequest) {
-        processingUpgradeRequest = true;
-        // this needs to be the first request
-        if (numberOfRequestsProcessed > 1) {
-          throw new AmbariException("An upgrade request cannot be combined with " +
-              "other non-upgrade requests.");
-        }
-        fromStackVersion = sch.getStackVersion();
-      }
-
-      if (processingUpgradeRequest) {
-        if (!upgradeRequest) {
-          throw new AmbariException("An upgrade request cannot be combined with " +
-              "other non-upgrade requests.");
-        }
-        sch.setState(State.UPGRADING);
-        sch.setDesiredStackVersion(cluster.getCurrentStackVersion());
-      }
-
       State oldSchState = sch.getState();
       // Client component reinstall allowed
       if (newState == oldSchState && !sc.isClientComponent()) {
@@ -1613,20 +1598,6 @@ public class AmbariManagementControllerImpl implements
       }
 
       if (isDirectTransition(oldSchState, newState)) {
-
-//        if (newState == State.DELETED) {
-//          if (!sch.canBeRemoved()) {
-//            throw new AmbariException("Servicecomponenthost cannot be removed"
-//                + ", clusterName=" + cluster.getClusterName()
-//                + ", clusterId=" + cluster.getClusterId()
-//                + ", serviceName=" + sch.getServiceName()
-//                + ", componentName=" + sch.getServiceComponentName()
-//                + ", hostname=" + sch.getHostName()
-//                + ", currentState=" + oldSchState
-//                + ", newDesiredState=" + newState);
-//          }
-//        }
-
         if (LOG.isDebugEnabled()) {
           LOG.debug("Handling direct transition update to ServiceComponentHost"
               + ", clusterName=" + request.getClusterName()
@@ -1698,13 +1669,8 @@ public class AmbariManagementControllerImpl implements
 
     Cluster cluster = clusters.getCluster(clusterNames.iterator().next());
 
-    Map<String, String> requestParameters = null;
-    if (processingUpgradeRequest) {
-      requestParameters = new HashMap<String, String>();
-      requestParameters.put(Configuration.UPGRADE_TO_STACK, gson.toJson(cluster.getCurrentStackVersion()));
-      requestParameters.put(Configuration.UPGRADE_FROM_STACK, gson.toJson(fromStackVersion));
-    }
-    return createStages(cluster, requestProperties, requestParameters, null, null, changedScHosts, ignoredScHosts, runSmokeTest, false);
+    return createStages(cluster, requestProperties, null, null, null, changedScHosts, ignoredScHosts, runSmokeTest,
+        false);
   }
 
   private void validateServiceComponentHostRequest(ServiceComponentHostRequest request) {
@@ -2187,7 +2153,7 @@ public class AmbariManagementControllerImpl implements
     return null;
   }
 
-  private String getHealthyHost(Set<String> hostList) throws AmbariException {
+  protected String getHealthyHost(Set<String> hostList) throws AmbariException {
     // Return a healthy host if found otherwise any random host
     String hostName = null;
     for (String candidateHostName : hostList) {
@@ -2200,149 +2166,11 @@ public class AmbariManagementControllerImpl implements
     return hostName;
   }
 
-  private void addServiceCheckAction(ExecuteActionRequest actionRequest, Stage stage)
-      throws AmbariException {
-    String clusterName = actionRequest.getClusterName();
-    String componentName = actionMetadata.getClient(actionRequest
-        .getServiceName());
-
-    String hostName;
-    if (componentName != null) {
-      Map<String, ServiceComponentHost> components = clusters
-          .getCluster(clusterName).getService(actionRequest.getServiceName())
-          .getServiceComponent(componentName).getServiceComponentHosts();
-
-      if (components.isEmpty()) {
-        throw new AmbariException("Hosts not found, component="
-            + componentName + ", service=" + actionRequest.getServiceName()
-            + ", cluster=" + clusterName);
-      }
-      hostName = getHealthyHost(components.keySet());
-    } else {
-      Map<String, ServiceComponent> components = clusters
-          .getCluster(clusterName).getService(actionRequest.getServiceName())
-          .getServiceComponents();
-
-      if (components.isEmpty()) {
-        throw new AmbariException("Components not found, service="
-            + actionRequest.getServiceName() + ", cluster=" + clusterName);
-      }
-
-      ServiceComponent serviceComponent = components.values().iterator()
-          .next();
-
-      if (serviceComponent.getServiceComponentHosts().isEmpty()) {
-        throw new AmbariException("Hosts not found, component="
-            + serviceComponent.getName() + ", service="
-            + actionRequest.getServiceName() + ", cluster=" + clusterName);
-      }
-
-      hostName = serviceComponent.getServiceComponentHosts().keySet()
-          .iterator().next();
-    }
-
-    stage.addHostRoleExecutionCommand(hostName, Role.valueOf(actionRequest
-        .getCommandName()), RoleCommand.EXECUTE,
-        new ServiceComponentHostOpInProgressEvent(componentName, hostName,
-            System.currentTimeMillis()), clusterName, actionRequest
-            .getServiceName());
-
-    stage.getExecutionCommandWrapper(hostName, actionRequest.getCommandName()).getExecutionCommand()
-        .setRoleParams(actionRequest.getParameters());
-
-    Cluster cluster = clusters.getCluster(clusterName);
-    
-    // [ type -> [ key, value ] ]
-    Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String,String>>();
-    Map<String, Map<String, String>> configTags =
-      findConfigurationTagsWithOverrides(cluster, hostName);
-
-    ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
-      actionRequest.getCommandName()).getExecutionCommand();
-
-    execCmd.setConfigurations(configurations);
-    execCmd.setConfigurationTags(configTags);
-
-    Map<String, String> params = new TreeMap<String, String>();
-    params.put("jdk_location", this.jdkResourceUrl);
-    params.put("stack_version", cluster.getDesiredStackVersion().getStackVersion());
-    execCmd.setHostLevelParams(params);
-  }
-
-  private void addDecommissionDatanodeAction(
-      ExecuteActionRequest decommissionRequest, Stage stage)
-      throws AmbariException {
-    String hdfsExcludeFileType = "hdfs-exclude-file";
-    // Find hdfs admin host, just decommission from namenode.
-    String clusterName = decommissionRequest.getClusterName();
-    Cluster cluster = clusters.getCluster(clusterName);
-    String serviceName = decommissionRequest.getServiceName();
-    String namenodeHost = clusters.getCluster(clusterName)
-        .getService(serviceName).getServiceComponent(Role.NAMENODE.toString())
-        .getServiceComponentHosts().keySet().iterator().next();
-
-    String excludeFileTag = null;
-    if (decommissionRequest.getParameters() != null
-        && (decommissionRequest.getParameters().get("excludeFileTag") != null)) {
-      excludeFileTag = decommissionRequest.getParameters()
-          .get("excludeFileTag");
-    }
-
-    if (excludeFileTag == null) {
-      throw new IllegalArgumentException("No exclude file specified"
-          + " when decommissioning datanodes. Provide parameter excludeFileTag with the tag for config type "
-          + hdfsExcludeFileType);
-    }
-
-    Config config = clusters.getCluster(clusterName).getConfig(
-        hdfsExcludeFileType, excludeFileTag);
-    if(config == null){
-      throw new AmbariException("Decommissioning datanodes requires the cluster to be associated with config type " +
-      hdfsExcludeFileType + " with a list of datanodes to be decommissioned (\"datanodes\" : list).");
-    }
-
-    LOG.info("Decommissioning data nodes: " + config.getProperties().get("datanodes") +
-        " " + hdfsExcludeFileType + " tag: " + excludeFileTag);
-
-    Map<String, Map<String, String>> configurations =
-        new TreeMap<String, Map<String, String>>();
-
-    
-    Map<String, Map<String, String>> configTags =
-      findConfigurationTagsWithOverrides(cluster, namenodeHost);
-    
-    // Add the tag for hdfs-exclude-file
-    Map<String, String> excludeTags = new HashMap<String, String>();
-    excludeTags.put(ConfigHelper.CLUSTER_DEFAULT_TAG, config.getVersionTag());
-    configTags.put(hdfsExcludeFileType, excludeTags);
-
-    stage.addHostRoleExecutionCommand(
-        namenodeHost,
-        Role.DECOMMISSION_DATANODE,
-        RoleCommand.EXECUTE,
-        new ServiceComponentHostOpInProgressEvent(Role.DECOMMISSION_DATANODE
-            .toString(), namenodeHost, System.currentTimeMillis()),
-        clusterName, serviceName);
-
-    ExecutionCommand execCmd = stage.getExecutionCommandWrapper(namenodeHost,
-        Role.DECOMMISSION_DATANODE.toString()).getExecutionCommand();
-
-    execCmd.setConfigurations(configurations);
-    execCmd.setConfigurationTags(configTags);
-
-    Map<String, String> params = new TreeMap<String, String>();
-    params.put("jdk_location", this.jdkResourceUrl);
-    params.put("stack_version", cluster.getDesiredStackVersion()
-        .getStackVersion());
-    execCmd.setHostLevelParams(params);
-
-  }
-
   @Override
   public RequestStatusResponse createAction(ExecuteActionRequest actionRequest, Map<String, String> requestProperties)
       throws AmbariException {
-    String clusterName = null;
-
+    String clusterName;
+    Configuration configuration = injector.getInstance(Configuration.class);
     String requestContext = "";
 
     if (requestProperties != null) {
@@ -2353,48 +2181,38 @@ public class AmbariManagementControllerImpl implements
       }
     }
 
-    String logDir = ""; //TODO empty for now
-
     if (actionRequest.getClusterName() == null
-        || actionRequest.getClusterName().isEmpty()
-        || actionRequest.getServiceName() == null
-        || actionRequest.getServiceName().isEmpty()
-        || actionRequest.getCommandName() == null
-        || actionRequest.getCommandName().isEmpty()) {
-      throw new AmbariException("Invalid action request : " + "cluster="
-          + actionRequest.getClusterName() + ", service="
-          + actionRequest.getServiceName() + ", command="
-          + actionRequest.getCommandName());
+        || actionRequest.getClusterName().isEmpty()) {
+      throw new AmbariException("Invalid request, cluster name must be specified");
     }
-
     clusterName = actionRequest.getClusterName();
-    
+
     Cluster cluster = clusters.getCluster(clusterName);
-    
-    Map<String, List<String>> clusterHostInfoMap = StageUtils.getClusterHostInfo(clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector);
 
-    String clusterHostInfo = StageUtils.getGson().toJson(clusterHostInfoMap);
-    
-    Stage stage = stageFactory.createNew(actionManager.getNextRequestId(),
-        logDir, clusterName, requestContext, clusterHostInfo);
+    ActionExecutionContext actionExecContext = null;
+    if (actionRequest.isCommand()) {
+      customCommandExecutionHelper.validateCustomCommand(actionRequest);
+    } else {
+      actionExecContext = actionExecutionHelper.validateCustomAction(actionRequest, cluster);
+    }
+
+    Map<String, List<String>> clusterHostInfo = StageUtils.getClusterHostInfo(
+        clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap,
+        injector.getInstance(Configuration.class));
+
+    String clusterHostInfoJson = StageUtils.getGson().toJson(clusterHostInfo);
+    Stage stage = createNewStage(cluster, actionManager.getNextRequestId(), requestContext, clusterHostInfoJson);
 
     stage.setStageId(0);
-    LOG.info("Received a createAction request"
-        + ", clusterName=" + actionRequest.getClusterName()
-        + ", serviceName=" + actionRequest.getServiceName()
-        + ", request=" + actionRequest.toString());
 
-    if (!isValidCommand(actionRequest.getCommandName(), actionRequest.getServiceName())) {
-      throw new AmbariException(
-          "Unsupported action " + actionRequest.getCommandName() + " for " + actionRequest.getServiceName());
-    }
+    Map<String, String> params = new TreeMap<String, String>();
+    params.put("jdk_location", this.jdkResourceUrl);
+    params.put("stack_version", cluster.getDesiredStackVersion().getStackVersion());
 
-    if (actionRequest.getCommandName().contains("SERVICE_CHECK")) {
-      addServiceCheckAction(actionRequest, stage);
-    } else if (actionRequest.getCommandName().equals("DECOMMISSION_DATANODE")) {
-      addDecommissionDatanodeAction(actionRequest, stage);
+    if (actionRequest.isCommand()) {
+      customCommandExecutionHelper.addAction(actionRequest, stage, configuration, hostsMap, params);
     } else {
-      throw new AmbariException("Unsupported action " + actionRequest.getCommandName());
+      actionExecutionHelper.addAction(actionExecContext, stage, configuration, hostsMap, params);
     }
 
     RoleCommandOrder rco = this.getRoleCommandOrder(cluster);
@@ -2402,7 +2220,7 @@ public class AmbariManagementControllerImpl implements
     rg.build(stage);
     List<Stage> stages = rg.getStages();
     if (stages != null && !stages.isEmpty()) {
-      actionManager.sendActions(stages);
+      actionManager.sendActions(stages, actionRequest);
       return getRequestStatusResponse(stage.getRequestId());
     } else {
       throw new AmbariException("Stage was not created");
@@ -2428,18 +2246,6 @@ public class AmbariManagementControllerImpl implements
 
   }
 
-  private Boolean isValidCommand(String command, String service) {
-    List<String> actions = actionMetadata.getActions(service);
-    if (actions == null || actions.size() == 0) {
-      return false;
-    }
-
-    if (!actions.contains(command)) {
-      return false;
-    }
-
-    return true;
-  }
 
   private Set<StackResponse> getStacks(StackRequest request)
       throws AmbariException {

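For readers skimming the controller hunk above: createAction no longer hard-codes SERVICE_CHECK and DECOMMISSION_DATANODE handling; it branches on ExecuteActionRequest.isCommand() and hands validation and stage population to two helpers. The fragment below is a condensed, illustrative restatement of that flow, not part of the patch and not a runnable unit on its own; all names in it come from the diff itself.

    // Condensed restatement of the new createAction flow; names taken from the diff above.
    ActionExecutionContext actionExecContext = null;
    if (actionRequest.isCommand()) {
      // generic command path (service checks, DECOMMISSION_DATANODE, ...)
      customCommandExecutionHelper.validateCustomCommand(actionRequest);
    } else {
      // custom action path, validated against the registered action definitions
      actionExecContext = actionExecutionHelper.validateCustomAction(actionRequest, cluster);
    }
    // ... stage is created via createNewStage(...) and host-level params are built ...
    if (actionRequest.isCommand()) {
      customCommandExecutionHelper.addAction(actionRequest, stage, configuration, hostsMap, params);
    } else {
      actionExecutionHelper.addAction(actionExecContext, stage, configuration, hostsMap, params);
    }
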
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
index fd1cd1f..f8dd908 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.ambari.server.controller;
 
+import org.apache.ambari.server.utils.StageUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -33,26 +37,29 @@ public class ExecuteActionRequest {
   private Map<String, String> parameters;
 
   public ExecuteActionRequest(String clusterName, String commandName,
-                       String actionName, String serviceName, String componentName,
-                       List<String> hosts, Map<String, String> parameters) {
-    this.clusterName = clusterName;
-    this.commandName = commandName;
+                              String actionName, String serviceName, String componentName,
+                              List<String> hosts, Map<String, String> parameters) {
+    this(clusterName, commandName, serviceName, parameters);
     this.actionName = actionName;
-    this.serviceName = serviceName;
     this.componentName = componentName;
-    this.parameters = parameters;
-    this.hosts = hosts;
+    if (hosts != null) {
+      this.hosts.addAll(hosts);
+    }
   }
 
   /**
    * Create an ExecuteActionRequest to execute a command
    */
   public ExecuteActionRequest(String clusterName, String commandName, String serviceName,
-                       Map<String, String> parameters) {
+                              Map<String, String> parameters) {
     this.clusterName = clusterName;
     this.commandName = commandName;
     this.serviceName = serviceName;
-    this.parameters = parameters;
+    this.parameters = new HashMap<String, String>();
+    if (parameters != null) {
+      this.parameters.putAll(parameters);
+    }
+    this.hosts = new ArrayList<String>();
   }
 
   public String getClusterName() {
@@ -86,4 +93,17 @@ public class ExecuteActionRequest {
   public Boolean isCommand() {
     return actionName == null || actionName.isEmpty();
   }
+
+  @Override
+  public synchronized String toString() {
+    return (new StringBuilder()).
+        append("isCommand :" + isCommand().toString()).
+        append(", action :" + actionName).
+        append(", command :" + commandName).
+        append(", inputs :" + parameters.toString()).
+        append(", targetService :" + serviceName).
+        append(", targetComponent :" + componentName).
+        append(", targetHosts :" + hosts.toString()).
+        append(", clusterName :" + clusterName).toString();
+  }
 }

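The reworked ExecuteActionRequest constructors above now delegate to the shorter constructor and defensively copy the hosts and parameters collections, while isCommand() distinguishes the two request shapes. A minimal usage sketch (not part of the patch) is shown below; the action name "check_host_disk", the "threshold" input, and the command name used here are invented for illustration only.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.ambari.server.controller.ExecuteActionRequest;

    public class ExecuteActionRequestSketch {
      public static void main(String[] args) {
        Map<String, String> inputs = new HashMap<String, String>();
        inputs.put("threshold", "20");  // hypothetical action input

        // actionName is set, so isCommand() is false -> custom action path
        ExecuteActionRequest actionRequest = new ExecuteActionRequest(
            "c1", null, "check_host_disk", "HDFS", "DATANODE",
            Arrays.asList("host1", "host2"), inputs);

        // no actionName, so isCommand() is true -> custom command path
        ExecuteActionRequest commandRequest = new ExecuteActionRequest(
            "c1", "HDFS_SERVICE_CHECK", "HDFS", inputs);

        System.out.println(actionRequest);             // exercises the new toString()
        System.out.println(commandRequest.isCommand()); // prints "true"
      }
    }
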
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index f77c8a4..5678887 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -129,14 +129,14 @@ public class HostRoleCommandDAO {
   }
 
   @Transactional
-  public List<HostRoleCommandEntity> findByHostRole(String hostName, long requestId, long stageId, Role role) {
+  public List<HostRoleCommandEntity> findByHostRole(String hostName, long requestId, long stageId, String role) {
     TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT command " +
         "FROM HostRoleCommandEntity command " +
         "WHERE command.hostName=?1 AND command.requestId=?2 " +
         "AND command.stageId=?3 AND command.role=?4 " +
         "ORDER BY command.taskId", HostRoleCommandEntity.class);
 
-    return daoUtils.selectList(query, hostName, requestId, stageId, role.name());
+    return daoUtils.selectList(query, hostName, requestId, stageId, role);
   }
 
   @Transactional

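The signature change in findByHostRole above (Role enum replaced by a plain String) lets callers look up custom actions, which have no Role constant, the same way as built-in roles. An illustrative caller sketch, assuming an injected HostRoleCommandDAO; "check_host_disk" is a hypothetical custom action name:

    // Built-in role: callers now pass Role.X.name() instead of the enum value.
    List<HostRoleCommandEntity> namenodeCommands =
        hostRoleCommandDAO.findByHostRole("host1", requestId, stageId, Role.NAMENODE.name());

    // Custom action: the raw action name can be used directly as the role string.
    List<HostRoleCommandEntity> customActionCommands =
        hostRoleCommandDAO.findByHostRole("host1", requestId, stageId, "check_host_disk");
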
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
index 8271151..79c001a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
@@ -77,10 +77,6 @@ public class StageDAO {
 
   @Transactional
   public List<StageEntity> findByCommandStatuses(Collection<HostRoleStatus> statuses) {
-//    TypedQuery<StageEntity> query = entityManagerProvider.get().createQuery("SELECT stage " +
-//        "FROM StageEntity stage JOIN stage.hostRoleCommands command " +
-//        "WHERE command.status IN ?1 " +
-//        "ORDER BY stage.requestId, stage.stageId", StageEntity.class);
     TypedQuery<StageEntity> query = entityManagerProvider.get().createQuery("SELECT stage " +
           "FROM StageEntity stage WHERE stage.stageId IN (SELECT hrce.stageId FROM " +
           "HostRoleCommandEntity hrce WHERE stage.requestId = hrce.requestId and hrce.status IN ?1 ) " +

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
index 9e23516..7f1d031 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
@@ -21,7 +21,15 @@ package org.apache.ambari.server.orm.entities;
 import org.apache.ambari.server.actionmanager.ActionType;
 import org.apache.ambari.server.actionmanager.TargetHostType;
 
-import javax.persistence.*;
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
 
 @NamedQueries({
     @NamedQuery(name = "allActions", query =
@@ -52,11 +60,11 @@ public class ActionEntity {
   @Basic
   private String targetComponent;
 
-  @Column(name = "description")
+  @Column(name = "description", nullable = false)
   @Basic
   private String description = "";
 
-  @Column(name = "target_type")
+  @Column(name = "target_type", nullable = false)
   @Enumerated(EnumType.STRING)
   private TargetHostType targetType = TargetHostType.ANY;
 

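With description and target_type now declared nullable = false, it is the in-object defaults ("" and TargetHostType.ANY) that keep a minimally populated entity valid against the new ambari.action columns. A small sketch follows; the setter and getter names are assumed (they are not shown in this hunk) and "check_host_disk" is a hypothetical action name.

    // Sketch only: accessor names assumed, action name hypothetical.
    ActionEntity action = new ActionEntity();
    action.setActionName("check_host_disk");
    // With no explicit values set, the fields backing the NOT NULL columns
    // still carry their in-object defaults:
    //   action.getDescription() -> ""
    //   action.getTargetType()  -> TargetHostType.ANY
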
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
index b74a685..b922293 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
@@ -363,9 +363,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
          State.INSTALLED,
          ServiceComponentHostEventType.HOST_SVCCOMP_OP_SUCCEEDED,
          new ServiceComponentHostOpCompletedTransition())
-    
 
-    
      .addTransition(State.INSTALLING,
          State.INSTALLING,
          ServiceComponentHostEventType.HOST_SVCCOMP_INSTALL,