You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2013/11/14 07:30:53 UTC

[2/2] git commit: AMBARI 3731. Custom Action: Add support for custom action execution

AMBARI 3731. Custom Action: Add support for custom action execution


Project: http://git-wip-us.apache.org/repos/asf/incubator-ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ambari/commit/22f5fdfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ambari/tree/22f5fdfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ambari/diff/22f5fdfb

Branch: refs/heads/trunk
Commit: 22f5fdfb70916fb5ffe486c6bcb50f36bc0de1b4
Parents: 708e59d
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Wed Nov 13 22:30:26 2013 -0800
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Wed Nov 13 22:30:26 2013 -0800

----------------------------------------------------------------------
 .../org/apache/ambari/server/RoleCommand.java   |   3 +-
 .../server/actionmanager/ActionDBAccessor.java  |  73 +++-
 .../actionmanager/ActionDBAccessorImpl.java     |  61 ++-
 .../actionmanager/ActionDBInMemoryImpl.java     |  49 +--
 .../server/actionmanager/ActionManager.java     |   9 +-
 .../server/actionmanager/ActionScheduler.java   |   6 +-
 .../CustomActionDBAccessorImpl.java             |  12 +-
 .../server/actionmanager/HostRoleCommand.java   |   9 -
 .../ambari/server/agent/ExecutionCommand.java   |  13 +-
 .../ambari/server/agent/HeartBeatHandler.java   |  13 +-
 .../controller/ActionExecutionContext.java      |  91 +++++
 .../controller/AmbariActionExecutionHelper.java | 290 ++++++++++++++
 .../AmbariCustomCommandExecutionHelper.java     | 239 ++++++++++++
 .../AmbariManagementControllerImpl.java         | 278 +++-----------
 .../server/controller/ExecuteActionRequest.java |  38 +-
 .../server/orm/dao/HostRoleCommandDAO.java      |   4 +-
 .../apache/ambari/server/orm/dao/StageDAO.java  |   4 -
 .../server/orm/entities/ActionEntity.java       |  14 +-
 .../svccomphost/ServiceComponentHostImpl.java   |   2 -
 .../apache/ambari/server/utils/StageUtils.java  |   5 +-
 .../ddl/Ambari-DDL-Postgres-UPGRADE-1.3.0.sql   |   4 +
 .../actionmanager/TestActionDBAccessorImpl.java |  76 +++-
 .../server/actionmanager/TestActionManager.java |   2 +-
 .../AmbariManagementControllerTest.java         | 376 ++++++++++++++-----
 .../apache/ambari/server/orm/TestOrmImpl.java   |   6 +-
 .../ambari/server/utils/TestStageUtils.java     |   3 +-
 26 files changed, 1214 insertions(+), 466 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
index 33370bf..ad006ec 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
@@ -24,5 +24,6 @@ public enum RoleCommand {
   STOP,
   EXECUTE,
   ABORT,
-  UPGRADE
+  UPGRADE,
+  ACTIONEXECUTE
 }

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index 2c79edf..11605bb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -17,23 +17,35 @@
  */
 package org.apache.ambari.server.actionmanager;
 
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
+
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.ambari.server.Role;
-import org.apache.ambari.server.agent.CommandReport;
-
 public interface ActionDBAccessor {
 
-  public Stage getAction(String actionId);
+  /**
+   * Given an action id of the form requestId-stageId, retrieve the Stage
+   */
+  public Stage getStage(String actionId);
 
+  /**
+   * Get all stages associated with a single request id
+   */
   public List<Stage> getAllStages(long requestId);
 
+  /**
+   * Abort all outstanding operations associated with the given request
+   */
   public void abortOperation(long requestId);
 
-  public void timeoutHostRole(String host, long requestId, long stageId, Role role);
+  /**
+   * Mark the task as to have timed out
+   */
+  public void timeoutHostRole(String host, long requestId, long stageId, String role);
 
   /**
    * Returns all the pending stages, including queued and not-queued.
@@ -41,47 +53,86 @@ public interface ActionDBAccessor {
    */
   public List<Stage> getStagesInProgress();
 
+  /**
+   * Persists all tasks for a given request
+   *
+   * @param stages  Stages belonging to the request
+   */
   public void persistActions(List<Stage> stages);
 
+  /**
+   * For the given host, update all the tasks based on the command report
+   */
   public void updateHostRoleState(String hostname, long requestId,
-      long stageId, String role, CommandReport report);
+                                  long stageId, String role, CommandReport report);
 
-  public void abortHostRole(String host, long requestId, long stageId,
-      Role role);
+  /**
+   * Mark the task as to have been aborted
+   */
+  public void abortHostRole(String host, long requestId, long stageId, String role);
 
   /**
    * Return the last persisted Request ID as seen when the DBAccessor object
    * was initialized.
    * Value should remain unchanged through the lifetime of the object instance.
+   *
    * @return Request Id seen at init time
    */
   public long getLastPersistedRequestIdWhenInitialized();
 
   /**
    * Updates scheduled stage.
-   * @param s
-   * @param hostname
-   * @param roleStr
    */
   public void hostRoleScheduled(Stage s, String hostname, String roleStr);
 
+  /**
+   * Given a request id, get all the tasks that belong to this request
+   */
   public List<HostRoleCommand> getRequestTasks(long requestId);
 
+  /**
+   * Given a list of request ids, get all the tasks that belong to these requests
+   */
   public List<HostRoleCommand> getAllTasksByRequestIds(Collection<Long> requestIds);
 
+  /**
+   * Get a list of host role commands where the request id belongs to the input requestIds and
+   * the task id belongs to the input taskIds
+   */
   public List<HostRoleCommand> getTasksByRequestAndTaskIds(Collection<Long> requestIds, Collection<Long> taskIds);
 
+  /**
+   * Given a list of task ids, get all the host role commands
+   */
   public Collection<HostRoleCommand> getTasks(Collection<Long> taskIds);
 
+  /**
+   * Get all stages that contain tasks with specified host role statuses
+   */
   public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses);
 
+  /**
+   * Get all requests
+   */
   public List<Long> getRequests();
 
+  /**
+   * Gets the host role command corresponding to the task id
+   */
   public HostRoleCommand getTask(long taskId);
 
+  /**
+   * Gets request id of request that are in the specified status
+   */
   public List<Long> getRequestsByStatus(RequestStatus status);
 
+  /**
+   * Gets request contexts associated with the list of request id
+   */
   public Map<Long, String> getRequestContext(List<Long> requestIds);
 
+  /**
+   * Gets the request context associated with the request id
+   */
   public String getRequestContext(long requestId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 75ebfef..d0cba5e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -17,16 +17,17 @@
  */
 package org.apache.ambari.server.actionmanager;
 
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
 import com.google.inject.name.Named;
+import com.google.inject.persist.Transactional;
 import org.apache.ambari.server.AmbariException;
-import org.apache.ambari.server.Role;
 import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
+import org.apache.ambari.server.orm.dao.ActionDefinitionDAO;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
 import org.apache.ambari.server.orm.dao.HostDAO;
@@ -44,15 +45,20 @@ import org.apache.ambari.server.state.Clusters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Singleton;
-import com.google.inject.persist.Transactional;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 @Singleton
 public class ActionDBAccessorImpl implements ActionDBAccessor {
   private static final Logger LOG = LoggerFactory.getLogger(ActionDBAccessorImpl.class);
-
+  private final long requestId;
   @Inject
   private ClusterDAO clusterDAO;
   @Inject
@@ -71,12 +77,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   private HostRoleCommandFactory hostRoleCommandFactory;
   @Inject
   private Clusters clusters;
-
   private Cache<Long, HostRoleCommand> hostRoleCommandCache;
   private long cacheLimit; //may be exceeded to store tasks from one request
 
-  private final long requestId;
-
   @Inject
   public ActionDBAccessorImpl(Injector injector, @Named("executionCommandCacheSize") long cacheLimit) {
     injector.injectMembers(this);
@@ -90,10 +93,10 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   }
 
   /* (non-Javadoc)
-   * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getAction(java.lang.String)
+   * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getStage(java.lang.String)
    */
   @Override
-  public Stage getAction(String actionId) {
+  public Stage getStage(String actionId) {
     return stageFactory.createExisting(actionId);
   }
 
@@ -137,7 +140,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   @Override
   @Transactional
   public void timeoutHostRole(String host, long requestId, long stageId,
-                              Role role) {
+                              String role) {
     List<HostRoleCommandEntity> commands =
         hostRoleCommandDAO.findByHostRole(host, requestId, stageId, role);
     for (HostRoleCommandEntity command : commands) {
@@ -170,6 +173,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Adding stages to DB, stageCount=" + stages.size());
     }
+
     for (Stage stage : stages) {
       StageEntity stageEntity = stage.constructNewPersistenceEntity();
       Cluster cluster;
@@ -213,7 +217,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
       for (RoleSuccessCriteriaEntity roleSuccessCriteriaEntity : stageEntity.getRoleSuccessCriterias()) {
         roleSuccessCriteriaDAO.create(roleSuccessCriteriaEntity);
       }
-
     }
   }
 
@@ -227,7 +230,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
           + stageId + " role " + role + " report " + report);
     }
     List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByHostRole(
-        hostname, requestId, stageId, Role.valueOf(role));
+        hostname, requestId, stageId, role);
     for (HostRoleCommandEntity command : commands) {
       command.setStatus(HostRoleStatus.valueOf(report.getStatus()));
       command.setStdOut(report.getStdOut().getBytes());
@@ -238,13 +241,13 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   }
 
   @Override
-  public void abortHostRole(String host, long requestId, long stageId, Role role) {
+  public void abortHostRole(String host, long requestId, long stageId, String role) {
     CommandReport report = new CommandReport();
     report.setExitCode(999);
     report.setStdErr("Host Role in invalid state");
     report.setStdOut("");
     report.setStatus("ABORTED");
-    updateHostRoleState(host, requestId, stageId, role.toString(), report);
+    updateHostRoleState(host, requestId, stageId, role, report);
   }
 
   @Override
@@ -266,7 +269,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     } else {
       throw new RuntimeException("HostRoleCommand is not persisted, cannot update:\n" + hostRoleCommand);
     }
-
   }
 
   @Override
@@ -275,11 +277,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     return getTasks(
         hostRoleCommandDAO.findTaskIdsByRequest(requestId)
     );
-
-//    for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequest(requestId)) {
-//      tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
-//    }
-//    return tasks;
   }
 
   @Override
@@ -291,12 +288,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     return getTasks(
         hostRoleCommandDAO.findTaskIdsByRequestIds(requestIds)
     );
-
-//    List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
-//    for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequestIds(requestIds)) {
-//      tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
-//    }
-//    return tasks;
   }
 
   @Override
@@ -304,11 +295,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     if (!requestIds.isEmpty() && !taskIds.isEmpty()) {
       return getTasks(hostRoleCommandDAO.findTaskIdsByRequestAndTaskIds(requestIds, taskIds));
 
-//      List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
-//      for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequestAndTaskIds(requestIds, taskIds)) {
-//        tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
-//      }
-//      return tasks;
     } else if (requestIds.isEmpty()) {
       return getTasks(taskIds);
     } else if (taskIds.isEmpty()) {
@@ -335,7 +321,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
 
     if (!absent.isEmpty()) {
       boolean allowStore = hostRoleCommandCache.size() <= cacheLimit;
-//      LOG.info("Cache size {}, enable = {}", hostRoleCommandCache.size(), allowStore);
 
       for (HostRoleCommandEntity commandEntity : hostRoleCommandDAO.findByPKs(absent)) {
         HostRoleCommand hostRoleCommand = hostRoleCommandFactory.createExisting(commandEntity);

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
index 1fccf12..8c36366 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
@@ -17,38 +17,42 @@
  */
 package org.apache.ambari.server.actionmanager;
 
-import java.util.*;
-
-import org.apache.ambari.server.Role;
+import com.google.inject.Singleton;
 import org.apache.ambari.server.agent.CommandReport;
-import org.apache.ambari.server.agent.ExecutionCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.inject.Singleton;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 @Singleton
 public class ActionDBInMemoryImpl implements ActionDBAccessor {
 
+  private static Logger LOG = LoggerFactory.getLogger(ActionDBInMemoryImpl.class);
   // for a persisted DB, this will be initialized in the ctor
   // with the highest persisted requestId value in the DB
   private final long lastRequestId = 0;
-  private static Logger LOG = LoggerFactory.getLogger(ActionDBInMemoryImpl.class);
   List<Stage> stageList = new ArrayList<Stage>();
 
   @Override
-  public synchronized Stage getAction(String actionId) {
-    for (Stage s: stageList) {
+  public synchronized Stage getStage(String actionId) {
+    for (Stage s : stageList) {
       if (s.getActionId().equals(actionId)) {
         return s;
       }
     }
     return null;
   }
+
   @Override
   public synchronized List<Stage> getAllStages(long requestId) {
     List<Stage> l = new ArrayList<Stage>();
-    for (Stage s: stageList) {
+    for (Stage s : stageList) {
       if (s.getRequestId() == requestId) {
         l.add(s);
       }
@@ -80,7 +84,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
 
   @Override
   public synchronized void timeoutHostRole(String host, long requestId,
-      long stageId, Role role) {
+                                           long stageId, String role) {
     for (Stage s : stageList) {
       s.setHostRoleStatus(host, role.toString(), HostRoleStatus.TIMEDOUT);
     }
@@ -89,7 +93,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
   @Override
   public synchronized List<Stage> getStagesInProgress() {
     List<Stage> l = new ArrayList<Stage>();
-    for (Stage s: stageList) {
+    for (Stage s : stageList) {
       if (s.isStageInProgress()) {
         l.add(s);
       }
@@ -99,15 +103,16 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
 
   @Override
   public synchronized void persistActions(List<Stage> stages) {
-    for (Stage s: stages) {
+    for (Stage s : stages) {
       stageList.add(s);
     }
   }
+
   @Override
   public synchronized void updateHostRoleState(String hostname, long requestId,
-      long stageId, String role, CommandReport report) {
-    LOG.info("DEBUG stages to iterate: "+stageList.size());
-    if(null == report.getStatus()
+                                               long stageId, String role, CommandReport report) {
+    LOG.info("DEBUG stages to iterate: " + stageList.size());
+    if (null == report.getStatus()
         || null == report.getStdOut()
         || null == report.getStdErr()) {
       throw new RuntimeException("Badly formed command report.");
@@ -124,13 +129,13 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
   }
 
   @Override
-  public void abortHostRole(String host, long requestId, long stageId, Role role) {
+  public void abortHostRole(String host, long requestId, long stageId, String role) {
     CommandReport report = new CommandReport();
     report.setExitCode(999);
     report.setStdErr("Host Role in invalid state");
     report.setStdOut("");
     report.setStatus("ABORTED");
-    updateHostRoleState(host, requestId, stageId, role.toString(), report);
+    updateHostRoleState(host, requestId, stageId, role, report);
   }
 
   @Override
@@ -168,17 +173,18 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
   @Override
   public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses) {
     List<Stage> l = new ArrayList<Stage>();
-    for (Stage s: stageList) {
+    for (Stage s : stageList) {
       if (s.doesStageHaveHostRoleStatus(statuses)) {
         l.add(s);
       }
     }
     return l;
   }
+
   @Override
   public synchronized List<Long> getRequests() {
     Set<Long> requestIds = new HashSet<Long>();
-    for (Stage s: stageList) {
+    for (Stage s : stageList) {
       requestIds.add(s.getRequestId());
     }
     List<Long> ids = new ArrayList<Long>();
@@ -199,6 +205,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
     }
     return null;
   }
+
   @Override
   public List<Long> getRequestsByStatus(RequestStatus status) {
     // TODO
@@ -211,7 +218,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
     for (Long requestId : requestIds) {
       List<Stage> stages = getAllStages(requestId);
       result.put(requestId, stages != null && !stages.isEmpty() ? stages.get
-        (0).getRequestContext() : "");
+          (0).getRequestContext() : "");
     }
     return result;
   }
@@ -220,6 +227,6 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
   public String getRequestContext(long requestId) {
     List<Stage> stages = getAllStages(requestId);
     return stages != null && !stages.isEmpty() ? stages.get(0)
-      .getRequestContext() : "";
+        .getRequestContext() : "";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
index 6b32e73..aa553c4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
@@ -24,6 +24,7 @@ import com.google.inject.persist.UnitOfWork;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.serveraction.ServerActionManager;
 import org.apache.ambari.server.state.Clusters;
@@ -73,12 +74,16 @@ public class ActionManager {
     scheduler.stop();
   }
 
-  public void sendActions(List<Stage> stages) {
+  public void sendActions(List<Stage> stages, ExecuteActionRequest request) {
 
     if (LOG.isDebugEnabled()) {
       for (Stage s : stages) {
         LOG.debug("Persisting stage into db: " + s.toString());
       }
+
+      if (request != null) {
+        LOG.debug("In response to request: " + request.toString());
+      }
     }
     db.persistActions(stages);
 
@@ -91,7 +96,7 @@ public class ActionManager {
   }
 
   public Stage getAction(long requestId, long stageId) {
-    return db.getAction(StageUtils.getActionId(requestId, stageId));
+    return db.getStage(StageUtils.getActionId(requestId, stageId));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 22ed44e..a5c5dc0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -205,8 +205,7 @@ class ActionScheduler implements Runnable {
               scheduleHostRole(s, cmd);
             } catch (InvalidStateTransitionException e) {
               LOG.warn("Could not schedule host role " + cmd.toString(), e);
-              db.abortHostRole(cmd.getHostname(), s.getRequestId(), s.getStageId(),
-                  Role.valueOf(cmd.getRole()));
+              db.abortHostRole(cmd.getHostname(), s.getRequestId(), s.getStageId(), cmd.getRole());
             }
           }
         }
@@ -326,8 +325,7 @@ class ActionScheduler implements Runnable {
           if (s.getAttemptCount(host, roleStr) >= maxAttempts) {
             LOG.warn("Host:" + host + ", role:" + roleStr + ", actionId:"
                 + s.getActionId() + " expired");
-            db.timeoutHostRole(host, s.getRequestId(), s.getStageId(),
-                Role.valueOf(c.getRole()));
+            db.timeoutHostRole(host, s.getRequestId(), s.getStageId(), c.getRole());
             //Reinitialize status
             status = s.getHostRoleStatus(host, roleStr);
             ServiceComponentHostOpFailedEvent timeoutEvent =

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
index eb0cfa9..38bc371 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
@@ -98,7 +98,8 @@ public class CustomActionDBAccessorImpl implements CustomActionDBAccessor {
                                      TargetHostType targetType, String serviceType, String componentType,
                                      Short defaultTimeout)
       throws AmbariException {
-    validateCreateInput(actionName, actionType, inputs, description, defaultTimeout);
+    validateCreateInput(actionName, actionType, inputs, description, defaultTimeout,
+        targetType, serviceType, componentType);
     ActionEntity entity =
         actionDefinitionDAO.findByPK(actionName);
     if (entity == null) {
@@ -178,7 +179,8 @@ public class CustomActionDBAccessorImpl implements CustomActionDBAccessor {
   }
 
   private void validateCreateInput(String actionName, ActionType actionType, String inputs,
-                                   String description, Short defaultTimeout)
+                                   String description, Short defaultTimeout,
+                                   TargetHostType targetType, String serviceType, String componentType)
       throws AmbariException {
 
     validateActionName(actionName);
@@ -199,6 +201,12 @@ public class CustomActionDBAccessorImpl implements CustomActionDBAccessor {
       throw new AmbariException("Action type cannot be " + actionType);
     }
 
+    if (serviceType == null || serviceType.isEmpty()) {
+      if (componentType != null && !componentType.isEmpty()) {
+        throw new AmbariException("Target component cannot be specified unless target service is specified");
+      }
+    }
+
     if (inputs != null && !inputs.isEmpty()) {
       String[] parameters = inputs.split(",");
       for (String parameter : parameters) {

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
index e3bed0c..21ec077 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
@@ -86,10 +86,6 @@ public class HostRoleCommand {
     //make use of lazy loading
 
     executionCommandDAO = injector.getInstance(ExecutionCommandDAO.class);
-//    executionCommandWrapper = new ExecutionCommandWrapper(new String(
-//        hostRoleCommandEntity
-//            .getExecutionCommand().getCommand()
-//    ));
   }
 
   HostRoleCommandEntity constructNewPersistenceEntity() {
@@ -106,10 +102,6 @@ public class HostRoleCommand {
     hostRoleCommandEntity.setRoleCommand(roleCommand);
 
     hostRoleCommandEntity.setEvent(event.getEventJson());
-//    ExecutionCommandEntity executionCommandEntity = new ExecutionCommandEntity();
-//    executionCommandEntity.setCommand(executionCommandWrapper.getJson().getBytes());
-//    executionCommandEntity.setHostRoleCommand(hostRoleCommandEntity);
-//    hostRoleCommandEntity.setExecutionCommand(executionCommandEntity);
 
     return hostRoleCommandEntity;
   }
@@ -120,7 +112,6 @@ public class HostRoleCommand {
     return executionCommandEntity;
   }
 
-
   public long getTaskId() {
     return taskId;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
index 1b4ecdf..c72c14b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
@@ -54,7 +54,8 @@ public class ExecutionCommand extends AgentCommand {
   private Map<String, Map<String, String>> configurationTags;
   private Map<String, String> commandParams;
   private String serviceName;
-  
+  private String componentName;
+
   @JsonProperty("commandId")
   public String getCommandId() {
     return this.commandId;
@@ -202,6 +203,16 @@ public class ExecutionCommand extends AgentCommand {
     this.serviceName = serviceName;
   }
 
+  @JsonProperty("componentName")
+  public String getComponentName() {
+    return componentName;
+  }
+
+  @JsonProperty("componentName")
+  public void setComponentName(String componentName) {
+    this.componentName = componentName;
+  }
+
   /**
    * @param configTags the config tag map
    */

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 60aede9..9da4d45 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -187,16 +187,19 @@ public class HeartBeatHandler {
     return response;
   }
 
-  protected void processCommandReports(HeartBeat heartbeat,
-                                       String hostname,
-                                       Clusters clusterFsm, long now)
+  protected void processCommandReports(
+      HeartBeat heartbeat, String hostname, Clusters clusterFsm, long now)
       throws AmbariException {
     List<CommandReport> reports = heartbeat.getReports();
     for (CommandReport report : reports) {
       LOG.debug("Received command report: " + report);
+      if (RoleCommand.ACTIONEXECUTE.equals(report.getRoleCommand())) {
+        continue;
+      }
+
       Cluster cl = clusterFsm.getCluster(report.getClusterName());
       String service = report.getServiceName();
-      if (service == null || "".equals(service)) {
+      if (service == null || service.isEmpty()) {
         throw new AmbariException("Invalid command report, service: " + service);
       }
       if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
@@ -217,7 +220,7 @@ public class HeartBeatHandler {
                 && null != report.getConfigurationTags()
                 && !report.getConfigurationTags().isEmpty()) {
               LOG.info("Updating applied config on service " + scHost.getServiceName() +
-              ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
+                  ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
               scHost.updateActualConfigs(report.getConfigurationTags());
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java
new file mode 100644
index 0000000..f1bea70
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.ambari.server.controller;
+
+import org.apache.ambari.server.actionmanager.TargetHostType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The context required to create tasks and stages for a custom action
+ */
+public class ActionExecutionContext {
+  private final String clusterName;
+  private final String actionName;
+  private final String serviceName;
+  private final String componentName;
+  private final List<String> hosts;
+  private final Map<String, String> parameters;
+  private final TargetHostType targetType;
+  private final Short timeout;
+
+  /**
+   * Create an ActionExecutionContext to execute an action from a request
+   */
+  public ActionExecutionContext(String clusterName, String actionName, String serviceName,
+                                String componentName, List<String> hosts, Map<String, String> parameters,
+                                TargetHostType targetType, Short timeout) {
+    this.clusterName = clusterName;
+    this.actionName = actionName;
+    this.serviceName = serviceName;
+    this.componentName = componentName;
+    this.parameters = parameters;
+    this.hosts = new ArrayList<String>();
+    if (hosts != null) {
+      this.hosts.addAll(hosts);
+    }
+    this.targetType = targetType;
+    this.timeout = timeout;
+  }
+
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public String getActionName() {
+    return actionName;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public String getComponentName() {
+    return componentName;
+  }
+
+  public Map<String, String> getParameters() {
+    return parameters;
+  }
+
+  public List<String> getHosts() {
+    return hosts;
+  }
+
+  public TargetHostType getTargetType() {
+    return targetType;
+  }
+
+  public Short getTimeout() {
+    return timeout;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
new file mode 100644
index 0000000..632b11d
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.StackAccessException;
+import org.apache.ambari.server.actionmanager.ActionDefinition;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.actionmanager.TargetHostType;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.metadata.ActionMetadata;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.ComponentInfo;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceInfo;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgressEvent;
+import org.apache.ambari.server.utils.StageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Helper class containing logic to process custom action execution requests
+ */
+public class AmbariActionExecutionHelper {
+  private final static Logger LOG =
+      LoggerFactory.getLogger(AmbariCustomCommandExecutionHelper.class);
+  private ActionMetadata actionMetadata;
+  private Clusters clusters;
+  private AmbariManagementControllerImpl amcImpl;
+  private ActionManager actionManager;
+  private AmbariMetaInfo ambariMetaInfo;
+
+  public AmbariActionExecutionHelper(ActionMetadata actionMetadata, Clusters clusters,
+                                     AmbariManagementControllerImpl amcImpl) {
+    this.amcImpl = amcImpl;
+    this.actionMetadata = actionMetadata;
+    this.clusters = clusters;
+    this.actionManager = amcImpl.getActionManager();
+    this.ambariMetaInfo = amcImpl.getAmbariMetaInfo();
+  }
+
+  /**
+   * Validates the request to execute an action
+   *
+   * @param actionRequest
+   * @param cluster
+   * @return
+   * @throws AmbariException
+   */
+  public ActionExecutionContext validateCustomAction(ExecuteActionRequest actionRequest, Cluster cluster)
+      throws AmbariException {
+    if (actionRequest.getActionName() == null || actionRequest.getActionName().isEmpty()) {
+      throw new AmbariException("Action name must be specified");
+    }
+
+    ActionDefinition actionDef = actionManager.getActionDefinition(actionRequest.getActionName());
+    if (actionDef == null) {
+      throw new AmbariException("Action " + actionRequest.getActionName() + " does not exist");
+    }
+
+    StackId stackId = cluster.getCurrentStackVersion();
+    String expectedService = actionDef.getTargetService() == null ? "" : actionDef.getTargetService();
+    String actualService = actionRequest.getServiceName() == null ? "" : actionRequest.getServiceName();
+    if (!expectedService.isEmpty() && !actualService.isEmpty() && !expectedService.equals(actualService)) {
+      throw new AmbariException("Action " + actionRequest.getActionName() + " targets service " + actualService +
+          " that does not match with expected " + expectedService);
+    }
+
+    String targetService = expectedService;
+    if (targetService == null || targetService.isEmpty()) {
+      targetService = actualService;
+    }
+
+    if (targetService != null && !targetService.isEmpty()) {
+      ServiceInfo serviceInfo;
+      try {
+        serviceInfo = ambariMetaInfo.getService(stackId.getStackName(), stackId.getStackVersion(),
+            targetService);
+      } catch (StackAccessException se) {
+        serviceInfo = null;
+      }
+
+      if (serviceInfo == null) {
+        throw new AmbariException("Action " + actionRequest.getActionName() + " targets service " + targetService +
+            " that does not exist.");
+      }
+    }
+
+    String expectedComponent = actionDef.getTargetComponent() == null ? "" : actionDef.getTargetComponent();
+    String actualComponent = actionRequest.getComponentName() == null ? "" : actionRequest.getComponentName();
+    if (!expectedComponent.isEmpty() && !actualComponent.isEmpty() && !expectedComponent.equals(actualComponent)) {
+      throw new AmbariException("Action " + actionRequest.getActionName() + " targets component " + actualComponent +
+          " that does not match with expected " + expectedComponent);
+    }
+
+    String targetComponent = expectedComponent;
+    if (targetComponent == null || targetComponent.isEmpty()) {
+      targetComponent = actualComponent;
+    }
+
+    if (!targetComponent.isEmpty() && targetService.isEmpty()) {
+      throw new AmbariException("Action " + actionRequest.getActionName() + " targets component " + targetComponent +
+          " without specifying the target service.");
+    }
+
+    if (targetComponent != null && !targetComponent.isEmpty()) {
+      ComponentInfo compInfo;
+      try {
+        compInfo = ambariMetaInfo.getComponent(stackId.getStackName(), stackId.getStackVersion(),
+            targetService, targetComponent);
+      } catch (StackAccessException se) {
+        compInfo = null;
+      }
+
+      if (compInfo == null) {
+        throw new AmbariException("Action " + actionRequest.getActionName() + " targets component " + targetComponent +
+            " that does not exist.");
+      }
+    }
+
+    if (actionDef.getInputs() != null) {
+      String[] inputs = actionDef.getInputs().split(",");
+      for (String input : inputs) {
+        if (!input.trim().isEmpty() && !actionRequest.getParameters().containsKey(input.trim())) {
+          throw new AmbariException("Action " + actionRequest.getActionName() + " requires input '" +
+              input.trim() + "' that is not provided.");
+        }
+      }
+    }
+
+    if (actionDef.getTargetType() == TargetHostType.SPECIFIC
+        || (targetService.isEmpty() && targetService.isEmpty())) {
+      if (actionRequest.getHosts().size() == 0) {
+        throw new AmbariException("Action " + actionRequest.getActionName() + " requires explicit target host(s)" +
+            " that is not provided.");
+      }
+    }
+
+    LOG.info("Received action execution request"
+        + ", clusterName=" + actionRequest.getClusterName()
+        + ", request=" + actionRequest.toString());
+
+    ActionExecutionContext actionExecutionContext = new ActionExecutionContext(
+        actionRequest.getClusterName(), actionRequest.getActionName(), targetService, targetComponent,
+        actionRequest.getHosts(), actionRequest.getParameters(), actionDef.getTargetType(),
+        actionDef.getDefaultTimeout());
+
+    return actionExecutionContext;
+  }
+
+  /**
+   * Add tasks to the stage based on the requested action execution
+   *
+   * @param actionContext   the context associated with the action
+   * @param stage           stage into which tasks must be inserted
+   * @param configuration
+   * @param hostsMap
+   * @param hostLevelParams
+   * @throws AmbariException
+   */
+  public void addAction(ActionExecutionContext actionContext, Stage stage,
+                        Configuration configuration, HostsMap hostsMap, Map<String, String> hostLevelParams)
+      throws AmbariException {
+    String actionName = actionContext.getActionName();
+    String clusterName = actionContext.getClusterName();
+    String serviceName = actionContext.getServiceName();
+    String componentName = actionContext.getComponentName();
+
+    // List of host to select from
+    Set<String> candidateHosts = new HashSet<String>();
+    if (!serviceName.isEmpty()) {
+      if (!componentName.isEmpty()) {
+        Map<String, ServiceComponentHost> componentHosts =
+            clusters.getCluster(clusterName).getService(serviceName)
+                .getServiceComponent(componentName).getServiceComponentHosts();
+        candidateHosts.addAll(componentHosts.keySet());
+      } else {
+        for (String component : clusters.getCluster(clusterName).getService(serviceName)
+            .getServiceComponents().keySet()) {
+          Map<String, ServiceComponentHost> componentHosts =
+              clusters.getCluster(clusterName).getService(serviceName)
+                  .getServiceComponent(component).getServiceComponentHosts();
+          candidateHosts.addAll(componentHosts.keySet());
+        }
+      }
+    } else {
+      // All hosts are valid target host
+      candidateHosts.addAll(amcImpl.getClusters().getHostsForCluster(clusterName).keySet());
+    }
+
+    // If request did not specify hosts and there exists no host
+    if (actionContext.getHosts().isEmpty() && candidateHosts.isEmpty()) {
+      throw new AmbariException("Suitable hosts not found, component="
+          + componentName + ", service=" + serviceName
+          + ", cluster=" + clusterName + ", actionName=" + actionName);
+    }
+
+    // Compare specified hosts to available hosts
+    if (!actionContext.getHosts().isEmpty() && !candidateHosts.isEmpty()) {
+      for (String hostname : actionContext.getHosts()) {
+        if (!candidateHosts.contains(hostname)) {
+          throw new AmbariException("Request specifies host " + hostname + " but its not a valid host based on the " +
+              "target service=" + serviceName + " and component=" + componentName);
+        }
+      }
+    }
+
+    //Find target hosts to execute
+    if (actionContext.getHosts().isEmpty()) {
+      TargetHostType hostType = actionContext.getTargetType();
+      switch (hostType) {
+        case ALL:
+          actionContext.getHosts().addAll(candidateHosts);
+          break;
+        case ANY:
+          actionContext.getHosts().add(amcImpl.getHealthyHost(candidateHosts));
+          break;
+        case MAJORITY:
+          for (int i = 0; i < (candidateHosts.size() / 2) + 1; i++) {
+            String hostname = amcImpl.getHealthyHost(candidateHosts);
+            actionContext.getHosts().add(hostname);
+            candidateHosts.remove(hostname);
+          }
+          break;
+        default:
+          throw new AmbariException("Unsupported target type=" + hostType);
+      }
+    }
+
+    //create tasks for each host
+    for (String hostName : actionContext.getHosts()) {
+      stage.addHostRoleExecutionCommand(hostName, Role.valueOf(actionContext.getActionName()), RoleCommand.ACTIONEXECUTE,
+          new ServiceComponentHostOpInProgressEvent(actionContext.getActionName(), hostName,
+              System.currentTimeMillis()), clusterName, actionContext.getServiceName());
+
+      stage.getExecutionCommandWrapper(hostName, actionContext.getActionName()).getExecutionCommand()
+          .setRoleParams(actionContext.getParameters());
+
+      Cluster cluster = clusters.getCluster(clusterName);
+
+      Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
+      Map<String, Map<String, String>> configTags = null;
+      if (!actionContext.getServiceName().isEmpty()) {
+        configTags = amcImpl.findConfigurationTagsWithOverrides(cluster, hostName);
+      }
+
+      ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
+          actionContext.getActionName()).getExecutionCommand();
+
+      execCmd.setConfigurations(configurations);
+      execCmd.setConfigurationTags(configTags);
+      execCmd.setHostLevelParams(hostLevelParams);
+      execCmd.setCommandParams(actionContext.getParameters());
+      execCmd.setServiceName(serviceName);
+      execCmd.setComponentName(componentName);
+
+      // Generate cluster host info
+      execCmd.setClusterHostInfo(
+          StageUtils.getClusterHostInfo(clusters.getHostsForCluster(clusterName), cluster, hostsMap, configuration));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
new file mode 100644
index 0000000..fa7522b
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.metadata.ActionMetadata;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgressEvent;
+import org.apache.ambari.server.utils.StageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Helper class containing logic to process custom command execution requests
+ */
+public class AmbariCustomCommandExecutionHelper {
+  private final static Logger LOG =
+      LoggerFactory.getLogger(AmbariCustomCommandExecutionHelper.class);
+  private ActionMetadata actionMetadata;
+  private Clusters clusters;
+  private AmbariManagementControllerImpl amcImpl;
+
+  public AmbariCustomCommandExecutionHelper(ActionMetadata actionMetadata, Clusters clusters,
+                                            AmbariManagementControllerImpl amcImpl) {
+    this.amcImpl = amcImpl;
+    this.actionMetadata = actionMetadata;
+    this.clusters = clusters;
+  }
+
+  public void validateCustomCommand(ExecuteActionRequest actionRequest) throws AmbariException {
+    if (actionRequest.getServiceName() == null
+        || actionRequest.getServiceName().isEmpty()
+        || actionRequest.getCommandName() == null
+        || actionRequest.getCommandName().isEmpty()) {
+      throw new AmbariException("Invalid request : " + "cluster="
+          + actionRequest.getClusterName() + ", service="
+          + actionRequest.getServiceName() + ", command="
+          + actionRequest.getCommandName());
+    }
+
+    LOG.info("Received a command execution request"
+        + ", clusterName=" + actionRequest.getClusterName()
+        + ", serviceName=" + actionRequest.getServiceName()
+        + ", request=" + actionRequest.toString());
+
+    if (!isValidCommand(actionRequest.getCommandName(), actionRequest.getServiceName())) {
+      throw new AmbariException(
+          "Unsupported action " + actionRequest.getCommandName() + " for " + actionRequest.getServiceName());
+    }
+  }
+
+  private Boolean isValidCommand(String command, String service) {
+    List<String> actions = actionMetadata.getActions(service);
+    if (actions == null || actions.size() == 0) {
+      return false;
+    }
+
+    if (!actions.contains(command)) {
+      return false;
+    }
+
+    return true;
+  }
+
+  public void addAction(ExecuteActionRequest actionRequest, Stage stage,
+                        Configuration configuration, HostsMap hostsMap, Map<String, String> hostLevelParams)
+      throws AmbariException {
+    if (actionRequest.getCommandName().contains("SERVICE_CHECK")) {
+      addServiceCheckAction(actionRequest, stage, configuration, hostsMap, hostLevelParams);
+    } else if (actionRequest.getCommandName().equals("DECOMMISSION_DATANODE")) {
+      addDecommissionDatanodeAction(actionRequest, stage, hostLevelParams);
+    } else {
+      throw new AmbariException("Unsupported action " + actionRequest.getCommandName());
+    }
+  }
+
+  private void addServiceCheckAction(ExecuteActionRequest actionRequest, Stage stage,
+                                     Configuration configuration, HostsMap hostsMap,
+                                     Map<String, String> hostLevelParams)
+      throws AmbariException {
+    String clusterName = actionRequest.getClusterName();
+    String componentName = actionMetadata.getClient(actionRequest
+        .getServiceName());
+
+    String hostName;
+    if (componentName != null) {
+      Map<String, ServiceComponentHost> components = clusters
+          .getCluster(clusterName).getService(actionRequest.getServiceName())
+          .getServiceComponent(componentName).getServiceComponentHosts();
+
+      if (components.isEmpty()) {
+        throw new AmbariException("Hosts not found, component="
+            + componentName + ", service=" + actionRequest.getServiceName()
+            + ", cluster=" + clusterName);
+      }
+      hostName = amcImpl.getHealthyHost(components.keySet());
+    } else {
+      Map<String, ServiceComponent> components = clusters
+          .getCluster(clusterName).getService(actionRequest.getServiceName())
+          .getServiceComponents();
+
+      if (components.isEmpty()) {
+        throw new AmbariException("Components not found, service="
+            + actionRequest.getServiceName() + ", cluster=" + clusterName);
+      }
+
+      ServiceComponent serviceComponent = components.values().iterator()
+          .next();
+
+      if (serviceComponent.getServiceComponentHosts().isEmpty()) {
+        throw new AmbariException("Hosts not found, component="
+            + serviceComponent.getName() + ", service="
+            + actionRequest.getServiceName() + ", cluster=" + clusterName);
+      }
+
+      hostName = serviceComponent.getServiceComponentHosts().keySet()
+          .iterator().next();
+    }
+
+    stage.addHostRoleExecutionCommand(hostName, Role.valueOf(actionRequest
+        .getCommandName()), RoleCommand.EXECUTE,
+        new ServiceComponentHostOpInProgressEvent(componentName, hostName,
+            System.currentTimeMillis()), clusterName, actionRequest
+        .getServiceName());
+
+    stage.getExecutionCommandWrapper(hostName, actionRequest.getCommandName()).getExecutionCommand()
+        .setRoleParams(actionRequest.getParameters());
+
+    Cluster cluster = clusters.getCluster(clusterName);
+
+    // [ type -> [ key, value ] ]
+    Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
+    Map<String, Map<String, String>> configTags = amcImpl.findConfigurationTagsWithOverrides(cluster, hostName);
+
+    ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
+        actionRequest.getCommandName()).getExecutionCommand();
+
+    execCmd.setConfigurations(configurations);
+    execCmd.setConfigurationTags(configTags);
+    execCmd.setHostLevelParams(hostLevelParams);
+
+    // Generate cluster host info
+    execCmd.setClusterHostInfo(
+        StageUtils.getClusterHostInfo(clusters.getHostsForCluster(clusterName), cluster, hostsMap, configuration));
+  }
+
+  private void addDecommissionDatanodeAction(ExecuteActionRequest decommissionRequest, Stage stage,
+                                             Map<String, String> hostLevelParams)
+      throws AmbariException {
+    String hdfsExcludeFileType = "hdfs-exclude-file";
+    // Find hdfs admin host, just decommission from namenode.
+    String clusterName = decommissionRequest.getClusterName();
+    Cluster cluster = clusters.getCluster(clusterName);
+    String serviceName = decommissionRequest.getServiceName();
+    String namenodeHost = clusters.getCluster(clusterName)
+        .getService(serviceName).getServiceComponent(Role.NAMENODE.toString())
+        .getServiceComponentHosts().keySet().iterator().next();
+
+    String excludeFileTag = null;
+    if (decommissionRequest.getParameters() != null
+        && (decommissionRequest.getParameters().get("excludeFileTag") != null)) {
+      excludeFileTag = decommissionRequest.getParameters()
+          .get("excludeFileTag");
+    }
+
+    if (excludeFileTag == null) {
+      throw new AmbariException("No exclude file specified"
+          + " when decommissioning datanodes. Provide parameter excludeFileTag with the tag for config type "
+          + hdfsExcludeFileType);
+    }
+
+    Config config = clusters.getCluster(clusterName).getConfig(
+        hdfsExcludeFileType, excludeFileTag);
+    if (config == null) {
+      throw new AmbariException("Decommissioning datanodes requires the cluster to be associated with config type " +
+          hdfsExcludeFileType + " with a list of datanodes to be decommissioned (\"datanodes\" : list).");
+    }
+
+    LOG.info("Decommissioning data nodes: " + config.getProperties().get("datanodes") +
+        " " + hdfsExcludeFileType + " tag: " + excludeFileTag);
+
+    Map<String, Map<String, String>> configurations =
+        new TreeMap<String, Map<String, String>>();
+
+
+    Map<String, Map<String, String>> configTags = amcImpl.findConfigurationTagsWithOverrides(cluster, namenodeHost);
+
+    // Add the tag for hdfs-exclude-file
+    Map<String, String> excludeTags = new HashMap<String, String>();
+    excludeTags.put(ConfigHelper.CLUSTER_DEFAULT_TAG, config.getVersionTag());
+    configTags.put(hdfsExcludeFileType, excludeTags);
+
+    stage.addHostRoleExecutionCommand(
+        namenodeHost,
+        Role.DECOMMISSION_DATANODE,
+        RoleCommand.EXECUTE,
+        new ServiceComponentHostOpInProgressEvent(Role.DECOMMISSION_DATANODE
+            .toString(), namenodeHost, System.currentTimeMillis()),
+        clusterName, serviceName);
+
+    ExecutionCommand execCmd = stage.getExecutionCommandWrapper(namenodeHost,
+        Role.DECOMMISSION_DATANODE.toString()).getExecutionCommand();
+
+    execCmd.setConfigurations(configurations);
+    execCmd.setConfigurationTags(configTags);
+    execCmd.setHostLevelParams(hostLevelParams);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index ef5c372..890cb81 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -165,7 +165,10 @@ public class AmbariManagementControllerImpl implements
   final private String ojdbcUrl;
   final private String serverDB;
   final private String mysqljdbcUrl;
-  
+
+  final private AmbariCustomCommandExecutionHelper customCommandExecutionHelper;
+  final private AmbariActionExecutionHelper actionExecutionHelper;
+
   @Inject
   public AmbariManagementControllerImpl(ActionManager actionManager,
       Clusters clusters, Injector injector) throws Exception {
@@ -200,6 +203,11 @@ public class AmbariManagementControllerImpl implements
       this.mysqljdbcUrl = null;
       this.serverDB = null;
     }
+
+    this.customCommandExecutionHelper = new AmbariCustomCommandExecutionHelper(
+        this.actionMetadata, this.clusters, this);
+    this.actionExecutionHelper = new AmbariActionExecutionHelper(
+        this.actionMetadata, this.clusters, this);
   }
   
   public String getAmbariServerURI(String path) {
@@ -1090,7 +1098,7 @@ public class AmbariManagementControllerImpl implements
    * @return
    * @throws AmbariException
    */
-  private Map<String, Map<String,String>> findConfigurationTagsWithOverrides(
+  protected Map<String, Map<String,String>> findConfigurationTagsWithOverrides(
     Cluster cluster, String hostName) throws AmbariException {
 
     Map<String, Map<String,String>> configTags =
@@ -1099,7 +1107,6 @@ public class AmbariManagementControllerImpl implements
     return configTags;
   }
 
-
   private List<Stage> doStageCreation(Cluster cluster,
       Map<State, List<Service>> changedServices,
       Map<State, List<ServiceComponent>> changedComps,
@@ -1142,7 +1149,7 @@ public class AmbariManagementControllerImpl implements
       // multiple stages may be needed for reconfigure
       long stageId = 0;
       Map<String, List<String>> clusterHostInfo = StageUtils.getClusterHostInfo(
-          clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector);
+          clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector.getInstance(Configuration.class));
       
       
       String clusterHostInfoJson = StageUtils.getGson().toJson(clusterHostInfo);
@@ -1353,7 +1360,8 @@ public class AmbariManagementControllerImpl implements
         stage.getExecutionCommandWrapper(clientHost, smokeTestRole)
             .getExecutionCommand()
             .setClusterHostInfo(StageUtils.getClusterHostInfo(
-                clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector));
+                clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap,
+                injector.getInstance(Configuration.class)));
 
         Map<String,String> hostParams = new HashMap<String, String>();
         hostParams.put("stack_version", cluster.getDesiredStackVersion().getStackVersion());
@@ -1381,7 +1389,7 @@ public class AmbariManagementControllerImpl implements
             + ", requestId=" + stages.get(0).getRequestId()
             + ", stagesCount=" + stages.size());
       }
-      actionManager.sendActions(stages);
+      actionManager.sendActions(stages, null);
     }
   }
 
@@ -1472,7 +1480,6 @@ public class AmbariManagementControllerImpl implements
     Map<String, Map<String, Map<String, Set<String>>>> hostComponentNames =
         new HashMap<String, Map<String, Map<String, Set<String>>>>();
     Set<State> seenNewStates = new HashSet<State>();
-    boolean processingUpgradeRequest = false;
     int numberOfRequestsProcessed = 0;
     StackId fromStackVersion = new StackId();
     Map<ServiceComponentHost, State> directTransitionScHosts = new HashMap<ServiceComponentHost, State>();
@@ -1541,9 +1548,6 @@ public class AmbariManagementControllerImpl implements
         }
       }
 
-      // If upgrade request comes without state information then its an error
-      boolean upgradeRequest = checkIfUpgradeRequestAndValidate(request, cluster, s, sc, sch);
-
       if (newState == null) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Nothing to do for new updateServiceComponentHost request"
@@ -1565,25 +1569,6 @@ public class AmbariManagementControllerImpl implements
 
       seenNewStates.add(newState);
 
-      if (!processingUpgradeRequest && upgradeRequest) {
-        processingUpgradeRequest = true;
-        // this needs to be the first request
-        if (numberOfRequestsProcessed > 1) {
-          throw new AmbariException("An upgrade request cannot be combined with " +
-              "other non-upgrade requests.");
-        }
-        fromStackVersion = sch.getStackVersion();
-      }
-
-      if (processingUpgradeRequest) {
-        if (!upgradeRequest) {
-          throw new AmbariException("An upgrade request cannot be combined with " +
-              "other non-upgrade requests.");
-        }
-        sch.setState(State.UPGRADING);
-        sch.setDesiredStackVersion(cluster.getCurrentStackVersion());
-      }
-
       State oldSchState = sch.getState();
       // Client component reinstall allowed
       if (newState == oldSchState && !sc.isClientComponent()) {
@@ -1613,20 +1598,6 @@ public class AmbariManagementControllerImpl implements
       }
 
       if (isDirectTransition(oldSchState, newState)) {
-
-//        if (newState == State.DELETED) {
-//          if (!sch.canBeRemoved()) {
-//            throw new AmbariException("Servicecomponenthost cannot be removed"
-//                + ", clusterName=" + cluster.getClusterName()
-//                + ", clusterId=" + cluster.getClusterId()
-//                + ", serviceName=" + sch.getServiceName()
-//                + ", componentName=" + sch.getServiceComponentName()
-//                + ", hostname=" + sch.getHostName()
-//                + ", currentState=" + oldSchState
-//                + ", newDesiredState=" + newState);
-//          }
-//        }
-
         if (LOG.isDebugEnabled()) {
           LOG.debug("Handling direct transition update to ServiceComponentHost"
               + ", clusterName=" + request.getClusterName()
@@ -1698,13 +1669,8 @@ public class AmbariManagementControllerImpl implements
 
     Cluster cluster = clusters.getCluster(clusterNames.iterator().next());
 
-    Map<String, String> requestParameters = null;
-    if (processingUpgradeRequest) {
-      requestParameters = new HashMap<String, String>();
-      requestParameters.put(Configuration.UPGRADE_TO_STACK, gson.toJson(cluster.getCurrentStackVersion()));
-      requestParameters.put(Configuration.UPGRADE_FROM_STACK, gson.toJson(fromStackVersion));
-    }
-    return createStages(cluster, requestProperties, requestParameters, null, null, changedScHosts, ignoredScHosts, runSmokeTest, false);
+    return createStages(cluster, requestProperties, null, null, null, changedScHosts, ignoredScHosts, runSmokeTest,
+        false);
   }
 
   private void validateServiceComponentHostRequest(ServiceComponentHostRequest request) {
@@ -2187,7 +2153,7 @@ public class AmbariManagementControllerImpl implements
     return null;
   }
 
-  private String getHealthyHost(Set<String> hostList) throws AmbariException {
+  protected String getHealthyHost(Set<String> hostList) throws AmbariException {
     // Return a healthy host if found otherwise any random host
     String hostName = null;
     for (String candidateHostName : hostList) {
@@ -2200,149 +2166,11 @@ public class AmbariManagementControllerImpl implements
     return hostName;
   }
 
-  private void addServiceCheckAction(ExecuteActionRequest actionRequest, Stage stage)
-      throws AmbariException {
-    String clusterName = actionRequest.getClusterName();
-    String componentName = actionMetadata.getClient(actionRequest
-        .getServiceName());
-
-    String hostName;
-    if (componentName != null) {
-      Map<String, ServiceComponentHost> components = clusters
-          .getCluster(clusterName).getService(actionRequest.getServiceName())
-          .getServiceComponent(componentName).getServiceComponentHosts();
-
-      if (components.isEmpty()) {
-        throw new AmbariException("Hosts not found, component="
-            + componentName + ", service=" + actionRequest.getServiceName()
-            + ", cluster=" + clusterName);
-      }
-      hostName = getHealthyHost(components.keySet());
-    } else {
-      Map<String, ServiceComponent> components = clusters
-          .getCluster(clusterName).getService(actionRequest.getServiceName())
-          .getServiceComponents();
-
-      if (components.isEmpty()) {
-        throw new AmbariException("Components not found, service="
-            + actionRequest.getServiceName() + ", cluster=" + clusterName);
-      }
-
-      ServiceComponent serviceComponent = components.values().iterator()
-          .next();
-
-      if (serviceComponent.getServiceComponentHosts().isEmpty()) {
-        throw new AmbariException("Hosts not found, component="
-            + serviceComponent.getName() + ", service="
-            + actionRequest.getServiceName() + ", cluster=" + clusterName);
-      }
-
-      hostName = serviceComponent.getServiceComponentHosts().keySet()
-          .iterator().next();
-    }
-
-    stage.addHostRoleExecutionCommand(hostName, Role.valueOf(actionRequest
-        .getCommandName()), RoleCommand.EXECUTE,
-        new ServiceComponentHostOpInProgressEvent(componentName, hostName,
-            System.currentTimeMillis()), clusterName, actionRequest
-            .getServiceName());
-
-    stage.getExecutionCommandWrapper(hostName, actionRequest.getCommandName()).getExecutionCommand()
-        .setRoleParams(actionRequest.getParameters());
-
-    Cluster cluster = clusters.getCluster(clusterName);
-    
-    // [ type -> [ key, value ] ]
-    Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String,String>>();
-    Map<String, Map<String, String>> configTags =
-      findConfigurationTagsWithOverrides(cluster, hostName);
-
-    ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
-      actionRequest.getCommandName()).getExecutionCommand();
-
-    execCmd.setConfigurations(configurations);
-    execCmd.setConfigurationTags(configTags);
-
-    Map<String, String> params = new TreeMap<String, String>();
-    params.put("jdk_location", this.jdkResourceUrl);
-    params.put("stack_version", cluster.getDesiredStackVersion().getStackVersion());
-    execCmd.setHostLevelParams(params);
-  }
-
-  private void addDecommissionDatanodeAction(
-      ExecuteActionRequest decommissionRequest, Stage stage)
-      throws AmbariException {
-    String hdfsExcludeFileType = "hdfs-exclude-file";
-    // Find hdfs admin host, just decommission from namenode.
-    String clusterName = decommissionRequest.getClusterName();
-    Cluster cluster = clusters.getCluster(clusterName);
-    String serviceName = decommissionRequest.getServiceName();
-    String namenodeHost = clusters.getCluster(clusterName)
-        .getService(serviceName).getServiceComponent(Role.NAMENODE.toString())
-        .getServiceComponentHosts().keySet().iterator().next();
-
-    String excludeFileTag = null;
-    if (decommissionRequest.getParameters() != null
-        && (decommissionRequest.getParameters().get("excludeFileTag") != null)) {
-      excludeFileTag = decommissionRequest.getParameters()
-          .get("excludeFileTag");
-    }
-
-    if (excludeFileTag == null) {
-      throw new IllegalArgumentException("No exclude file specified"
-          + " when decommissioning datanodes. Provide parameter excludeFileTag with the tag for config type "
-          + hdfsExcludeFileType);
-    }
-
-    Config config = clusters.getCluster(clusterName).getConfig(
-        hdfsExcludeFileType, excludeFileTag);
-    if(config == null){
-      throw new AmbariException("Decommissioning datanodes requires the cluster to be associated with config type " +
-      hdfsExcludeFileType + " with a list of datanodes to be decommissioned (\"datanodes\" : list).");
-    }
-
-    LOG.info("Decommissioning data nodes: " + config.getProperties().get("datanodes") +
-        " " + hdfsExcludeFileType + " tag: " + excludeFileTag);
-
-    Map<String, Map<String, String>> configurations =
-        new TreeMap<String, Map<String, String>>();
-
-    
-    Map<String, Map<String, String>> configTags =
-      findConfigurationTagsWithOverrides(cluster, namenodeHost);
-    
-    // Add the tag for hdfs-exclude-file
-    Map<String, String> excludeTags = new HashMap<String, String>();
-    excludeTags.put(ConfigHelper.CLUSTER_DEFAULT_TAG, config.getVersionTag());
-    configTags.put(hdfsExcludeFileType, excludeTags);
-
-    stage.addHostRoleExecutionCommand(
-        namenodeHost,
-        Role.DECOMMISSION_DATANODE,
-        RoleCommand.EXECUTE,
-        new ServiceComponentHostOpInProgressEvent(Role.DECOMMISSION_DATANODE
-            .toString(), namenodeHost, System.currentTimeMillis()),
-        clusterName, serviceName);
-
-    ExecutionCommand execCmd = stage.getExecutionCommandWrapper(namenodeHost,
-        Role.DECOMMISSION_DATANODE.toString()).getExecutionCommand();
-
-    execCmd.setConfigurations(configurations);
-    execCmd.setConfigurationTags(configTags);
-
-    Map<String, String> params = new TreeMap<String, String>();
-    params.put("jdk_location", this.jdkResourceUrl);
-    params.put("stack_version", cluster.getDesiredStackVersion()
-        .getStackVersion());
-    execCmd.setHostLevelParams(params);
-
-  }
-
   @Override
   public RequestStatusResponse createAction(ExecuteActionRequest actionRequest, Map<String, String> requestProperties)
       throws AmbariException {
-    String clusterName = null;
-
+    String clusterName;
+    Configuration configuration = injector.getInstance(Configuration.class);
     String requestContext = "";
 
     if (requestProperties != null) {
@@ -2353,48 +2181,38 @@ public class AmbariManagementControllerImpl implements
       }
     }
 
-    String logDir = ""; //TODO empty for now
-
     if (actionRequest.getClusterName() == null
-        || actionRequest.getClusterName().isEmpty()
-        || actionRequest.getServiceName() == null
-        || actionRequest.getServiceName().isEmpty()
-        || actionRequest.getCommandName() == null
-        || actionRequest.getCommandName().isEmpty()) {
-      throw new AmbariException("Invalid action request : " + "cluster="
-          + actionRequest.getClusterName() + ", service="
-          + actionRequest.getServiceName() + ", command="
-          + actionRequest.getCommandName());
+        || actionRequest.getClusterName().isEmpty()) {
+      throw new AmbariException("Invalid request, cluster name must be specified");
     }
-
     clusterName = actionRequest.getClusterName();
-    
+
     Cluster cluster = clusters.getCluster(clusterName);
-    
-    Map<String, List<String>> clusterHostInfoMap = StageUtils.getClusterHostInfo(clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector);
 
-    String clusterHostInfo = StageUtils.getGson().toJson(clusterHostInfoMap);
-    
-    Stage stage = stageFactory.createNew(actionManager.getNextRequestId(),
-        logDir, clusterName, requestContext, clusterHostInfo);
+    ActionExecutionContext actionExecContext = null;
+    if (actionRequest.isCommand()) {
+      customCommandExecutionHelper.validateCustomCommand(actionRequest);
+    } else {
+      actionExecContext = actionExecutionHelper.validateCustomAction(actionRequest, cluster);
+    }
+
+    Map<String, List<String>> clusterHostInfo = StageUtils.getClusterHostInfo(
+        clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap,
+        injector.getInstance(Configuration.class));
+
+    String clusterHostInfoJson = StageUtils.getGson().toJson(clusterHostInfo);
+    Stage stage = createNewStage(cluster, actionManager.getNextRequestId(), requestContext, clusterHostInfoJson);
 
     stage.setStageId(0);
-    LOG.info("Received a createAction request"
-        + ", clusterName=" + actionRequest.getClusterName()
-        + ", serviceName=" + actionRequest.getServiceName()
-        + ", request=" + actionRequest.toString());
 
-    if (!isValidCommand(actionRequest.getCommandName(), actionRequest.getServiceName())) {
-      throw new AmbariException(
-          "Unsupported action " + actionRequest.getCommandName() + " for " + actionRequest.getServiceName());
-    }
+    Map<String, String> params = new TreeMap<String, String>();
+    params.put("jdk_location", this.jdkResourceUrl);
+    params.put("stack_version", cluster.getDesiredStackVersion().getStackVersion());
 
-    if (actionRequest.getCommandName().contains("SERVICE_CHECK")) {
-      addServiceCheckAction(actionRequest, stage);
-    } else if (actionRequest.getCommandName().equals("DECOMMISSION_DATANODE")) {
-      addDecommissionDatanodeAction(actionRequest, stage);
+    if (actionRequest.isCommand()) {
+      customCommandExecutionHelper.addAction(actionRequest, stage, configuration, hostsMap, params);
     } else {
-      throw new AmbariException("Unsupported action " + actionRequest.getCommandName());
+      actionExecutionHelper.addAction(actionExecContext, stage, configuration, hostsMap, params);
     }
 
     RoleCommandOrder rco = this.getRoleCommandOrder(cluster);
@@ -2402,7 +2220,7 @@ public class AmbariManagementControllerImpl implements
     rg.build(stage);
     List<Stage> stages = rg.getStages();
     if (stages != null && !stages.isEmpty()) {
-      actionManager.sendActions(stages);
+      actionManager.sendActions(stages, actionRequest);
       return getRequestStatusResponse(stage.getRequestId());
     } else {
       throw new AmbariException("Stage was not created");
@@ -2428,18 +2246,6 @@ public class AmbariManagementControllerImpl implements
 
   }
 
-  private Boolean isValidCommand(String command, String service) {
-    List<String> actions = actionMetadata.getActions(service);
-    if (actions == null || actions.size() == 0) {
-      return false;
-    }
-
-    if (!actions.contains(command)) {
-      return false;
-    }
-
-    return true;
-  }
 
   private Set<StackResponse> getStacks(StackRequest request)
       throws AmbariException {

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
index fd1cd1f..f8dd908 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.ambari.server.controller;
 
+import org.apache.ambari.server.utils.StageUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -33,26 +37,29 @@ public class ExecuteActionRequest {
   private Map<String, String> parameters;
 
   public ExecuteActionRequest(String clusterName, String commandName,
-                       String actionName, String serviceName, String componentName,
-                       List<String> hosts, Map<String, String> parameters) {
-    this.clusterName = clusterName;
-    this.commandName = commandName;
+                              String actionName, String serviceName, String componentName,
+                              List<String> hosts, Map<String, String> parameters) {
+    this(clusterName, commandName, serviceName, parameters);
     this.actionName = actionName;
-    this.serviceName = serviceName;
     this.componentName = componentName;
-    this.parameters = parameters;
-    this.hosts = hosts;
+    if (hosts != null) {
+      this.hosts.addAll(hosts);
+    }
   }
 
   /**
    * Create an ExecuteActionRequest to execute a command
    */
   public ExecuteActionRequest(String clusterName, String commandName, String serviceName,
-                       Map<String, String> parameters) {
+                              Map<String, String> parameters) {
     this.clusterName = clusterName;
     this.commandName = commandName;
     this.serviceName = serviceName;
-    this.parameters = parameters;
+    this.parameters = new HashMap<String, String>();
+    if (parameters != null) {
+      this.parameters.putAll(parameters);
+    }
+    this.hosts = new ArrayList<String>();
   }
 
   public String getClusterName() {
@@ -86,4 +93,17 @@ public class ExecuteActionRequest {
   public Boolean isCommand() {
     return actionName == null || actionName.isEmpty();
   }
+
+  @Override
+  public synchronized String toString() {
+    return (new StringBuilder()).
+        append("isCommand :" + isCommand().toString()).
+        append(", action :" + actionName).
+        append(", command :" + commandName).
+        append(", inputs :" + parameters.toString()).
+        append(", targetService :" + serviceName).
+        append(", targetComponent :" + componentName).
+        append(", targetHosts :" + hosts.toString()).
+        append(", clusterName :" + clusterName).toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index f77c8a4..5678887 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -129,14 +129,14 @@ public class HostRoleCommandDAO {
   }
 
   @Transactional
-  public List<HostRoleCommandEntity> findByHostRole(String hostName, long requestId, long stageId, Role role) {
+  public List<HostRoleCommandEntity> findByHostRole(String hostName, long requestId, long stageId, String role) {
     TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT command " +
         "FROM HostRoleCommandEntity command " +
         "WHERE command.hostName=?1 AND command.requestId=?2 " +
         "AND command.stageId=?3 AND command.role=?4 " +
         "ORDER BY command.taskId", HostRoleCommandEntity.class);
 
-    return daoUtils.selectList(query, hostName, requestId, stageId, role.name());
+    return daoUtils.selectList(query, hostName, requestId, stageId, role);
   }
 
   @Transactional

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
index 8271151..79c001a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
@@ -77,10 +77,6 @@ public class StageDAO {
 
   @Transactional
   public List<StageEntity> findByCommandStatuses(Collection<HostRoleStatus> statuses) {
-//    TypedQuery<StageEntity> query = entityManagerProvider.get().createQuery("SELECT stage " +
-//        "FROM StageEntity stage JOIN stage.hostRoleCommands command " +
-//        "WHERE command.status IN ?1 " +
-//        "ORDER BY stage.requestId, stage.stageId", StageEntity.class);
     TypedQuery<StageEntity> query = entityManagerProvider.get().createQuery("SELECT stage " +
           "FROM StageEntity stage WHERE stage.stageId IN (SELECT hrce.stageId FROM " +
           "HostRoleCommandEntity hrce WHERE stage.requestId = hrce.requestId and hrce.status IN ?1 ) " +

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
index 9e23516..7f1d031 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
@@ -21,7 +21,15 @@ package org.apache.ambari.server.orm.entities;
 import org.apache.ambari.server.actionmanager.ActionType;
 import org.apache.ambari.server.actionmanager.TargetHostType;
 
-import javax.persistence.*;
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
 
 @NamedQueries({
     @NamedQuery(name = "allActions", query =
@@ -52,11 +60,11 @@ public class ActionEntity {
   @Basic
   private String targetComponent;
 
-  @Column(name = "description")
+  @Column(name = "description", nullable = false)
   @Basic
   private String description = "";
 
-  @Column(name = "target_type")
+  @Column(name = "target_type", nullable = false)
   @Enumerated(EnumType.STRING)
   private TargetHostType targetType = TargetHostType.ANY;
 

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
index b74a685..b922293 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
@@ -363,9 +363,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
          State.INSTALLED,
          ServiceComponentHostEventType.HOST_SVCCOMP_OP_SUCCEEDED,
          new ServiceComponentHostOpCompletedTransition())
-    
 
-    
      .addTransition(State.INSTALLING,
          State.INSTALLING,
          ServiceComponentHostEventType.HOST_SVCCOMP_INSTALL,