You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2013/11/14 07:30:53 UTC
[2/2] git commit: AMBARI 3731. Custom Action: Add support for custom
action execution
AMBARI 3731. Custom Action: Add support for custom action execution
Project: http://git-wip-us.apache.org/repos/asf/incubator-ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ambari/commit/22f5fdfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ambari/tree/22f5fdfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ambari/diff/22f5fdfb
Branch: refs/heads/trunk
Commit: 22f5fdfb70916fb5ffe486c6bcb50f36bc0de1b4
Parents: 708e59d
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Wed Nov 13 22:30:26 2013 -0800
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Wed Nov 13 22:30:26 2013 -0800
----------------------------------------------------------------------
.../org/apache/ambari/server/RoleCommand.java | 3 +-
.../server/actionmanager/ActionDBAccessor.java | 73 +++-
.../actionmanager/ActionDBAccessorImpl.java | 61 ++-
.../actionmanager/ActionDBInMemoryImpl.java | 49 +--
.../server/actionmanager/ActionManager.java | 9 +-
.../server/actionmanager/ActionScheduler.java | 6 +-
.../CustomActionDBAccessorImpl.java | 12 +-
.../server/actionmanager/HostRoleCommand.java | 9 -
.../ambari/server/agent/ExecutionCommand.java | 13 +-
.../ambari/server/agent/HeartBeatHandler.java | 13 +-
.../controller/ActionExecutionContext.java | 91 +++++
.../controller/AmbariActionExecutionHelper.java | 290 ++++++++++++++
.../AmbariCustomCommandExecutionHelper.java | 239 ++++++++++++
.../AmbariManagementControllerImpl.java | 278 +++-----------
.../server/controller/ExecuteActionRequest.java | 38 +-
.../server/orm/dao/HostRoleCommandDAO.java | 4 +-
.../apache/ambari/server/orm/dao/StageDAO.java | 4 -
.../server/orm/entities/ActionEntity.java | 14 +-
.../svccomphost/ServiceComponentHostImpl.java | 2 -
.../apache/ambari/server/utils/StageUtils.java | 5 +-
.../ddl/Ambari-DDL-Postgres-UPGRADE-1.3.0.sql | 4 +
.../actionmanager/TestActionDBAccessorImpl.java | 76 +++-
.../server/actionmanager/TestActionManager.java | 2 +-
.../AmbariManagementControllerTest.java | 376 ++++++++++++++-----
.../apache/ambari/server/orm/TestOrmImpl.java | 6 +-
.../ambari/server/utils/TestStageUtils.java | 3 +-
26 files changed, 1214 insertions(+), 466 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
index 33370bf..ad006ec 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/RoleCommand.java
@@ -24,5 +24,6 @@ public enum RoleCommand {
STOP,
EXECUTE,
ABORT,
- UPGRADE
+ UPGRADE,
+ ACTIONEXECUTE
}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index 2c79edf..11605bb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -17,23 +17,35 @@
*/
package org.apache.ambari.server.actionmanager;
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
+
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.ambari.server.Role;
-import org.apache.ambari.server.agent.CommandReport;
-
public interface ActionDBAccessor {
- public Stage getAction(String actionId);
+ /**
+ * Given an action id of the form requestId-stageId, retrieve the Stage
+ */
+ public Stage getStage(String actionId);
+ /**
+ * Get all stages associated with a single request id
+ */
public List<Stage> getAllStages(long requestId);
+ /**
+ * Abort all outstanding operations associated with the given request
+ */
public void abortOperation(long requestId);
- public void timeoutHostRole(String host, long requestId, long stageId, Role role);
+ /**
+ * Mark the task as to have timed out
+ */
+ public void timeoutHostRole(String host, long requestId, long stageId, String role);
/**
* Returns all the pending stages, including queued and not-queued.
@@ -41,47 +53,86 @@ public interface ActionDBAccessor {
*/
public List<Stage> getStagesInProgress();
+ /**
+ * Persists all tasks for a given request
+ *
+ * @param stages Stages belonging to the request
+ */
public void persistActions(List<Stage> stages);
+ /**
+ * For the given host, update all the tasks based on the command report
+ */
public void updateHostRoleState(String hostname, long requestId,
- long stageId, String role, CommandReport report);
+ long stageId, String role, CommandReport report);
- public void abortHostRole(String host, long requestId, long stageId,
- Role role);
+ /**
+ * Mark the task as to have been aborted
+ */
+ public void abortHostRole(String host, long requestId, long stageId, String role);
/**
* Return the last persisted Request ID as seen when the DBAccessor object
* was initialized.
* Value should remain unchanged through the lifetime of the object instance.
+ *
* @return Request Id seen at init time
*/
public long getLastPersistedRequestIdWhenInitialized();
/**
* Updates scheduled stage.
- * @param s
- * @param hostname
- * @param roleStr
*/
public void hostRoleScheduled(Stage s, String hostname, String roleStr);
+ /**
+ * Given a request id, get all the tasks that belong to this request
+ */
public List<HostRoleCommand> getRequestTasks(long requestId);
+ /**
+ * Given a list of request ids, get all the tasks that belong to these requests
+ */
public List<HostRoleCommand> getAllTasksByRequestIds(Collection<Long> requestIds);
+ /**
+ * Get a list of host role commands where the request id belongs to the input requestIds and
+ * the task id belongs to the input taskIds
+ */
public List<HostRoleCommand> getTasksByRequestAndTaskIds(Collection<Long> requestIds, Collection<Long> taskIds);
+ /**
+ * Given a list of task ids, get all the host role commands
+ */
public Collection<HostRoleCommand> getTasks(Collection<Long> taskIds);
+ /**
+ * Get all stages that contain tasks with specified host role statuses
+ */
public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses);
+ /**
+ * Get all requests
+ */
public List<Long> getRequests();
+ /**
+ * Gets the host role command corresponding to the task id
+ */
public HostRoleCommand getTask(long taskId);
+ /**
+ * Gets request id of request that are in the specified status
+ */
public List<Long> getRequestsByStatus(RequestStatus status);
+ /**
+ * Gets request contexts associated with the list of request id
+ */
public Map<Long, String> getRequestContext(List<Long> requestIds);
+ /**
+ * Gets the request context associated with the request id
+ */
public String getRequestContext(long requestId);
}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 75ebfef..d0cba5e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -17,16 +17,17 @@
*/
package org.apache.ambari.server.actionmanager;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
import com.google.inject.name.Named;
+import com.google.inject.persist.Transactional;
import org.apache.ambari.server.AmbariException;
-import org.apache.ambari.server.Role;
import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
+import org.apache.ambari.server.orm.dao.ActionDefinitionDAO;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
import org.apache.ambari.server.orm.dao.HostDAO;
@@ -44,15 +45,20 @@ import org.apache.ambari.server.state.Clusters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Singleton;
-import com.google.inject.persist.Transactional;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
@Singleton
public class ActionDBAccessorImpl implements ActionDBAccessor {
private static final Logger LOG = LoggerFactory.getLogger(ActionDBAccessorImpl.class);
-
+ private final long requestId;
@Inject
private ClusterDAO clusterDAO;
@Inject
@@ -71,12 +77,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
private HostRoleCommandFactory hostRoleCommandFactory;
@Inject
private Clusters clusters;
-
private Cache<Long, HostRoleCommand> hostRoleCommandCache;
private long cacheLimit; //may be exceeded to store tasks from one request
- private final long requestId;
-
@Inject
public ActionDBAccessorImpl(Injector injector, @Named("executionCommandCacheSize") long cacheLimit) {
injector.injectMembers(this);
@@ -90,10 +93,10 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
}
/* (non-Javadoc)
- * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getAction(java.lang.String)
+ * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getStage(java.lang.String)
*/
@Override
- public Stage getAction(String actionId) {
+ public Stage getStage(String actionId) {
return stageFactory.createExisting(actionId);
}
@@ -137,7 +140,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
@Override
@Transactional
public void timeoutHostRole(String host, long requestId, long stageId,
- Role role) {
+ String role) {
List<HostRoleCommandEntity> commands =
hostRoleCommandDAO.findByHostRole(host, requestId, stageId, role);
for (HostRoleCommandEntity command : commands) {
@@ -170,6 +173,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding stages to DB, stageCount=" + stages.size());
}
+
for (Stage stage : stages) {
StageEntity stageEntity = stage.constructNewPersistenceEntity();
Cluster cluster;
@@ -213,7 +217,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
for (RoleSuccessCriteriaEntity roleSuccessCriteriaEntity : stageEntity.getRoleSuccessCriterias()) {
roleSuccessCriteriaDAO.create(roleSuccessCriteriaEntity);
}
-
}
}
@@ -227,7 +230,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
+ stageId + " role " + role + " report " + report);
}
List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByHostRole(
- hostname, requestId, stageId, Role.valueOf(role));
+ hostname, requestId, stageId, role);
for (HostRoleCommandEntity command : commands) {
command.setStatus(HostRoleStatus.valueOf(report.getStatus()));
command.setStdOut(report.getStdOut().getBytes());
@@ -238,13 +241,13 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
}
@Override
- public void abortHostRole(String host, long requestId, long stageId, Role role) {
+ public void abortHostRole(String host, long requestId, long stageId, String role) {
CommandReport report = new CommandReport();
report.setExitCode(999);
report.setStdErr("Host Role in invalid state");
report.setStdOut("");
report.setStatus("ABORTED");
- updateHostRoleState(host, requestId, stageId, role.toString(), report);
+ updateHostRoleState(host, requestId, stageId, role, report);
}
@Override
@@ -266,7 +269,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
} else {
throw new RuntimeException("HostRoleCommand is not persisted, cannot update:\n" + hostRoleCommand);
}
-
}
@Override
@@ -275,11 +277,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
return getTasks(
hostRoleCommandDAO.findTaskIdsByRequest(requestId)
);
-
-// for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequest(requestId)) {
-// tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
-// }
-// return tasks;
}
@Override
@@ -291,12 +288,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
return getTasks(
hostRoleCommandDAO.findTaskIdsByRequestIds(requestIds)
);
-
-// List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
-// for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequestIds(requestIds)) {
-// tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
-// }
-// return tasks;
}
@Override
@@ -304,11 +295,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
if (!requestIds.isEmpty() && !taskIds.isEmpty()) {
return getTasks(hostRoleCommandDAO.findTaskIdsByRequestAndTaskIds(requestIds, taskIds));
-// List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
-// for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequestAndTaskIds(requestIds, taskIds)) {
-// tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
-// }
-// return tasks;
} else if (requestIds.isEmpty()) {
return getTasks(taskIds);
} else if (taskIds.isEmpty()) {
@@ -335,7 +321,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
if (!absent.isEmpty()) {
boolean allowStore = hostRoleCommandCache.size() <= cacheLimit;
-// LOG.info("Cache size {}, enable = {}", hostRoleCommandCache.size(), allowStore);
for (HostRoleCommandEntity commandEntity : hostRoleCommandDAO.findByPKs(absent)) {
HostRoleCommand hostRoleCommand = hostRoleCommandFactory.createExisting(commandEntity);
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
index 1fccf12..8c36366 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
@@ -17,38 +17,42 @@
*/
package org.apache.ambari.server.actionmanager;
-import java.util.*;
-
-import org.apache.ambari.server.Role;
+import com.google.inject.Singleton;
import org.apache.ambari.server.agent.CommandReport;
-import org.apache.ambari.server.agent.ExecutionCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.inject.Singleton;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
@Singleton
public class ActionDBInMemoryImpl implements ActionDBAccessor {
+ private static Logger LOG = LoggerFactory.getLogger(ActionDBInMemoryImpl.class);
// for a persisted DB, this will be initialized in the ctor
// with the highest persisted requestId value in the DB
private final long lastRequestId = 0;
- private static Logger LOG = LoggerFactory.getLogger(ActionDBInMemoryImpl.class);
List<Stage> stageList = new ArrayList<Stage>();
@Override
- public synchronized Stage getAction(String actionId) {
- for (Stage s: stageList) {
+ public synchronized Stage getStage(String actionId) {
+ for (Stage s : stageList) {
if (s.getActionId().equals(actionId)) {
return s;
}
}
return null;
}
+
@Override
public synchronized List<Stage> getAllStages(long requestId) {
List<Stage> l = new ArrayList<Stage>();
- for (Stage s: stageList) {
+ for (Stage s : stageList) {
if (s.getRequestId() == requestId) {
l.add(s);
}
@@ -80,7 +84,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
@Override
public synchronized void timeoutHostRole(String host, long requestId,
- long stageId, Role role) {
+ long stageId, String role) {
for (Stage s : stageList) {
s.setHostRoleStatus(host, role.toString(), HostRoleStatus.TIMEDOUT);
}
@@ -89,7 +93,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
@Override
public synchronized List<Stage> getStagesInProgress() {
List<Stage> l = new ArrayList<Stage>();
- for (Stage s: stageList) {
+ for (Stage s : stageList) {
if (s.isStageInProgress()) {
l.add(s);
}
@@ -99,15 +103,16 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
@Override
public synchronized void persistActions(List<Stage> stages) {
- for (Stage s: stages) {
+ for (Stage s : stages) {
stageList.add(s);
}
}
+
@Override
public synchronized void updateHostRoleState(String hostname, long requestId,
- long stageId, String role, CommandReport report) {
- LOG.info("DEBUG stages to iterate: "+stageList.size());
- if(null == report.getStatus()
+ long stageId, String role, CommandReport report) {
+ LOG.info("DEBUG stages to iterate: " + stageList.size());
+ if (null == report.getStatus()
|| null == report.getStdOut()
|| null == report.getStdErr()) {
throw new RuntimeException("Badly formed command report.");
@@ -124,13 +129,13 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
}
@Override
- public void abortHostRole(String host, long requestId, long stageId, Role role) {
+ public void abortHostRole(String host, long requestId, long stageId, String role) {
CommandReport report = new CommandReport();
report.setExitCode(999);
report.setStdErr("Host Role in invalid state");
report.setStdOut("");
report.setStatus("ABORTED");
- updateHostRoleState(host, requestId, stageId, role.toString(), report);
+ updateHostRoleState(host, requestId, stageId, role, report);
}
@Override
@@ -168,17 +173,18 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
@Override
public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses) {
List<Stage> l = new ArrayList<Stage>();
- for (Stage s: stageList) {
+ for (Stage s : stageList) {
if (s.doesStageHaveHostRoleStatus(statuses)) {
l.add(s);
}
}
return l;
}
+
@Override
public synchronized List<Long> getRequests() {
Set<Long> requestIds = new HashSet<Long>();
- for (Stage s: stageList) {
+ for (Stage s : stageList) {
requestIds.add(s.getRequestId());
}
List<Long> ids = new ArrayList<Long>();
@@ -199,6 +205,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
}
return null;
}
+
@Override
public List<Long> getRequestsByStatus(RequestStatus status) {
// TODO
@@ -211,7 +218,7 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
for (Long requestId : requestIds) {
List<Stage> stages = getAllStages(requestId);
result.put(requestId, stages != null && !stages.isEmpty() ? stages.get
- (0).getRequestContext() : "");
+ (0).getRequestContext() : "");
}
return result;
}
@@ -220,6 +227,6 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
public String getRequestContext(long requestId) {
List<Stage> stages = getAllStages(requestId);
return stages != null && !stages.isEmpty() ? stages.get(0)
- .getRequestContext() : "";
+ .getRequestContext() : "";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
index 6b32e73..aa553c4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
@@ -24,6 +24,7 @@ import com.google.inject.persist.UnitOfWork;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.ActionQueue;
import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.serveraction.ServerActionManager;
import org.apache.ambari.server.state.Clusters;
@@ -73,12 +74,16 @@ public class ActionManager {
scheduler.stop();
}
- public void sendActions(List<Stage> stages) {
+ public void sendActions(List<Stage> stages, ExecuteActionRequest request) {
if (LOG.isDebugEnabled()) {
for (Stage s : stages) {
LOG.debug("Persisting stage into db: " + s.toString());
}
+
+ if (request != null) {
+ LOG.debug("In response to request: " + request.toString());
+ }
}
db.persistActions(stages);
@@ -91,7 +96,7 @@ public class ActionManager {
}
public Stage getAction(long requestId, long stageId) {
- return db.getAction(StageUtils.getActionId(requestId, stageId));
+ return db.getStage(StageUtils.getActionId(requestId, stageId));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 22ed44e..a5c5dc0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -205,8 +205,7 @@ class ActionScheduler implements Runnable {
scheduleHostRole(s, cmd);
} catch (InvalidStateTransitionException e) {
LOG.warn("Could not schedule host role " + cmd.toString(), e);
- db.abortHostRole(cmd.getHostname(), s.getRequestId(), s.getStageId(),
- Role.valueOf(cmd.getRole()));
+ db.abortHostRole(cmd.getHostname(), s.getRequestId(), s.getStageId(), cmd.getRole());
}
}
}
@@ -326,8 +325,7 @@ class ActionScheduler implements Runnable {
if (s.getAttemptCount(host, roleStr) >= maxAttempts) {
LOG.warn("Host:" + host + ", role:" + roleStr + ", actionId:"
+ s.getActionId() + " expired");
- db.timeoutHostRole(host, s.getRequestId(), s.getStageId(),
- Role.valueOf(c.getRole()));
+ db.timeoutHostRole(host, s.getRequestId(), s.getStageId(), c.getRole());
//Reinitialize status
status = s.getHostRoleStatus(host, roleStr);
ServiceComponentHostOpFailedEvent timeoutEvent =
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
index eb0cfa9..38bc371 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/CustomActionDBAccessorImpl.java
@@ -98,7 +98,8 @@ public class CustomActionDBAccessorImpl implements CustomActionDBAccessor {
TargetHostType targetType, String serviceType, String componentType,
Short defaultTimeout)
throws AmbariException {
- validateCreateInput(actionName, actionType, inputs, description, defaultTimeout);
+ validateCreateInput(actionName, actionType, inputs, description, defaultTimeout,
+ targetType, serviceType, componentType);
ActionEntity entity =
actionDefinitionDAO.findByPK(actionName);
if (entity == null) {
@@ -178,7 +179,8 @@ public class CustomActionDBAccessorImpl implements CustomActionDBAccessor {
}
private void validateCreateInput(String actionName, ActionType actionType, String inputs,
- String description, Short defaultTimeout)
+ String description, Short defaultTimeout,
+ TargetHostType targetType, String serviceType, String componentType)
throws AmbariException {
validateActionName(actionName);
@@ -199,6 +201,12 @@ public class CustomActionDBAccessorImpl implements CustomActionDBAccessor {
throw new AmbariException("Action type cannot be " + actionType);
}
+ if (serviceType == null || serviceType.isEmpty()) {
+ if (componentType != null && !componentType.isEmpty()) {
+ throw new AmbariException("Target component cannot be specified unless target service is specified");
+ }
+ }
+
if (inputs != null && !inputs.isEmpty()) {
String[] parameters = inputs.split(",");
for (String parameter : parameters) {
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
index e3bed0c..21ec077 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
@@ -86,10 +86,6 @@ public class HostRoleCommand {
//make use of lazy loading
executionCommandDAO = injector.getInstance(ExecutionCommandDAO.class);
-// executionCommandWrapper = new ExecutionCommandWrapper(new String(
-// hostRoleCommandEntity
-// .getExecutionCommand().getCommand()
-// ));
}
HostRoleCommandEntity constructNewPersistenceEntity() {
@@ -106,10 +102,6 @@ public class HostRoleCommand {
hostRoleCommandEntity.setRoleCommand(roleCommand);
hostRoleCommandEntity.setEvent(event.getEventJson());
-// ExecutionCommandEntity executionCommandEntity = new ExecutionCommandEntity();
-// executionCommandEntity.setCommand(executionCommandWrapper.getJson().getBytes());
-// executionCommandEntity.setHostRoleCommand(hostRoleCommandEntity);
-// hostRoleCommandEntity.setExecutionCommand(executionCommandEntity);
return hostRoleCommandEntity;
}
@@ -120,7 +112,6 @@ public class HostRoleCommand {
return executionCommandEntity;
}
-
public long getTaskId() {
return taskId;
}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
index 1b4ecdf..c72c14b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
@@ -54,7 +54,8 @@ public class ExecutionCommand extends AgentCommand {
private Map<String, Map<String, String>> configurationTags;
private Map<String, String> commandParams;
private String serviceName;
-
+ private String componentName;
+
@JsonProperty("commandId")
public String getCommandId() {
return this.commandId;
@@ -202,6 +203,16 @@ public class ExecutionCommand extends AgentCommand {
this.serviceName = serviceName;
}
+ @JsonProperty("componentName")
+ public String getComponentName() {
+ return componentName;
+ }
+
+ @JsonProperty("componentName")
+ public void setComponentName(String componentName) {
+ this.componentName = componentName;
+ }
+
/**
* @param configTags the config tag map
*/
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 60aede9..9da4d45 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -187,16 +187,19 @@ public class HeartBeatHandler {
return response;
}
- protected void processCommandReports(HeartBeat heartbeat,
- String hostname,
- Clusters clusterFsm, long now)
+ protected void processCommandReports(
+ HeartBeat heartbeat, String hostname, Clusters clusterFsm, long now)
throws AmbariException {
List<CommandReport> reports = heartbeat.getReports();
for (CommandReport report : reports) {
LOG.debug("Received command report: " + report);
+ if (RoleCommand.ACTIONEXECUTE.equals(report.getRoleCommand())) {
+ continue;
+ }
+
Cluster cl = clusterFsm.getCluster(report.getClusterName());
String service = report.getServiceName();
- if (service == null || "".equals(service)) {
+ if (service == null || service.isEmpty()) {
throw new AmbariException("Invalid command report, service: " + service);
}
if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
@@ -217,7 +220,7 @@ public class HeartBeatHandler {
&& null != report.getConfigurationTags()
&& !report.getConfigurationTags().isEmpty()) {
LOG.info("Updating applied config on service " + scHost.getServiceName() +
- ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
+ ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
scHost.updateActualConfigs(report.getConfigurationTags());
}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java
new file mode 100644
index 0000000..f1bea70
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ActionExecutionContext.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.ambari.server.controller;
+
+import org.apache.ambari.server.actionmanager.TargetHostType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The context required to create tasks and stages for a custom action
+ */
+public class ActionExecutionContext {
+ private final String clusterName;
+ private final String actionName;
+ private final String serviceName;
+ private final String componentName;
+ private final List<String> hosts;
+ private final Map<String, String> parameters;
+ private final TargetHostType targetType;
+ private final Short timeout;
+
+ /**
+ * Create an ActionExecutionContext to execute an action from a request
+ */
+ public ActionExecutionContext(String clusterName, String actionName, String serviceName,
+ String componentName, List<String> hosts, Map<String, String> parameters,
+ TargetHostType targetType, Short timeout) {
+ this.clusterName = clusterName;
+ this.actionName = actionName;
+ this.serviceName = serviceName;
+ this.componentName = componentName;
+ this.parameters = parameters;
+ this.hosts = new ArrayList<String>();
+ if (hosts != null) {
+ this.hosts.addAll(hosts);
+ }
+ this.targetType = targetType;
+ this.timeout = timeout;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public String getActionName() {
+ return actionName;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public String getComponentName() {
+ return componentName;
+ }
+
+ public Map<String, String> getParameters() {
+ return parameters;
+ }
+
+ public List<String> getHosts() {
+ return hosts;
+ }
+
+ public TargetHostType getTargetType() {
+ return targetType;
+ }
+
+ public Short getTimeout() {
+ return timeout;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
new file mode 100644
index 0000000..632b11d
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.StackAccessException;
+import org.apache.ambari.server.actionmanager.ActionDefinition;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.actionmanager.TargetHostType;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.metadata.ActionMetadata;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.ComponentInfo;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceInfo;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgressEvent;
+import org.apache.ambari.server.utils.StageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Helper class containing logic to process custom action execution requests
+ */
+public class AmbariActionExecutionHelper {
+ private final static Logger LOG =
+ LoggerFactory.getLogger(AmbariCustomCommandExecutionHelper.class);
+ private ActionMetadata actionMetadata;
+ private Clusters clusters;
+ private AmbariManagementControllerImpl amcImpl;
+ private ActionManager actionManager;
+ private AmbariMetaInfo ambariMetaInfo;
+
+ public AmbariActionExecutionHelper(ActionMetadata actionMetadata, Clusters clusters,
+ AmbariManagementControllerImpl amcImpl) {
+ this.amcImpl = amcImpl;
+ this.actionMetadata = actionMetadata;
+ this.clusters = clusters;
+ this.actionManager = amcImpl.getActionManager();
+ this.ambariMetaInfo = amcImpl.getAmbariMetaInfo();
+ }
+
+ /**
+ * Validates the request to execute an action
+ *
+ * @param actionRequest
+ * @param cluster
+ * @return
+ * @throws AmbariException
+ */
+ public ActionExecutionContext validateCustomAction(ExecuteActionRequest actionRequest, Cluster cluster)
+ throws AmbariException {
+ if (actionRequest.getActionName() == null || actionRequest.getActionName().isEmpty()) {
+ throw new AmbariException("Action name must be specified");
+ }
+
+ ActionDefinition actionDef = actionManager.getActionDefinition(actionRequest.getActionName());
+ if (actionDef == null) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " does not exist");
+ }
+
+ StackId stackId = cluster.getCurrentStackVersion();
+ String expectedService = actionDef.getTargetService() == null ? "" : actionDef.getTargetService();
+ String actualService = actionRequest.getServiceName() == null ? "" : actionRequest.getServiceName();
+ if (!expectedService.isEmpty() && !actualService.isEmpty() && !expectedService.equals(actualService)) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " targets service " + actualService +
+ " that does not match with expected " + expectedService);
+ }
+
+ String targetService = expectedService;
+ if (targetService == null || targetService.isEmpty()) {
+ targetService = actualService;
+ }
+
+ if (targetService != null && !targetService.isEmpty()) {
+ ServiceInfo serviceInfo;
+ try {
+ serviceInfo = ambariMetaInfo.getService(stackId.getStackName(), stackId.getStackVersion(),
+ targetService);
+ } catch (StackAccessException se) {
+ serviceInfo = null;
+ }
+
+ if (serviceInfo == null) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " targets service " + targetService +
+ " that does not exist.");
+ }
+ }
+
+ String expectedComponent = actionDef.getTargetComponent() == null ? "" : actionDef.getTargetComponent();
+ String actualComponent = actionRequest.getComponentName() == null ? "" : actionRequest.getComponentName();
+ if (!expectedComponent.isEmpty() && !actualComponent.isEmpty() && !expectedComponent.equals(actualComponent)) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " targets component " + actualComponent +
+ " that does not match with expected " + expectedComponent);
+ }
+
+ String targetComponent = expectedComponent;
+ if (targetComponent == null || targetComponent.isEmpty()) {
+ targetComponent = actualComponent;
+ }
+
+ if (!targetComponent.isEmpty() && targetService.isEmpty()) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " targets component " + targetComponent +
+ " without specifying the target service.");
+ }
+
+ if (targetComponent != null && !targetComponent.isEmpty()) {
+ ComponentInfo compInfo;
+ try {
+ compInfo = ambariMetaInfo.getComponent(stackId.getStackName(), stackId.getStackVersion(),
+ targetService, targetComponent);
+ } catch (StackAccessException se) {
+ compInfo = null;
+ }
+
+ if (compInfo == null) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " targets component " + targetComponent +
+ " that does not exist.");
+ }
+ }
+
+ if (actionDef.getInputs() != null) {
+ String[] inputs = actionDef.getInputs().split(",");
+ for (String input : inputs) {
+ if (!input.trim().isEmpty() && !actionRequest.getParameters().containsKey(input.trim())) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " requires input '" +
+ input.trim() + "' that is not provided.");
+ }
+ }
+ }
+
+ if (actionDef.getTargetType() == TargetHostType.SPECIFIC
+ || (targetService.isEmpty() && targetService.isEmpty())) {
+ if (actionRequest.getHosts().size() == 0) {
+ throw new AmbariException("Action " + actionRequest.getActionName() + " requires explicit target host(s)" +
+ " that is not provided.");
+ }
+ }
+
+ LOG.info("Received action execution request"
+ + ", clusterName=" + actionRequest.getClusterName()
+ + ", request=" + actionRequest.toString());
+
+ ActionExecutionContext actionExecutionContext = new ActionExecutionContext(
+ actionRequest.getClusterName(), actionRequest.getActionName(), targetService, targetComponent,
+ actionRequest.getHosts(), actionRequest.getParameters(), actionDef.getTargetType(),
+ actionDef.getDefaultTimeout());
+
+ return actionExecutionContext;
+ }
+
+ /**
+ * Add tasks to the stage based on the requested action execution
+ *
+ * @param actionContext the context associated with the action
+ * @param stage stage into which tasks must be inserted
+ * @param configuration
+ * @param hostsMap
+ * @param hostLevelParams
+ * @throws AmbariException
+ */
+ public void addAction(ActionExecutionContext actionContext, Stage stage,
+ Configuration configuration, HostsMap hostsMap, Map<String, String> hostLevelParams)
+ throws AmbariException {
+ String actionName = actionContext.getActionName();
+ String clusterName = actionContext.getClusterName();
+ String serviceName = actionContext.getServiceName();
+ String componentName = actionContext.getComponentName();
+
+ // List of host to select from
+ Set<String> candidateHosts = new HashSet<String>();
+ if (!serviceName.isEmpty()) {
+ if (!componentName.isEmpty()) {
+ Map<String, ServiceComponentHost> componentHosts =
+ clusters.getCluster(clusterName).getService(serviceName)
+ .getServiceComponent(componentName).getServiceComponentHosts();
+ candidateHosts.addAll(componentHosts.keySet());
+ } else {
+ for (String component : clusters.getCluster(clusterName).getService(serviceName)
+ .getServiceComponents().keySet()) {
+ Map<String, ServiceComponentHost> componentHosts =
+ clusters.getCluster(clusterName).getService(serviceName)
+ .getServiceComponent(component).getServiceComponentHosts();
+ candidateHosts.addAll(componentHosts.keySet());
+ }
+ }
+ } else {
+ // All hosts are valid target host
+ candidateHosts.addAll(amcImpl.getClusters().getHostsForCluster(clusterName).keySet());
+ }
+
+ // If request did not specify hosts and there exists no host
+ if (actionContext.getHosts().isEmpty() && candidateHosts.isEmpty()) {
+ throw new AmbariException("Suitable hosts not found, component="
+ + componentName + ", service=" + serviceName
+ + ", cluster=" + clusterName + ", actionName=" + actionName);
+ }
+
+ // Compare specified hosts to available hosts
+ if (!actionContext.getHosts().isEmpty() && !candidateHosts.isEmpty()) {
+ for (String hostname : actionContext.getHosts()) {
+ if (!candidateHosts.contains(hostname)) {
+ throw new AmbariException("Request specifies host " + hostname + " but its not a valid host based on the " +
+ "target service=" + serviceName + " and component=" + componentName);
+ }
+ }
+ }
+
+ //Find target hosts to execute
+ if (actionContext.getHosts().isEmpty()) {
+ TargetHostType hostType = actionContext.getTargetType();
+ switch (hostType) {
+ case ALL:
+ actionContext.getHosts().addAll(candidateHosts);
+ break;
+ case ANY:
+ actionContext.getHosts().add(amcImpl.getHealthyHost(candidateHosts));
+ break;
+ case MAJORITY:
+ for (int i = 0; i < (candidateHosts.size() / 2) + 1; i++) {
+ String hostname = amcImpl.getHealthyHost(candidateHosts);
+ actionContext.getHosts().add(hostname);
+ candidateHosts.remove(hostname);
+ }
+ break;
+ default:
+ throw new AmbariException("Unsupported target type=" + hostType);
+ }
+ }
+
+ //create tasks for each host
+ for (String hostName : actionContext.getHosts()) {
+ stage.addHostRoleExecutionCommand(hostName, Role.valueOf(actionContext.getActionName()), RoleCommand.ACTIONEXECUTE,
+ new ServiceComponentHostOpInProgressEvent(actionContext.getActionName(), hostName,
+ System.currentTimeMillis()), clusterName, actionContext.getServiceName());
+
+ stage.getExecutionCommandWrapper(hostName, actionContext.getActionName()).getExecutionCommand()
+ .setRoleParams(actionContext.getParameters());
+
+ Cluster cluster = clusters.getCluster(clusterName);
+
+ Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
+ Map<String, Map<String, String>> configTags = null;
+ if (!actionContext.getServiceName().isEmpty()) {
+ configTags = amcImpl.findConfigurationTagsWithOverrides(cluster, hostName);
+ }
+
+ ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
+ actionContext.getActionName()).getExecutionCommand();
+
+ execCmd.setConfigurations(configurations);
+ execCmd.setConfigurationTags(configTags);
+ execCmd.setHostLevelParams(hostLevelParams);
+ execCmd.setCommandParams(actionContext.getParameters());
+ execCmd.setServiceName(serviceName);
+ execCmd.setComponentName(componentName);
+
+ // Generate cluster host info
+ execCmd.setClusterHostInfo(
+ StageUtils.getClusterHostInfo(clusters.getHostsForCluster(clusterName), cluster, hostsMap, configuration));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
new file mode 100644
index 0000000..fa7522b
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.metadata.ActionMetadata;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgressEvent;
+import org.apache.ambari.server.utils.StageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Helper class containing logic to process custom command execution requests
+ */
+public class AmbariCustomCommandExecutionHelper {
+ private final static Logger LOG =
+ LoggerFactory.getLogger(AmbariCustomCommandExecutionHelper.class);
+ private ActionMetadata actionMetadata;
+ private Clusters clusters;
+ private AmbariManagementControllerImpl amcImpl;
+
+ public AmbariCustomCommandExecutionHelper(ActionMetadata actionMetadata, Clusters clusters,
+ AmbariManagementControllerImpl amcImpl) {
+ this.amcImpl = amcImpl;
+ this.actionMetadata = actionMetadata;
+ this.clusters = clusters;
+ }
+
+ public void validateCustomCommand(ExecuteActionRequest actionRequest) throws AmbariException {
+ if (actionRequest.getServiceName() == null
+ || actionRequest.getServiceName().isEmpty()
+ || actionRequest.getCommandName() == null
+ || actionRequest.getCommandName().isEmpty()) {
+ throw new AmbariException("Invalid request : " + "cluster="
+ + actionRequest.getClusterName() + ", service="
+ + actionRequest.getServiceName() + ", command="
+ + actionRequest.getCommandName());
+ }
+
+ LOG.info("Received a command execution request"
+ + ", clusterName=" + actionRequest.getClusterName()
+ + ", serviceName=" + actionRequest.getServiceName()
+ + ", request=" + actionRequest.toString());
+
+ if (!isValidCommand(actionRequest.getCommandName(), actionRequest.getServiceName())) {
+ throw new AmbariException(
+ "Unsupported action " + actionRequest.getCommandName() + " for " + actionRequest.getServiceName());
+ }
+ }
+
+ private Boolean isValidCommand(String command, String service) {
+ List<String> actions = actionMetadata.getActions(service);
+ if (actions == null || actions.size() == 0) {
+ return false;
+ }
+
+ if (!actions.contains(command)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ public void addAction(ExecuteActionRequest actionRequest, Stage stage,
+ Configuration configuration, HostsMap hostsMap, Map<String, String> hostLevelParams)
+ throws AmbariException {
+ if (actionRequest.getCommandName().contains("SERVICE_CHECK")) {
+ addServiceCheckAction(actionRequest, stage, configuration, hostsMap, hostLevelParams);
+ } else if (actionRequest.getCommandName().equals("DECOMMISSION_DATANODE")) {
+ addDecommissionDatanodeAction(actionRequest, stage, hostLevelParams);
+ } else {
+ throw new AmbariException("Unsupported action " + actionRequest.getCommandName());
+ }
+ }
+
+ private void addServiceCheckAction(ExecuteActionRequest actionRequest, Stage stage,
+ Configuration configuration, HostsMap hostsMap,
+ Map<String, String> hostLevelParams)
+ throws AmbariException {
+ String clusterName = actionRequest.getClusterName();
+ String componentName = actionMetadata.getClient(actionRequest
+ .getServiceName());
+
+ String hostName;
+ if (componentName != null) {
+ Map<String, ServiceComponentHost> components = clusters
+ .getCluster(clusterName).getService(actionRequest.getServiceName())
+ .getServiceComponent(componentName).getServiceComponentHosts();
+
+ if (components.isEmpty()) {
+ throw new AmbariException("Hosts not found, component="
+ + componentName + ", service=" + actionRequest.getServiceName()
+ + ", cluster=" + clusterName);
+ }
+ hostName = amcImpl.getHealthyHost(components.keySet());
+ } else {
+ Map<String, ServiceComponent> components = clusters
+ .getCluster(clusterName).getService(actionRequest.getServiceName())
+ .getServiceComponents();
+
+ if (components.isEmpty()) {
+ throw new AmbariException("Components not found, service="
+ + actionRequest.getServiceName() + ", cluster=" + clusterName);
+ }
+
+ ServiceComponent serviceComponent = components.values().iterator()
+ .next();
+
+ if (serviceComponent.getServiceComponentHosts().isEmpty()) {
+ throw new AmbariException("Hosts not found, component="
+ + serviceComponent.getName() + ", service="
+ + actionRequest.getServiceName() + ", cluster=" + clusterName);
+ }
+
+ hostName = serviceComponent.getServiceComponentHosts().keySet()
+ .iterator().next();
+ }
+
+ stage.addHostRoleExecutionCommand(hostName, Role.valueOf(actionRequest
+ .getCommandName()), RoleCommand.EXECUTE,
+ new ServiceComponentHostOpInProgressEvent(componentName, hostName,
+ System.currentTimeMillis()), clusterName, actionRequest
+ .getServiceName());
+
+ stage.getExecutionCommandWrapper(hostName, actionRequest.getCommandName()).getExecutionCommand()
+ .setRoleParams(actionRequest.getParameters());
+
+ Cluster cluster = clusters.getCluster(clusterName);
+
+ // [ type -> [ key, value ] ]
+ Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
+ Map<String, Map<String, String>> configTags = amcImpl.findConfigurationTagsWithOverrides(cluster, hostName);
+
+ ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
+ actionRequest.getCommandName()).getExecutionCommand();
+
+ execCmd.setConfigurations(configurations);
+ execCmd.setConfigurationTags(configTags);
+ execCmd.setHostLevelParams(hostLevelParams);
+
+ // Generate cluster host info
+ execCmd.setClusterHostInfo(
+ StageUtils.getClusterHostInfo(clusters.getHostsForCluster(clusterName), cluster, hostsMap, configuration));
+ }
+
+ private void addDecommissionDatanodeAction(ExecuteActionRequest decommissionRequest, Stage stage,
+ Map<String, String> hostLevelParams)
+ throws AmbariException {
+ String hdfsExcludeFileType = "hdfs-exclude-file";
+ // Find hdfs admin host, just decommission from namenode.
+ String clusterName = decommissionRequest.getClusterName();
+ Cluster cluster = clusters.getCluster(clusterName);
+ String serviceName = decommissionRequest.getServiceName();
+ String namenodeHost = clusters.getCluster(clusterName)
+ .getService(serviceName).getServiceComponent(Role.NAMENODE.toString())
+ .getServiceComponentHosts().keySet().iterator().next();
+
+ String excludeFileTag = null;
+ if (decommissionRequest.getParameters() != null
+ && (decommissionRequest.getParameters().get("excludeFileTag") != null)) {
+ excludeFileTag = decommissionRequest.getParameters()
+ .get("excludeFileTag");
+ }
+
+ if (excludeFileTag == null) {
+ throw new AmbariException("No exclude file specified"
+ + " when decommissioning datanodes. Provide parameter excludeFileTag with the tag for config type "
+ + hdfsExcludeFileType);
+ }
+
+ Config config = clusters.getCluster(clusterName).getConfig(
+ hdfsExcludeFileType, excludeFileTag);
+ if (config == null) {
+ throw new AmbariException("Decommissioning datanodes requires the cluster to be associated with config type " +
+ hdfsExcludeFileType + " with a list of datanodes to be decommissioned (\"datanodes\" : list).");
+ }
+
+ LOG.info("Decommissioning data nodes: " + config.getProperties().get("datanodes") +
+ " " + hdfsExcludeFileType + " tag: " + excludeFileTag);
+
+ Map<String, Map<String, String>> configurations =
+ new TreeMap<String, Map<String, String>>();
+
+
+ Map<String, Map<String, String>> configTags = amcImpl.findConfigurationTagsWithOverrides(cluster, namenodeHost);
+
+ // Add the tag for hdfs-exclude-file
+ Map<String, String> excludeTags = new HashMap<String, String>();
+ excludeTags.put(ConfigHelper.CLUSTER_DEFAULT_TAG, config.getVersionTag());
+ configTags.put(hdfsExcludeFileType, excludeTags);
+
+ stage.addHostRoleExecutionCommand(
+ namenodeHost,
+ Role.DECOMMISSION_DATANODE,
+ RoleCommand.EXECUTE,
+ new ServiceComponentHostOpInProgressEvent(Role.DECOMMISSION_DATANODE
+ .toString(), namenodeHost, System.currentTimeMillis()),
+ clusterName, serviceName);
+
+ ExecutionCommand execCmd = stage.getExecutionCommandWrapper(namenodeHost,
+ Role.DECOMMISSION_DATANODE.toString()).getExecutionCommand();
+
+ execCmd.setConfigurations(configurations);
+ execCmd.setConfigurationTags(configTags);
+ execCmd.setHostLevelParams(hostLevelParams);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index ef5c372..890cb81 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -165,7 +165,10 @@ public class AmbariManagementControllerImpl implements
final private String ojdbcUrl;
final private String serverDB;
final private String mysqljdbcUrl;
-
+
+ final private AmbariCustomCommandExecutionHelper customCommandExecutionHelper;
+ final private AmbariActionExecutionHelper actionExecutionHelper;
+
@Inject
public AmbariManagementControllerImpl(ActionManager actionManager,
Clusters clusters, Injector injector) throws Exception {
@@ -200,6 +203,11 @@ public class AmbariManagementControllerImpl implements
this.mysqljdbcUrl = null;
this.serverDB = null;
}
+
+ this.customCommandExecutionHelper = new AmbariCustomCommandExecutionHelper(
+ this.actionMetadata, this.clusters, this);
+ this.actionExecutionHelper = new AmbariActionExecutionHelper(
+ this.actionMetadata, this.clusters, this);
}
public String getAmbariServerURI(String path) {
@@ -1090,7 +1098,7 @@ public class AmbariManagementControllerImpl implements
* @return
* @throws AmbariException
*/
- private Map<String, Map<String,String>> findConfigurationTagsWithOverrides(
+ protected Map<String, Map<String,String>> findConfigurationTagsWithOverrides(
Cluster cluster, String hostName) throws AmbariException {
Map<String, Map<String,String>> configTags =
@@ -1099,7 +1107,6 @@ public class AmbariManagementControllerImpl implements
return configTags;
}
-
private List<Stage> doStageCreation(Cluster cluster,
Map<State, List<Service>> changedServices,
Map<State, List<ServiceComponent>> changedComps,
@@ -1142,7 +1149,7 @@ public class AmbariManagementControllerImpl implements
// multiple stages may be needed for reconfigure
long stageId = 0;
Map<String, List<String>> clusterHostInfo = StageUtils.getClusterHostInfo(
- clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector);
+ clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector.getInstance(Configuration.class));
String clusterHostInfoJson = StageUtils.getGson().toJson(clusterHostInfo);
@@ -1353,7 +1360,8 @@ public class AmbariManagementControllerImpl implements
stage.getExecutionCommandWrapper(clientHost, smokeTestRole)
.getExecutionCommand()
.setClusterHostInfo(StageUtils.getClusterHostInfo(
- clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector));
+ clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap,
+ injector.getInstance(Configuration.class)));
Map<String,String> hostParams = new HashMap<String, String>();
hostParams.put("stack_version", cluster.getDesiredStackVersion().getStackVersion());
@@ -1381,7 +1389,7 @@ public class AmbariManagementControllerImpl implements
+ ", requestId=" + stages.get(0).getRequestId()
+ ", stagesCount=" + stages.size());
}
- actionManager.sendActions(stages);
+ actionManager.sendActions(stages, null);
}
}
@@ -1472,7 +1480,6 @@ public class AmbariManagementControllerImpl implements
Map<String, Map<String, Map<String, Set<String>>>> hostComponentNames =
new HashMap<String, Map<String, Map<String, Set<String>>>>();
Set<State> seenNewStates = new HashSet<State>();
- boolean processingUpgradeRequest = false;
int numberOfRequestsProcessed = 0;
StackId fromStackVersion = new StackId();
Map<ServiceComponentHost, State> directTransitionScHosts = new HashMap<ServiceComponentHost, State>();
@@ -1541,9 +1548,6 @@ public class AmbariManagementControllerImpl implements
}
}
- // If upgrade request comes without state information then its an error
- boolean upgradeRequest = checkIfUpgradeRequestAndValidate(request, cluster, s, sc, sch);
-
if (newState == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Nothing to do for new updateServiceComponentHost request"
@@ -1565,25 +1569,6 @@ public class AmbariManagementControllerImpl implements
seenNewStates.add(newState);
- if (!processingUpgradeRequest && upgradeRequest) {
- processingUpgradeRequest = true;
- // this needs to be the first request
- if (numberOfRequestsProcessed > 1) {
- throw new AmbariException("An upgrade request cannot be combined with " +
- "other non-upgrade requests.");
- }
- fromStackVersion = sch.getStackVersion();
- }
-
- if (processingUpgradeRequest) {
- if (!upgradeRequest) {
- throw new AmbariException("An upgrade request cannot be combined with " +
- "other non-upgrade requests.");
- }
- sch.setState(State.UPGRADING);
- sch.setDesiredStackVersion(cluster.getCurrentStackVersion());
- }
-
State oldSchState = sch.getState();
// Client component reinstall allowed
if (newState == oldSchState && !sc.isClientComponent()) {
@@ -1613,20 +1598,6 @@ public class AmbariManagementControllerImpl implements
}
if (isDirectTransition(oldSchState, newState)) {
-
-// if (newState == State.DELETED) {
-// if (!sch.canBeRemoved()) {
-// throw new AmbariException("Servicecomponenthost cannot be removed"
-// + ", clusterName=" + cluster.getClusterName()
-// + ", clusterId=" + cluster.getClusterId()
-// + ", serviceName=" + sch.getServiceName()
-// + ", componentName=" + sch.getServiceComponentName()
-// + ", hostname=" + sch.getHostName()
-// + ", currentState=" + oldSchState
-// + ", newDesiredState=" + newState);
-// }
-// }
-
if (LOG.isDebugEnabled()) {
LOG.debug("Handling direct transition update to ServiceComponentHost"
+ ", clusterName=" + request.getClusterName()
@@ -1698,13 +1669,8 @@ public class AmbariManagementControllerImpl implements
Cluster cluster = clusters.getCluster(clusterNames.iterator().next());
- Map<String, String> requestParameters = null;
- if (processingUpgradeRequest) {
- requestParameters = new HashMap<String, String>();
- requestParameters.put(Configuration.UPGRADE_TO_STACK, gson.toJson(cluster.getCurrentStackVersion()));
- requestParameters.put(Configuration.UPGRADE_FROM_STACK, gson.toJson(fromStackVersion));
- }
- return createStages(cluster, requestProperties, requestParameters, null, null, changedScHosts, ignoredScHosts, runSmokeTest, false);
+ return createStages(cluster, requestProperties, null, null, null, changedScHosts, ignoredScHosts, runSmokeTest,
+ false);
}
private void validateServiceComponentHostRequest(ServiceComponentHostRequest request) {
@@ -2187,7 +2153,7 @@ public class AmbariManagementControllerImpl implements
return null;
}
- private String getHealthyHost(Set<String> hostList) throws AmbariException {
+ protected String getHealthyHost(Set<String> hostList) throws AmbariException {
// Return a healthy host if found otherwise any random host
String hostName = null;
for (String candidateHostName : hostList) {
@@ -2200,149 +2166,11 @@ public class AmbariManagementControllerImpl implements
return hostName;
}
- private void addServiceCheckAction(ExecuteActionRequest actionRequest, Stage stage)
- throws AmbariException {
- String clusterName = actionRequest.getClusterName();
- String componentName = actionMetadata.getClient(actionRequest
- .getServiceName());
-
- String hostName;
- if (componentName != null) {
- Map<String, ServiceComponentHost> components = clusters
- .getCluster(clusterName).getService(actionRequest.getServiceName())
- .getServiceComponent(componentName).getServiceComponentHosts();
-
- if (components.isEmpty()) {
- throw new AmbariException("Hosts not found, component="
- + componentName + ", service=" + actionRequest.getServiceName()
- + ", cluster=" + clusterName);
- }
- hostName = getHealthyHost(components.keySet());
- } else {
- Map<String, ServiceComponent> components = clusters
- .getCluster(clusterName).getService(actionRequest.getServiceName())
- .getServiceComponents();
-
- if (components.isEmpty()) {
- throw new AmbariException("Components not found, service="
- + actionRequest.getServiceName() + ", cluster=" + clusterName);
- }
-
- ServiceComponent serviceComponent = components.values().iterator()
- .next();
-
- if (serviceComponent.getServiceComponentHosts().isEmpty()) {
- throw new AmbariException("Hosts not found, component="
- + serviceComponent.getName() + ", service="
- + actionRequest.getServiceName() + ", cluster=" + clusterName);
- }
-
- hostName = serviceComponent.getServiceComponentHosts().keySet()
- .iterator().next();
- }
-
- stage.addHostRoleExecutionCommand(hostName, Role.valueOf(actionRequest
- .getCommandName()), RoleCommand.EXECUTE,
- new ServiceComponentHostOpInProgressEvent(componentName, hostName,
- System.currentTimeMillis()), clusterName, actionRequest
- .getServiceName());
-
- stage.getExecutionCommandWrapper(hostName, actionRequest.getCommandName()).getExecutionCommand()
- .setRoleParams(actionRequest.getParameters());
-
- Cluster cluster = clusters.getCluster(clusterName);
-
- // [ type -> [ key, value ] ]
- Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String,String>>();
- Map<String, Map<String, String>> configTags =
- findConfigurationTagsWithOverrides(cluster, hostName);
-
- ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
- actionRequest.getCommandName()).getExecutionCommand();
-
- execCmd.setConfigurations(configurations);
- execCmd.setConfigurationTags(configTags);
-
- Map<String, String> params = new TreeMap<String, String>();
- params.put("jdk_location", this.jdkResourceUrl);
- params.put("stack_version", cluster.getDesiredStackVersion().getStackVersion());
- execCmd.setHostLevelParams(params);
- }
-
- private void addDecommissionDatanodeAction(
- ExecuteActionRequest decommissionRequest, Stage stage)
- throws AmbariException {
- String hdfsExcludeFileType = "hdfs-exclude-file";
- // Find hdfs admin host, just decommission from namenode.
- String clusterName = decommissionRequest.getClusterName();
- Cluster cluster = clusters.getCluster(clusterName);
- String serviceName = decommissionRequest.getServiceName();
- String namenodeHost = clusters.getCluster(clusterName)
- .getService(serviceName).getServiceComponent(Role.NAMENODE.toString())
- .getServiceComponentHosts().keySet().iterator().next();
-
- String excludeFileTag = null;
- if (decommissionRequest.getParameters() != null
- && (decommissionRequest.getParameters().get("excludeFileTag") != null)) {
- excludeFileTag = decommissionRequest.getParameters()
- .get("excludeFileTag");
- }
-
- if (excludeFileTag == null) {
- throw new IllegalArgumentException("No exclude file specified"
- + " when decommissioning datanodes. Provide parameter excludeFileTag with the tag for config type "
- + hdfsExcludeFileType);
- }
-
- Config config = clusters.getCluster(clusterName).getConfig(
- hdfsExcludeFileType, excludeFileTag);
- if(config == null){
- throw new AmbariException("Decommissioning datanodes requires the cluster to be associated with config type " +
- hdfsExcludeFileType + " with a list of datanodes to be decommissioned (\"datanodes\" : list).");
- }
-
- LOG.info("Decommissioning data nodes: " + config.getProperties().get("datanodes") +
- " " + hdfsExcludeFileType + " tag: " + excludeFileTag);
-
- Map<String, Map<String, String>> configurations =
- new TreeMap<String, Map<String, String>>();
-
-
- Map<String, Map<String, String>> configTags =
- findConfigurationTagsWithOverrides(cluster, namenodeHost);
-
- // Add the tag for hdfs-exclude-file
- Map<String, String> excludeTags = new HashMap<String, String>();
- excludeTags.put(ConfigHelper.CLUSTER_DEFAULT_TAG, config.getVersionTag());
- configTags.put(hdfsExcludeFileType, excludeTags);
-
- stage.addHostRoleExecutionCommand(
- namenodeHost,
- Role.DECOMMISSION_DATANODE,
- RoleCommand.EXECUTE,
- new ServiceComponentHostOpInProgressEvent(Role.DECOMMISSION_DATANODE
- .toString(), namenodeHost, System.currentTimeMillis()),
- clusterName, serviceName);
-
- ExecutionCommand execCmd = stage.getExecutionCommandWrapper(namenodeHost,
- Role.DECOMMISSION_DATANODE.toString()).getExecutionCommand();
-
- execCmd.setConfigurations(configurations);
- execCmd.setConfigurationTags(configTags);
-
- Map<String, String> params = new TreeMap<String, String>();
- params.put("jdk_location", this.jdkResourceUrl);
- params.put("stack_version", cluster.getDesiredStackVersion()
- .getStackVersion());
- execCmd.setHostLevelParams(params);
-
- }
-
@Override
public RequestStatusResponse createAction(ExecuteActionRequest actionRequest, Map<String, String> requestProperties)
throws AmbariException {
- String clusterName = null;
-
+ String clusterName;
+ Configuration configuration = injector.getInstance(Configuration.class);
String requestContext = "";
if (requestProperties != null) {
@@ -2353,48 +2181,38 @@ public class AmbariManagementControllerImpl implements
}
}
- String logDir = ""; //TODO empty for now
-
if (actionRequest.getClusterName() == null
- || actionRequest.getClusterName().isEmpty()
- || actionRequest.getServiceName() == null
- || actionRequest.getServiceName().isEmpty()
- || actionRequest.getCommandName() == null
- || actionRequest.getCommandName().isEmpty()) {
- throw new AmbariException("Invalid action request : " + "cluster="
- + actionRequest.getClusterName() + ", service="
- + actionRequest.getServiceName() + ", command="
- + actionRequest.getCommandName());
+ || actionRequest.getClusterName().isEmpty()) {
+ throw new AmbariException("Invalid request, cluster name must be specified");
}
-
clusterName = actionRequest.getClusterName();
-
+
Cluster cluster = clusters.getCluster(clusterName);
-
- Map<String, List<String>> clusterHostInfoMap = StageUtils.getClusterHostInfo(clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector);
- String clusterHostInfo = StageUtils.getGson().toJson(clusterHostInfoMap);
-
- Stage stage = stageFactory.createNew(actionManager.getNextRequestId(),
- logDir, clusterName, requestContext, clusterHostInfo);
+ ActionExecutionContext actionExecContext = null;
+ if (actionRequest.isCommand()) {
+ customCommandExecutionHelper.validateCustomCommand(actionRequest);
+ } else {
+ actionExecContext = actionExecutionHelper.validateCustomAction(actionRequest, cluster);
+ }
+
+ Map<String, List<String>> clusterHostInfo = StageUtils.getClusterHostInfo(
+ clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap,
+ injector.getInstance(Configuration.class));
+
+ String clusterHostInfoJson = StageUtils.getGson().toJson(clusterHostInfo);
+ Stage stage = createNewStage(cluster, actionManager.getNextRequestId(), requestContext, clusterHostInfoJson);
stage.setStageId(0);
- LOG.info("Received a createAction request"
- + ", clusterName=" + actionRequest.getClusterName()
- + ", serviceName=" + actionRequest.getServiceName()
- + ", request=" + actionRequest.toString());
- if (!isValidCommand(actionRequest.getCommandName(), actionRequest.getServiceName())) {
- throw new AmbariException(
- "Unsupported action " + actionRequest.getCommandName() + " for " + actionRequest.getServiceName());
- }
+ Map<String, String> params = new TreeMap<String, String>();
+ params.put("jdk_location", this.jdkResourceUrl);
+ params.put("stack_version", cluster.getDesiredStackVersion().getStackVersion());
- if (actionRequest.getCommandName().contains("SERVICE_CHECK")) {
- addServiceCheckAction(actionRequest, stage);
- } else if (actionRequest.getCommandName().equals("DECOMMISSION_DATANODE")) {
- addDecommissionDatanodeAction(actionRequest, stage);
+ if (actionRequest.isCommand()) {
+ customCommandExecutionHelper.addAction(actionRequest, stage, configuration, hostsMap, params);
} else {
- throw new AmbariException("Unsupported action " + actionRequest.getCommandName());
+ actionExecutionHelper.addAction(actionExecContext, stage, configuration, hostsMap, params);
}
RoleCommandOrder rco = this.getRoleCommandOrder(cluster);
@@ -2402,7 +2220,7 @@ public class AmbariManagementControllerImpl implements
rg.build(stage);
List<Stage> stages = rg.getStages();
if (stages != null && !stages.isEmpty()) {
- actionManager.sendActions(stages);
+ actionManager.sendActions(stages, actionRequest);
return getRequestStatusResponse(stage.getRequestId());
} else {
throw new AmbariException("Stage was not created");
@@ -2428,18 +2246,6 @@ public class AmbariManagementControllerImpl implements
}
- private Boolean isValidCommand(String command, String service) {
- List<String> actions = actionMetadata.getActions(service);
- if (actions == null || actions.size() == 0) {
- return false;
- }
-
- if (!actions.contains(command)) {
- return false;
- }
-
- return true;
- }
private Set<StackResponse> getStacks(StackRequest request)
throws AmbariException {
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
index fd1cd1f..f8dd908 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ExecuteActionRequest.java
@@ -17,6 +17,10 @@
*/
package org.apache.ambari.server.controller;
+import org.apache.ambari.server.utils.StageUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,26 +37,29 @@ public class ExecuteActionRequest {
private Map<String, String> parameters;
public ExecuteActionRequest(String clusterName, String commandName,
- String actionName, String serviceName, String componentName,
- List<String> hosts, Map<String, String> parameters) {
- this.clusterName = clusterName;
- this.commandName = commandName;
+ String actionName, String serviceName, String componentName,
+ List<String> hosts, Map<String, String> parameters) {
+ this(clusterName, commandName, serviceName, parameters);
this.actionName = actionName;
- this.serviceName = serviceName;
this.componentName = componentName;
- this.parameters = parameters;
- this.hosts = hosts;
+ if (hosts != null) {
+ this.hosts.addAll(hosts);
+ }
}
/**
* Create an ExecuteActionRequest to execute a command
*/
public ExecuteActionRequest(String clusterName, String commandName, String serviceName,
- Map<String, String> parameters) {
+ Map<String, String> parameters) {
this.clusterName = clusterName;
this.commandName = commandName;
this.serviceName = serviceName;
- this.parameters = parameters;
+ this.parameters = new HashMap<String, String>();
+ if (parameters != null) {
+ this.parameters.putAll(parameters);
+ }
+ this.hosts = new ArrayList<String>();
}
public String getClusterName() {
@@ -86,4 +93,17 @@ public class ExecuteActionRequest {
public Boolean isCommand() {
return actionName == null || actionName.isEmpty();
}
+
+ @Override
+ public synchronized String toString() {
+ return (new StringBuilder()).
+ append("isCommand :" + isCommand().toString()).
+ append(", action :" + actionName).
+ append(", command :" + commandName).
+ append(", inputs :" + parameters.toString()).
+ append(", targetService :" + serviceName).
+ append(", targetComponent :" + componentName).
+ append(", targetHosts :" + hosts.toString()).
+ append(", clusterName :" + clusterName).toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index f77c8a4..5678887 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -129,14 +129,14 @@ public class HostRoleCommandDAO {
}
@Transactional
- public List<HostRoleCommandEntity> findByHostRole(String hostName, long requestId, long stageId, Role role) {
+ public List<HostRoleCommandEntity> findByHostRole(String hostName, long requestId, long stageId, String role) {
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT command " +
"FROM HostRoleCommandEntity command " +
"WHERE command.hostName=?1 AND command.requestId=?2 " +
"AND command.stageId=?3 AND command.role=?4 " +
"ORDER BY command.taskId", HostRoleCommandEntity.class);
- return daoUtils.selectList(query, hostName, requestId, stageId, role.name());
+ return daoUtils.selectList(query, hostName, requestId, stageId, role);
}
@Transactional
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
index 8271151..79c001a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
@@ -77,10 +77,6 @@ public class StageDAO {
@Transactional
public List<StageEntity> findByCommandStatuses(Collection<HostRoleStatus> statuses) {
-// TypedQuery<StageEntity> query = entityManagerProvider.get().createQuery("SELECT stage " +
-// "FROM StageEntity stage JOIN stage.hostRoleCommands command " +
-// "WHERE command.status IN ?1 " +
-// "ORDER BY stage.requestId, stage.stageId", StageEntity.class);
TypedQuery<StageEntity> query = entityManagerProvider.get().createQuery("SELECT stage " +
"FROM StageEntity stage WHERE stage.stageId IN (SELECT hrce.stageId FROM " +
"HostRoleCommandEntity hrce WHERE stage.requestId = hrce.requestId and hrce.status IN ?1 ) " +
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
index 9e23516..7f1d031 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionEntity.java
@@ -21,7 +21,15 @@ package org.apache.ambari.server.orm.entities;
import org.apache.ambari.server.actionmanager.ActionType;
import org.apache.ambari.server.actionmanager.TargetHostType;
-import javax.persistence.*;
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
@NamedQueries({
@NamedQuery(name = "allActions", query =
@@ -52,11 +60,11 @@ public class ActionEntity {
@Basic
private String targetComponent;
- @Column(name = "description")
+ @Column(name = "description", nullable = false)
@Basic
private String description = "";
- @Column(name = "target_type")
+ @Column(name = "target_type", nullable = false)
@Enumerated(EnumType.STRING)
private TargetHostType targetType = TargetHostType.ANY;
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/22f5fdfb/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
index b74a685..b922293 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
@@ -363,9 +363,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
State.INSTALLED,
ServiceComponentHostEventType.HOST_SVCCOMP_OP_SUCCEEDED,
new ServiceComponentHostOpCompletedTransition())
-
-
.addTransition(State.INSTALLING,
State.INSTALLING,
ServiceComponentHostEventType.HOST_SVCCOMP_INSTALL,