You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2014/01/13 22:44:46 UTC
[3/3] git commit: AMBARI-4267. Enable BatchRequest(s) to transform to
API calls to the server. (mpapirkovskyy)
AMBARI-4267. Enable BatchRequest(s) to transform to API calls to the server. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c8697655
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c8697655
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c8697655
Branch: refs/heads/trunk
Commit: c86976551f4dd1a437bed6e728ae39bbc2040736
Parents: 46d86dc
Author: Myroslav Papirkovskyy <mp...@hortonworks.com>
Authored: Mon Jan 13 23:44:11 2014 +0200
Committer: Myroslav Papirkovskyy <mp...@hortonworks.com>
Committed: Mon Jan 13 23:44:11 2014 +0200
----------------------------------------------------------------------
.../server/actionmanager/ActionDBAccessor.java | 21 +-
.../actionmanager/ActionDBAccessorImpl.java | 155 ++++++----
.../actionmanager/ActionDBInMemoryImpl.java | 232 --------------
.../server/actionmanager/ActionManager.java | 28 +-
.../ambari/server/actionmanager/Request.java | 293 ++++++++++++++++++
.../server/actionmanager/RequestFactory.java | 38 +++
.../server/actionmanager/RequestType.java | 25 ++
.../ambari/server/actionmanager/Stage.java | 42 +--
.../server/actionmanager/StageFactory.java | 2 -
.../AmbariManagementControllerImpl.java | 25 +-
.../ambari/server/controller/AmbariServer.java | 52 +++-
.../server/controller/ControllerModule.java | 19 +-
.../internal/RequestResourceProvider.java | 101 ++++--
.../server/orm/dao/HostRoleCommandDAO.java | 11 +
.../ambari/server/orm/dao/RequestDAO.java | 68 ++++
.../server/orm/entities/ClusterEntity.java | 11 +
.../server/orm/entities/RequestEntity.java | 224 ++++++++++++++
.../RequestScheduleBatchRequestEntity.java | 8 +-
.../ambari/server/orm/entities/StageEntity.java | 33 +-
.../scheduler/AbstractLinearExecutionJob.java | 2 +
.../scheduler/ExecutionScheduleManager.java | 217 +++++++++++--
.../scheduler/ExecutionSchedulerImpl.java | 5 +
.../AmbariInternalAuthenticationProvider.java | 52 ++++
.../internal/InternalAuthenticationToken.java | 78 +++++
.../InternalTokenAuthenticationFilter.java | 60 ++++
.../internal/InternalTokenClientFilter.java | 41 +++
.../internal/InternalTokenStorage.java | 52 ++++
.../server/state/scheduler/BatchRequestJob.java | 46 ++-
.../state/scheduler/BatchRequestResponse.java | 38 +++
.../server/state/scheduler/GuiceJobFactory.java | 42 +++
.../state/scheduler/RequestExecution.java | 13 +
.../state/scheduler/RequestExecutionImpl.java | 44 +++
.../main/resources/Ambari-DDL-MySQL-CREATE.sql | 7 +-
.../main/resources/Ambari-DDL-Oracle-CREATE.sql | 5 +-
.../resources/Ambari-DDL-Postgres-CREATE.sql | 98 +++---
.../Ambari-DDL-Postgres-REMOTE-CREATE.sql | 7 +-
.../src/main/resources/META-INF/persistence.xml | 3 +
.../src/main/resources/properties.json | 8 +
.../webapp/WEB-INF/spring-security.xml | 5 +-
.../ExecutionCommandWrapperTest.java | 3 +-
.../actionmanager/TestActionDBAccessorImpl.java | 32 +-
.../server/actionmanager/TestActionManager.java | 9 +-
.../actionmanager/TestActionScheduler.java | 307 +++++++++++++++++--
.../ambari/server/agent/AgentResourceTest.java | 4 +
.../server/agent/TestHeartbeatHandler.java | 18 +-
.../AmbariManagementControllerTest.java | 23 +-
.../internal/RequestResourceProviderTest.java | 144 +++++----
.../apache/ambari/server/orm/OrmTestHelper.java | 13 +-
.../apache/ambari/server/orm/TestOrmImpl.java | 33 +-
.../scheduler/ExecutionScheduleManagerTest.java | 166 +++++++++-
.../state/scheduler/BatchRequestJobTest.java | 78 +++++
51 files changed, 2405 insertions(+), 636 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index 11605bb..ac36bc6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -17,6 +17,8 @@
*/
package org.apache.ambari.server.actionmanager;
+import com.google.inject.persist.Transactional;
+import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.controller.ExecuteActionRequest;
@@ -55,10 +57,16 @@ public interface ActionDBAccessor {
/**
* Persists all tasks for a given request
- *
- * @param stages Stages belonging to the request
+ * @param request request object
*/
- public void persistActions(List<Stage> stages);
+ @Transactional
+ void persistActions(Request request);
+
+ /**
+ * Records the start time of the request with the given id.
+ * ActionDBAccessorImpl only sets the time while the stored start time is still -1.
+ *
+ * @param requestId id of the request to mark as started
+ */
+ @Transactional
+ void startRequest(long requestId);
+
+ /**
+ * Records the end time of the request with the given id.
+ * ActionDBAccessorImpl only sets the time while the stored end time is still -1.
+ *
+ * @param requestId id of the request to mark as ended
+ */
+ @Transactional
+ void endRequest(long requestId);
/**
* For the given host, update all the tasks based on the command report
@@ -114,7 +122,7 @@ public interface ActionDBAccessor {
/**
* Get all requests
*/
- public List<Long> getRequests();
+ public List<Long> getRequestIds();
/**
* Gets the host role command corresponding to the task id
@@ -135,4 +143,9 @@ public interface ActionDBAccessor {
* Gets the request context associated with the request id
*/
public String getRequestContext(long requestId);
+
+ /**
+ * Gets request objects by ids
+ */
+ List<Request> getRequests(Collection<Long> requestIds);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 3fcf4f4..a75848c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -27,63 +27,50 @@ import com.google.inject.persist.Transactional;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.controller.ExecuteActionRequest;
-import org.apache.ambari.server.orm.dao.ActionDefinitionDAO;
-import org.apache.ambari.server.orm.dao.ClusterDAO;
-import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
-import org.apache.ambari.server.orm.dao.HostDAO;
-import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
-import org.apache.ambari.server.orm.dao.RoleSuccessCriteriaDAO;
-import org.apache.ambari.server.orm.dao.StageDAO;
-import org.apache.ambari.server.orm.entities.ClusterEntity;
-import org.apache.ambari.server.orm.entities.ExecutionCommandEntity;
-import org.apache.ambari.server.orm.entities.HostEntity;
-import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
-import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
-import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.dao.*;
+import org.apache.ambari.server.orm.entities.*;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.TimeUnit;
@Singleton
public class ActionDBAccessorImpl implements ActionDBAccessor {
private static final Logger LOG = LoggerFactory.getLogger(ActionDBAccessorImpl.class);
- private final long requestId;
+ private long requestId;
@Inject
- private ClusterDAO clusterDAO;
+ ClusterDAO clusterDAO;
@Inject
- private HostDAO hostDAO;
+ HostDAO hostDAO;
@Inject
- private StageDAO stageDAO;
+ RequestDAO requestDAO;
@Inject
- private HostRoleCommandDAO hostRoleCommandDAO;
+ StageDAO stageDAO;
@Inject
- private ExecutionCommandDAO executionCommandDAO;
+ HostRoleCommandDAO hostRoleCommandDAO;
@Inject
- private RoleSuccessCriteriaDAO roleSuccessCriteriaDAO;
+ ExecutionCommandDAO executionCommandDAO;
@Inject
- private StageFactory stageFactory;
+ RoleSuccessCriteriaDAO roleSuccessCriteriaDAO;
@Inject
- private HostRoleCommandFactory hostRoleCommandFactory;
+ StageFactory stageFactory;
@Inject
- private Clusters clusters;
+ RequestFactory requestFactory;
+ @Inject
+ HostRoleCommandFactory hostRoleCommandFactory;
+ @Inject
+ Clusters clusters;
+
+
+
private Cache<Long, HostRoleCommand> hostRoleCommandCache;
private long cacheLimit; //may be exceeded to store tasks from one request
@Inject
- public ActionDBAccessorImpl(Injector injector, @Named("executionCommandCacheSize") long cacheLimit) {
- injector.injectMembers(this);
- requestId = stageDAO.getLastRequestId();
+ public ActionDBAccessorImpl(@Named("executionCommandCacheSize") long cacheLimit) {
this.cacheLimit = cacheLimit;
hostRoleCommandCache = CacheBuilder.newBuilder().
@@ -92,12 +79,18 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
}
+ /**
+ * Guice method-injection hook that loads the last request id from the stage DAO.
+ * This replaces the constructor logic that called injector.injectMembers(this)
+ * before reading stageDAO; the DAO fields are now injected by the container.
+ */
+ @Inject
+ void init() {
+ requestId = stageDAO.getLastRequestId();
+ }
+
/* (non-Javadoc)
* @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getStage(java.lang.String)
*/
@Override
public Stage getStage(String actionId) {
- return stageFactory.createExisting(actionId);
+ // The entity is looked up here and handed to the factory, which now builds
+ // the Stage from a StageEntity instead of from the action id.
+ // NOTE(review): a missing stage presumably yields a null entity - confirm the factory copes.
+ StageEntity stageEntity = stageDAO.findByActionId(actionId);
+ return stageFactory.createExisting(stageEntity);
}
/* (non-Javadoc)
@@ -116,8 +109,17 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
* @see org.apache.ambari.server.actionmanager.ActionDBAccessor#abortOperation(long)
*/
@Override
+ @Transactional
public void abortOperation(long requestId) {
long now = System.currentTimeMillis();
+
+ //mark request as ended
+ RequestEntity requestEntity = requestDAO.findByPK(requestId);
+ if (requestEntity != null && requestEntity.getEndTime() == -1L) {
+ requestEntity.setEndTime(now);
+ requestDAO.merge(requestEntity);
+ }
+
List<HostRoleCommandEntity> commands =
hostRoleCommandDAO.findByRequest(requestId);
for (HostRoleCommandEntity command : commands) {
@@ -168,32 +170,36 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
return stages;
}
- /* (non-Javadoc)
- * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#persistActions(java.util.List)
- */
@Override
@Transactional
- public void persistActions(List<Stage> stages) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding stages to DB, stageCount=" + stages.size());
+ public void persistActions(Request request) {
+
+ RequestEntity requestEntity = request.constructNewPersistenceEntity();
+
+ ClusterEntity clusterEntity = clusterDAO.findById(request.getClusterId());
+ if (clusterEntity == null) {
+ throw new RuntimeException(String.format("Cluster with id=%s not found", request.getClusterId()));
}
+ requestEntity.setCluster(clusterEntity);
+ requestDAO.create(requestEntity);
- for (Stage stage : stages) {
- StageEntity stageEntity = stage.constructNewPersistenceEntity();
- Cluster cluster;
- try {
- cluster = clusters.getCluster(stage.getClusterName());
- } catch (AmbariException e) {
- throw new RuntimeException(e);
- }
- ClusterEntity clusterEntity = clusterDAO.findById(cluster.getClusterId());
+ //TODO wire request to cluster
+ List<StageEntity> stageEntities = new ArrayList<StageEntity>(request.getStages().size());
+ for (Stage stage : request.getStages()) {
+ StageEntity stageEntity = stage.constructNewPersistenceEntity();
+ stageEntities.add(stageEntity);
stageEntity.setCluster(clusterEntity);
+ //TODO refactor to reduce merges
+ stageEntity.setRequest(requestEntity);
stageDAO.create(stageEntity);
- for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
- HostRoleCommandEntity hostRoleCommandEntity = hostRoleCommand.constructNewPersistenceEntity();
+ List<HostRoleCommand> orderedHostRoleCommands = stage.getOrderedHostRoleCommands();
+ List<HostRoleCommandEntity> hostRoleCommandEntities = new ArrayList<HostRoleCommandEntity>();
+ for (HostRoleCommand hostRoleCommand : orderedHostRoleCommands) {
+ HostRoleCommandEntity hostRoleCommandEntity = hostRoleCommand.constructNewPersistenceEntity();
+ hostRoleCommandEntities.add(hostRoleCommandEntity);
hostRoleCommandEntity.setStage(stageEntity);
HostEntity hostEntity = hostDAO.findByName(hostRoleCommandEntity.getHostName());
@@ -221,7 +227,32 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
for (RoleSuccessCriteriaEntity roleSuccessCriteriaEntity : stageEntity.getRoleSuccessCriterias()) {
roleSuccessCriteriaDAO.create(roleSuccessCriteriaEntity);
}
+
+ stageDAO.create(stageEntity);
}
+ requestEntity.setStages(stageEntities);
+ requestDAO.merge(requestEntity);
+// requestDAO.create(requestEntity);
+ }
+
+ /**
+ * Records the start time of a request.
+ * The current time is stored only when the request exists and its start time is
+ * still -1 (not started yet); otherwise nothing is written.
+ *
+ * @param requestId id of the request to mark as started
+ */
+ @Override
+ @Transactional
+ public void startRequest(long requestId) {
+ RequestEntity requestEntity = requestDAO.findByPK(requestId);
+ if (requestEntity != null && requestEntity.getStartTime() == -1L) {
+ requestEntity.setStartTime(System.currentTimeMillis());
+ // Merge inside the guard (as abortOperation does): an unknown request id
+ // would otherwise hand a null entity to merge.
+ requestDAO.merge(requestEntity);
+ }
+ }
+
+ /**
+ * Records the end time of a request.
+ * The current time is stored only when the request exists and its end time is
+ * still -1 (not ended yet); otherwise nothing is written.
+ *
+ * @param requestId id of the request to mark as ended
+ */
+ @Override
+ @Transactional
+ public void endRequest(long requestId) {
+ RequestEntity requestEntity = requestDAO.findByPK(requestId);
+ if (requestEntity != null && requestEntity.getEndTime() == -1L) {
+ requestEntity.setEndTime(System.currentTimeMillis());
+ // Merge inside the guard (as abortOperation does): an unknown request id
+ // would otherwise hand a null entity to merge.
+ requestDAO.merge(requestEntity);
+ }
+ }
@Override
@@ -349,7 +380,12 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
}
}
}
-
+ // Order the collected commands by ascending task id.
+ Collections.sort(commands, new Comparator<HostRoleCommand>() {
+ @Override
+ public int compare(HostRoleCommand o1, HostRoleCommand o2) {
+ // Compare the ids directly: casting their difference to int can
+ // truncate or overflow and report the wrong sign.
+ long firstId = o1.getTaskId();
+ long secondId = o2.getTaskId();
+ return firstId < secondId ? -1 : (firstId > secondId ? 1 : 0);
+ }
+ });
return commands;
}
@@ -363,7 +399,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
}
@Override
- public List<Long> getRequests() {
+ public List<Long> getRequestIds() {
return hostRoleCommandDAO.getRequests();
}
@@ -406,4 +442,15 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
public String getRequestContext(long requestId) {
return stageDAO.findRequestContext(requestId);
}
+
+ /**
+ * Loads the request entities with the given ids and converts each one into a
+ * Request through the request factory, keeping the order returned by the DAO.
+ *
+ * @param requestIds ids of the requests to load
+ * @return the requests built from the entities that were found
+ */
+ @Override
+ @Transactional
+ public List<Request> getRequests(Collection<Long> requestIds){
+ List<RequestEntity> entities = requestDAO.findByPks(requestIds);
+ List<Request> result = new ArrayList<Request>(entities.size());
+ Iterator<RequestEntity> entityIterator = entities.iterator();
+ while (entityIterator.hasNext()) {
+ RequestEntity entity = entityIterator.next();
+ Request loaded = requestFactory.createExisting(entity);
+ result.add(loaded);
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
deleted file mode 100644
index 8c36366..0000000
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.server.actionmanager;
-
-import com.google.inject.Singleton;
-import org.apache.ambari.server.agent.CommandReport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-@Singleton
-public class ActionDBInMemoryImpl implements ActionDBAccessor {
-
- private static Logger LOG = LoggerFactory.getLogger(ActionDBInMemoryImpl.class);
- // for a persisted DB, this will be initialized in the ctor
- // with the highest persisted requestId value in the DB
- private final long lastRequestId = 0;
- List<Stage> stageList = new ArrayList<Stage>();
-
- @Override
- public synchronized Stage getStage(String actionId) {
- for (Stage s : stageList) {
- if (s.getActionId().equals(actionId)) {
- return s;
- }
- }
- return null;
- }
-
- @Override
- public synchronized List<Stage> getAllStages(long requestId) {
- List<Stage> l = new ArrayList<Stage>();
- for (Stage s : stageList) {
- if (s.getRequestId() == requestId) {
- l.add(s);
- }
- }
- return l;
- }
-
- @Override
- public synchronized void abortOperation(long requestId) {
- for (Stage s : stageList) {
- if (s.getRequestId() == requestId) {
- for (String host : s.getHostRoleCommands().keySet()) {
- Map<String, HostRoleCommand> roleCommands = s.getHostRoleCommands().get(host);
- for (String role : roleCommands.keySet()) {
- HostRoleCommand cmd = roleCommands.get(role);
- HostRoleStatus status = s.getHostRoleStatus(host, cmd.getRole()
- .toString());
- if (status.equals(HostRoleStatus.IN_PROGRESS)
- || status.equals(HostRoleStatus.QUEUED)
- || status.equals(HostRoleStatus.PENDING)) {
- s.setHostRoleStatus(host, cmd.getRole().toString(),
- HostRoleStatus.ABORTED);
- }
- }
- }
- }
- }
- }
-
- @Override
- public synchronized void timeoutHostRole(String host, long requestId,
- long stageId, String role) {
- for (Stage s : stageList) {
- s.setHostRoleStatus(host, role.toString(), HostRoleStatus.TIMEDOUT);
- }
- }
-
- @Override
- public synchronized List<Stage> getStagesInProgress() {
- List<Stage> l = new ArrayList<Stage>();
- for (Stage s : stageList) {
- if (s.isStageInProgress()) {
- l.add(s);
- }
- }
- return l;
- }
-
- @Override
- public synchronized void persistActions(List<Stage> stages) {
- for (Stage s : stages) {
- stageList.add(s);
- }
- }
-
- @Override
- public synchronized void updateHostRoleState(String hostname, long requestId,
- long stageId, String role, CommandReport report) {
- LOG.info("DEBUG stages to iterate: " + stageList.size());
- if (null == report.getStatus()
- || null == report.getStdOut()
- || null == report.getStdErr()) {
- throw new RuntimeException("Badly formed command report.");
- }
- for (Stage s : stageList) {
- if (s.getRequestId() == requestId && s.getStageId() == stageId) {
- s.setHostRoleStatus(hostname, role,
- HostRoleStatus.valueOf(report.getStatus()));
- s.setExitCode(hostname, role, report.getExitCode());
- s.setStderr(hostname, role, report.getStdErr());
- s.setStdout(hostname, role, report.getStdOut());
- }
- }
- }
-
- @Override
- public void abortHostRole(String host, long requestId, long stageId, String role) {
- CommandReport report = new CommandReport();
- report.setExitCode(999);
- report.setStdErr("Host Role in invalid state");
- report.setStdOut("");
- report.setStatus("ABORTED");
- updateHostRoleState(host, requestId, stageId, role, report);
- }
-
- @Override
- public synchronized long getLastPersistedRequestIdWhenInitialized() {
- return lastRequestId;
- }
-
- @Override
- public void hostRoleScheduled(Stage s, String hostname, String roleStr) {
- //Nothing needed for in-memory implementation
- }
-
- @Override
- public List<HostRoleCommand> getRequestTasks(long requestId) {
- return null;
- }
-
- @Override
- public List<HostRoleCommand> getAllTasksByRequestIds(Collection<Long> requestIds) {
- //TODO not implemented
- return null;
- }
-
- @Override
- public List<HostRoleCommand> getTasksByRequestAndTaskIds(Collection<Long> requestIds, Collection<Long> taskIds) {
- //TODO not implemented
- return null;
- }
-
- @Override
- public Collection<HostRoleCommand> getTasks(Collection<Long> taskIds) {
- return null;
- }
-
- @Override
- public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses) {
- List<Stage> l = new ArrayList<Stage>();
- for (Stage s : stageList) {
- if (s.doesStageHaveHostRoleStatus(statuses)) {
- l.add(s);
- }
- }
- return l;
- }
-
- @Override
- public synchronized List<Long> getRequests() {
- Set<Long> requestIds = new HashSet<Long>();
- for (Stage s : stageList) {
- requestIds.add(s.getRequestId());
- }
- List<Long> ids = new ArrayList<Long>();
- ids.addAll(requestIds);
- return ids;
- }
-
- public HostRoleCommand getTask(long taskId) {
- for (Stage s : stageList) {
- for (String host : s.getHostRoleCommands().keySet()) {
- Map<String, HostRoleCommand> map = s.getHostRoleCommands().get(host);
- for (HostRoleCommand hostRoleCommand : map.values()) {
- if (hostRoleCommand.getTaskId() == taskId) {
- return hostRoleCommand;
- }
- }
- }
- }
- return null;
- }
-
- @Override
- public List<Long> getRequestsByStatus(RequestStatus status) {
- // TODO
- throw new RuntimeException("Functionality not implemented");
- }
-
- @Override
- public Map<Long, String> getRequestContext(List<Long> requestIds) {
- Map<Long, String> result = new HashMap<Long, String>();
- for (Long requestId : requestIds) {
- List<Stage> stages = getAllStages(requestId);
- result.put(requestId, stages != null && !stages.isEmpty() ? stages.get
- (0).getRequestContext() : "");
- }
- return result;
- }
-
- @Override
- public String getRequestContext(long requestId) {
- List<Stage> stages = getAllStages(requestId);
- return stages != null && !stages.isEmpty() ? stages.get(0)
- .getRequestContext() : "";
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
index aa553c4..af25149 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
@@ -50,12 +50,15 @@ public class ActionManager {
private final ActionQueue actionQueue;
private final AtomicLong requestCounter;
private final CustomActionDBAccessor cdb;
+ private final RequestFactory requestFactory;
+
@Inject
public ActionManager(@Named("schedulerSleeptime") long schedulerSleepTime,
@Named("actionTimeout") long actionTimeout,
ActionQueue aq, Clusters fsm, ActionDBAccessor db, HostsMap hostsMap,
- ServerActionManager serverActionManager, UnitOfWork unitOfWork, CustomActionDBAccessor cdb) {
+ ServerActionManager serverActionManager, UnitOfWork unitOfWork, CustomActionDBAccessor cdb,
+ RequestFactory requestFactory) {
this.actionQueue = aq;
this.db = db;
scheduler = new ActionScheduler(schedulerSleepTime, actionTimeout, db,
@@ -63,6 +66,7 @@ public class ActionManager {
requestCounter = new AtomicLong(
db.getLastPersistedRequestIdWhenInitialized());
this.cdb = cdb;
+ this.requestFactory = requestFactory;
}
public void start() {
@@ -74,23 +78,27 @@ public class ActionManager {
scheduler.stop();
}
- public void sendActions(List<Stage> stages, ExecuteActionRequest request) {
+ /**
+ * Wraps the stages in a new Request built by the request factory and sends it.
+ *
+ * @param stages stages that make up the request
+ * @param actionRequest action request the stages were created for, may be null
+ * @throws AmbariException declared for callers; see RequestFactory
+ */
+ public void sendActions(List<Stage> stages, ExecuteActionRequest actionRequest) throws AmbariException {
+ Request request = requestFactory.createNewFromStages(stages, actionRequest);
+ sendActions(request, actionRequest);
+ }
+ /**
+ * Persists the request through the action DB accessor and wakes the scheduler.
+ *
+ * @param request request to persist
+ * @param executeActionRequest action request that triggered it, may be null;
+ * used for debug logging only
+ */
+ public void sendActions(Request request, ExecuteActionRequest executeActionRequest) {
if (LOG.isDebugEnabled()) {
- for (Stage s : stages) {
- LOG.debug("Persisting stage into db: " + s.toString());
- }
+ LOG.debug(String.format("Persisting Request into DB: %s", request));
- if (request != null) {
+ if (executeActionRequest != null) {
- LOG.debug("In response to request: " + request.toString());
+ // "request" now names the Request being persisted, so log the action
+ // request explicitly instead of printing the Request a second time.
+ LOG.debug("In response to request: " + executeActionRequest.toString());
}
}
- db.persistActions(stages);
-
- // Now scheduler should process actions
+ db.persistActions(request);
scheduler.awake();
}
+ /**
+ * Returns the request objects for the given ids by delegating to the action DB accessor.
+ *
+ * @param requestIds ids of the requests to load
+ * @return whatever the accessor's getRequests returns for these ids
+ */
+ public List<Request> getRequests(Collection<Long> requestIds) {
+ return db.getRequests(requestIds);
+ }
+
public List<Stage> getRequestStatus(long requestId) {
return db.getAllStages(requestId);
}
@@ -179,7 +187,7 @@ public class ActionManager {
* @return
*/
public List<Long> getRequests() {
- return db.getRequests();
+ return db.getRequestIds();
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
new file mode 100644
index 0000000..35f6864
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.actionmanager;
+
+import com.google.gson.Gson;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.state.Clusters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class Request {
+ private static final Logger LOG = LoggerFactory.getLogger(Request.class);
+
+ private final long requestId;
+ private final long clusterId;
+ private final String clusterName;
+ private String commandName;
+ private String requestContext;
+ private long createTime;
+ private long startTime;
+ private long endTime;
+ private String inputs;
+ private String targetService;
+ private String targetComponent;
+ private String targetHosts;
+ private RequestType requestType;
+
+ private Collection<Stage> stages = new ArrayList<Stage>();
+
+ /**
+ * Construct new entity.
+ * The create time is set to the current time; start and end time are set to -1.
+ * The cluster name is resolved from the cluster id.
+ *
+ * @param requestId id of the new request
+ * @param clusterId id of the cluster the request belongs to
+ * @param clusters used to resolve the cluster name
+ * @throws RuntimeException if the cluster lookup fails (the AmbariException is kept as cause)
+ */
+ @AssistedInject
+ public Request(@Assisted long requestId, @Assisted("clusterId") Long clusterId, Clusters clusters) {
+ this.requestId = requestId;
+ this.clusterId = clusterId;
+ this.createTime = System.currentTimeMillis();
+ this.startTime = -1;
+ this.endTime = -1;
+ try {
+ this.clusterName = clusters.getClusterById(clusterId).getClusterName();
+ } catch (AmbariException e) {
+ String message = String.format("Cluster with id=%s not found", clusterId);
+ LOG.error(message);
+ // Keep the original exception as the cause so the lookup failure stays traceable.
+ throw new RuntimeException(message, e);
+ }
+ }
+
+ /**
+ * Construct new entity from stages provided.
+ * Request id, cluster name and request context are taken from the first stage;
+ * the request type is INTERNAL_REQUEST, start and end time are -1.
+ *
+ * @param stages stages of the request, must not be null or empty
+ * @param clusters used to resolve the cluster id from the cluster name
+ * @throws RuntimeException if stages is null or empty, or if the cluster lookup fails
+ */
+ //TODO remove when not needed
+ @AssistedInject
+ public Request(@Assisted Collection<Stage> stages, Clusters clusters){
+ if (stages == null || stages.isEmpty()) {
+ String message = "Attempted to construct request from empty stage collection";
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
+ this.stages.addAll(stages);
+ // The first stage supplies the request-level attributes.
+ Stage stage = stages.iterator().next();
+ this.requestId = stage.getRequestId();
+ this.clusterName = stage.getClusterName();
+ try {
+ this.clusterId = clusters.getCluster(clusterName).getClusterId();
+ } catch (AmbariException e) {
+ String message = String.format("Cluster %s not found", clusterName);
+ LOG.error(message);
+ // Keep the original exception as the cause so the lookup failure stays traceable.
+ throw new RuntimeException(message, e);
+ }
+ this.requestContext = stage.getRequestContext();
+ this.createTime = System.currentTimeMillis();
+ this.startTime = -1;
+ this.endTime = -1;
+ this.requestType = RequestType.INTERNAL_REQUEST;
+ }
+
+ @AssistedInject
+ /**
+ * Construct new entity from stages provided and, when an action request is
+ * given, copy its target service, component, hosts, parameters and name.
+ * Hosts and parameters are stored as JSON strings produced by gson.
+ * With a non-null action request the type becomes COMMAND or ACTION (per
+ * actionRequest.isCommand()); with null it stays as set by the stage-based constructor.
+ */
+ //TODO remove when not needed
+ public Request(@Assisted Collection<Stage> stages, @Assisted ExecuteActionRequest actionRequest,
+ Clusters clusters, Gson gson) throws AmbariException {
+ this(stages, clusters);
+ if (actionRequest != null) {
+ this.targetService = actionRequest.getServiceName();
+ this.targetComponent = actionRequest.getComponentName();
+ this.targetHosts = gson.toJson(actionRequest.getHosts());
+ this.inputs = gson.toJson(actionRequest.getParameters());
+ this.requestType = actionRequest.isCommand() ? RequestType.COMMAND : RequestType.ACTION;
+ this.commandName = actionRequest.isCommand() ? actionRequest.getCommandName() : actionRequest.getActionName();
+ }
+ }
+
+ @AssistedInject
+ /**
+ * Load existing request from database: copies every field from the entity and
+ * builds a Stage for each stage entity through the stage factory.
+ * Throws RuntimeException when the entity is null.
+ */
+ public Request(@Assisted RequestEntity entity, StageFactory stageFactory){
+ if (entity == null) {
+ throw new RuntimeException("Request entity cannot be null.");
+ }
+
+ this.requestId = entity.getRequestId();
+ // NOTE(review): assumes entity.getCluster() is non-null for persisted requests - confirm.
+ this.clusterId = entity.getCluster().getClusterId();
+ this.clusterName = entity.getCluster().getClusterName();
+ this.createTime = entity.getCreateTime();
+ this.startTime = entity.getStartTime();
+ this.endTime = entity.getEndTime();
+ this.requestContext = entity.getRequestContext();
+ this.inputs = entity.getInputs();
+ this.targetService = entity.getTargetService();
+ this.targetComponent = entity.getTargetComponent();
+ this.targetHosts = entity.getTargetHosts();
+ this.requestType = entity.getRequestType();
+ this.commandName = entity.getCommandName();
+
+ for (StageEntity stageEntity : entity.getStages()) {
+ Stage stage = stageFactory.createExisting(stageEntity);
+ stages.add(stage);
+ }
+ }
+
+ public Collection<Stage> getStages() {
+ return stages;
+ }
+
+ public void setStages(Collection<Stage> stages) {
+ this.stages = stages;
+ }
+
+ public long getRequestId() {
+ return requestId;
+ }
+
+ /**
+ * Builds a new, unsaved RequestEntity carrying this request's scalar fields.
+ * Stages and the cluster reference are not set here; the caller wires them.
+ *
+ * @return a fresh entity populated from this request
+ */
+ public synchronized RequestEntity constructNewPersistenceEntity() {
+ RequestEntity requestEntity = new RequestEntity();
+
+ requestEntity.setRequestId(requestId);
+ requestEntity.setClusterId(clusterId);
+ requestEntity.setCreateTime(createTime);
+ requestEntity.setStartTime(startTime);
+ requestEntity.setEndTime(endTime);
+ requestEntity.setRequestContext(requestContext);
+ requestEntity.setInputs(inputs);
+ requestEntity.setTargetService(targetService);
+ requestEntity.setTargetComponent(targetComponent);
+ requestEntity.setTargetHosts(targetHosts);
+ requestEntity.setRequestType(requestType);
+ // The command name is read back from the entity when a request is loaded,
+ // so it has to be written here as well or it is lost on persist.
+ requestEntity.setCommandName(commandName);
+
+ return requestEntity;
+ }
+
+
+ public Long getClusterId() {
+ return clusterId;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public String getRequestContext() {
+ return requestContext;
+ }
+
+ public void setRequestContext(String requestContext) {
+ this.requestContext = requestContext;
+ }
+
+ public long getCreateTime() {
+ return createTime;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ }
+
+ public String getInputs() {
+ return inputs;
+ }
+
+ public void setInputs(String inputs) {
+ this.inputs = inputs;
+ }
+
+ public String getTargetService() {
+ return targetService;
+ }
+
+ public void setTargetService(String targetService) {
+ this.targetService = targetService;
+ }
+
+ public String getTargetComponent() {
+ return targetComponent;
+ }
+
+ public void setTargetComponent(String targetComponent) {
+ this.targetComponent = targetComponent;
+ }
+
+ public String getTargetHosts() {
+ return targetHosts;
+ }
+
+ public void setTargetHosts(String targetHosts) {
+ this.targetHosts = targetHosts;
+ }
+
+ public RequestType getRequestType() {
+ return requestType;
+ }
+
+ public void setRequestType(RequestType requestType) {
+ this.requestType = requestType;
+ }
+
+ public String getCommandName() {
+ return commandName;
+ }
+
+ public void setCommandName(String commandName) {
+ this.commandName = commandName;
+ }
+
+ public List<HostRoleCommand> getCommands() {
+ List<HostRoleCommand> commands = new ArrayList<HostRoleCommand>();
+ for (Stage stage : stages) {
+ commands.addAll(stage.getOrderedHostRoleCommands());
+ }
+ return commands;
+ }
+
+ @Override
+ public String toString() {
+ return "Request{" +
+ "requestId=" + requestId +
+ ", clusterId=" + clusterId +
+ ", clusterName='" + clusterName + '\'' +
+ ", requestContext='" + requestContext + '\'' +
+ ", createTime=" + createTime +
+ ", startTime=" + startTime +
+ ", endTime=" + endTime +
+ ", inputs='" + inputs + '\'' +
+ ", targetService='" + targetService + '\'' +
+ ", targetComponent='" + targetComponent + '\'' +
+ ", targetHosts='" + targetHosts + '\'' +
+ ", requestType=" + requestType +
+ ", stages=" + stages +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/RequestFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/RequestFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/RequestFactory.java
new file mode 100644
index 0000000..3313582
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/RequestFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.actionmanager;
+
+import com.google.inject.assistedinject.Assisted;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.controller.ExecuteActionRequest;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+
+import java.util.Collection;
+
+/**
+ * Factory for {@link Request} instances. The {@code @Assisted} annotation on
+ * {@link #createNew(long, Long)} indicates that the implementation is meant to
+ * be generated by Guice assisted injection rather than written by hand.
+ */
+public interface RequestFactory {
+
+ /**
+ * Creates a request with the given id for the given cluster.
+ *
+ * @param requestId id to assign to the request
+ * @param clusterId id of the owning cluster; bound by the {@code "clusterId"} assisted name
+ * @return the new request
+ * @throws AmbariException declared for implementations that fail to build the request
+ */
+ Request createNew(long requestId, @Assisted("clusterId") Long clusterId) throws AmbariException;
+
+ /**
+ * Creates a request from the given stages.
+ *
+ * @param stages stages the request is built from
+ * @return the new request
+ */
+ Request createNewFromStages(Collection<Stage> stages);
+
+ /**
+ * Creates a request from the given stages and the action request that accompanies them.
+ *
+ * @param stages stages the request is built from
+ * @param actionRequest the action request passed along with the stages
+ * @return the new request
+ */
+ Request createNewFromStages(Collection<Stage> stages, ExecuteActionRequest actionRequest);
+
+ /**
+ * Creates a request from an existing persistence entity.
+ *
+ * @param entity the entity to build the request from
+ * @return the request built from {@code entity}
+ */
+ Request createExisting(RequestEntity entity);
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/RequestType.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/RequestType.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/RequestType.java
new file mode 100644
index 0000000..90b2c6a
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/RequestType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.actionmanager;
+
+/**
+ * Kinds of request distinguished by the action manager.
+ *
+ * RequestEntity maps this enum with {@code EnumType.STRING}, so the constant
+ * names are what gets written to the {@code request_type} column; renaming a
+ * constant changes the stored value.
+ */
+public enum RequestType {
+ ACTION,
+ COMMAND,
+ INTERNAL_REQUEST
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
index 264e5d7..3a9cc5c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
@@ -17,12 +17,7 @@
*/
package org.apache.ambari.server.actionmanager;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
+import java.util.*;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
@@ -83,20 +78,8 @@ public class Stage {
this.clusterHostInfo = clusterHostInfo;
}
- /**
- * Creates Stage existing in database
- * @param actionId "requestId-stageId" string
- */
- @AssistedInject
- public Stage(@Assisted String actionId, Injector injector) {
- this(injector.getInstance(StageDAO.class).findByActionId(actionId), injector);
- }
-
@AssistedInject
- public Stage(@Assisted StageEntity stageEntity, Injector injector) {
- HostRoleCommandDAO hostRoleCommandDAO = injector.getInstance(HostRoleCommandDAO.class);
- HostDAO hostDAO = injector.getInstance(HostDAO.class);
- HostRoleCommandFactory hostRoleCommandFactory = injector.getInstance(HostRoleCommandFactory.class);
+ public Stage(@Assisted StageEntity stageEntity, HostRoleCommandDAO hostRoleCommandDAO, ActionDBAccessor dbAccessor) {
requestId = stageEntity.getRequestId();
stageId = stageEntity.getStageId();
@@ -106,19 +89,18 @@ public class Stage {
clusterHostInfo = stageEntity.getClusterHostInfo();
- Map<String, List<HostRoleCommandEntity>> hostCommands = hostRoleCommandDAO.findSortedCommandsByStage(stageEntity);
+ List<Long> taskIds = hostRoleCommandDAO.findTaskIdsByStage(requestId, stageId);
+ Collection<HostRoleCommand> commands = dbAccessor.getTasks(taskIds);
- for (Map.Entry<String, List<HostRoleCommandEntity>> entry : hostCommands.entrySet()) {
- String hostname = entry.getKey();
- commandsToSend.put(hostname, new ArrayList<ExecutionCommandWrapper>());
- hostRoleCommands.put(hostname, new TreeMap<String, HostRoleCommand>());
- for (HostRoleCommandEntity hostRoleCommandEntity : entry.getValue()) {
- HostRoleCommand hostRoleCommand = hostRoleCommandFactory.createExisting(hostRoleCommandEntity);
-
-
- hostRoleCommands.get(hostname).put(hostRoleCommand.getRole().toString(), hostRoleCommand);
- commandsToSend.get(hostname).add(hostRoleCommand.getExecutionCommandWrapper());
+ for (HostRoleCommand command : commands) {
+ String hostname = command.getHostName();
+ if (!hostRoleCommands.containsKey(hostname)) {
+ commandsToSend.put(hostname, new ArrayList<ExecutionCommandWrapper>());
+ hostRoleCommands.put(hostname, new TreeMap<String, HostRoleCommand>());
}
+
+ hostRoleCommands.get(hostname).put(command.getRole().toString(), command);
+ commandsToSend.get(hostname).add(command.getExecutionCommandWrapper());
}
for (RoleSuccessCriteriaEntity successCriteriaEntity : stageEntity.getRoleSuccessCriterias()) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/StageFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/StageFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/StageFactory.java
index 3086072..c0e2e6e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/StageFactory.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/StageFactory.java
@@ -26,7 +26,5 @@ public interface StageFactory {
Stage createNew(long requestId, @Assisted("logDir") String logDir, @Assisted("clusterName") String clusterName,
@Assisted("requestContext") String requestContext, @Assisted("clusterHostInfo") String clusterHostInfo);
- Stage createExisting(String actionId);
-
Stage createExisting(StageEntity stageEntity);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index ca14f14..afff8e4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -45,10 +45,7 @@ import org.apache.ambari.server.ServiceComponentHostNotFoundException;
import org.apache.ambari.server.ServiceComponentNotFoundException;
import org.apache.ambari.server.ServiceNotFoundException;
import org.apache.ambari.server.StackAccessException;
-import org.apache.ambari.server.actionmanager.ActionManager;
-import org.apache.ambari.server.actionmanager.HostRoleCommand;
-import org.apache.ambari.server.actionmanager.Stage;
-import org.apache.ambari.server.actionmanager.StageFactory;
+import org.apache.ambari.server.actionmanager.*;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.internal.URLStreamProvider;
@@ -117,6 +114,8 @@ public class AmbariManagementControllerImpl implements
@Inject
private StageFactory stageFactory;
@Inject
+ private RequestFactory requestFactory;
+ @Inject
private ActionMetadata actionMetadata;
@Inject
private AmbariMetaInfo ambariMetaInfo;
@@ -1279,15 +1278,19 @@ public class AmbariManagementControllerImpl implements
return null;
}
- private void persistStages(List<Stage> stages) {
- if(stages != null && stages.size() > 0) {
+ /**
+ * Wraps the given stages in a new request and passes it to
+ * {@code persistRequest}. Does nothing when {@code stages} is null or empty.
+ *
+ * @param stages stages to persist; may be null or empty
+ * @throws AmbariException declared for failures while handling the request
+ */
+ private void persistStages(List<Stage> stages) throws AmbariException {
+ if (stages == null || stages.isEmpty()) {
+ return;
+ }
+ Request wrapped = requestFactory.createNewFromStages(stages);
+ persistRequest(wrapped);
+ }
+
+ //TODO use when request created externally
+ /**
+ * Passes the request to the action manager via {@code sendActions(request, null)}.
+ * Nothing happens when the request is null or has no stages. When debug
+ * logging is enabled, the request's string form is logged first.
+ *
+ * @param request the request to send; may be null
+ */
+ private void persistRequest(Request request) {
+ if (request != null && request.getStages()!= null && !request.getStages().isEmpty()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Triggering Action Manager"
- + ", clusterName=" + stages.get(0).getClusterName()
- + ", requestId=" + stages.get(0).getRequestId()
- + ", stagesCount=" + stages.size());
+ LOG.debug(String.format("Triggering Action Manager, request=%s", request));
}
- actionManager.sendActions(stages, null);
+ actionManager.sendActions(request, null);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
index 7ca3ea1..8cefa74 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
@@ -25,6 +25,7 @@ import java.util.Map;
import javax.crypto.BadPaddingException;
+import com.google.inject.name.Named;
import org.apache.ambari.eventdb.webservice.WorkflowJsonService;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.actionmanager.ActionManager;
@@ -41,21 +42,23 @@ import org.apache.ambari.server.bootstrap.BootStrapImpl;
import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider;
-import org.apache.ambari.server.controller.internal.ClusterControllerImpl;
import org.apache.ambari.server.controller.internal.StackDefinedPropertyProvider;
import org.apache.ambari.server.controller.nagios.NagiosPropertyProvider;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.PersistenceType;
import org.apache.ambari.server.orm.dao.MetainfoDAO;
+import org.apache.ambari.server.orm.entities.MetainfoEntity;
import org.apache.ambari.server.resources.ResourceManager;
import org.apache.ambari.server.resources.api.rest.GetResource;
import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
-import org.apache.ambari.server.scheduler.ExecutionScheduler;
import org.apache.ambari.server.security.CertificateManager;
import org.apache.ambari.server.security.SecurityFilter;
import org.apache.ambari.server.security.authorization.AmbariLdapAuthenticationProvider;
import org.apache.ambari.server.security.authorization.AmbariLocalUserDetailsService;
import org.apache.ambari.server.security.authorization.Users;
+import org.apache.ambari.server.security.authorization.internal.AmbariInternalAuthenticationProvider;
+import org.apache.ambari.server.security.authorization.internal.InternalTokenAuthenticationFilter;
+import org.apache.ambari.server.security.authorization.internal.InternalTokenStorage;
import org.apache.ambari.server.security.unsecured.rest.CertificateDownload;
import org.apache.ambari.server.security.unsecured.rest.CertificateSign;
import org.apache.ambari.server.state.Clusters;
@@ -110,6 +113,9 @@ public class AmbariServer {
AmbariMetaInfo ambariMetaInfo;
@Inject
MetainfoDAO metainfoDAO;
+ @Inject
+ @Named("dbInitNeeded")
+ boolean dbInitNeeded;
public String getServerOsType() {
return configs.getServerOsType();
@@ -127,7 +133,7 @@ public class AmbariServer {
LOG.info("********* Meta Info initialized **********");
performStaticInjection();
- addInMemoryUsers();
+ initDB();
server = new Server();
serverForAgent = new Server();
@@ -147,6 +153,11 @@ public class AmbariServer {
injector.getInstance(AmbariLocalUserDetailsService.class));
factory.registerSingleton("ambariLdapAuthenticationProvider",
injector.getInstance(AmbariLdapAuthenticationProvider.class));
+ factory.registerSingleton("internalTokenAuthenticationFilter",
+ injector.getInstance(InternalTokenAuthenticationFilter.class));
+ factory.registerSingleton("ambariInternalAuthenticationProvider",
+ injector.getInstance(AmbariInternalAuthenticationProvider.class));
+
//Spring Security xml config depends on this Bean
String[] contextLocations = {SPRING_CONTEXT_LOCATION};
@@ -393,9 +404,9 @@ public class AmbariServer {
* Creates default users and roles if in-memory database is used
*/
@Transactional
- protected void addInMemoryUsers() {
- if (configs.getPersistenceType() == PersistenceType.IN_MEMORY) {
- LOG.info("In-memory database is used - creating default users");
+ protected void initDB() {
+ if (configs.getPersistenceType() == PersistenceType.IN_MEMORY || dbInitNeeded) {
+ LOG.info("Database init needed - creating default data");
Users users = injector.getInstance(Users.class);
users.createDefaultRoles();
@@ -403,24 +414,37 @@ public class AmbariServer {
users.createUser("user", "user");
try {
users.promoteToAdmin(users.getLocalUser("admin"));
- } catch (AmbariException e) {
- throw new RuntimeException(e);
+ } catch (AmbariException ignored) {
}
+
+ MetainfoEntity schemaVersion = new MetainfoEntity();
+ schemaVersion.setMetainfoName(Configuration.SERVER_VERSION_KEY);
+ schemaVersion.setMetainfoValue(ambariMetaInfo.getServerVersion());
+
+ metainfoDAO.create(schemaVersion);
}
}
protected void checkDBVersion() throws AmbariException {
LOG.info("Checking DB store version");
- String schemaVersion = metainfoDAO.findByKey(Configuration.SERVER_VERSION_KEY).getMetainfoValue();
- String serverVersion = ambariMetaInfo.getServerVersion();
- if (! schemaVersion.equals(serverVersion)) {
+ MetainfoEntity schemaVersionEntity = metainfoDAO.findByKey(Configuration.SERVER_VERSION_KEY);
+ String schemaVersion = null;
+ String serverVersion = null;
+
+ if (schemaVersionEntity != null) {
+ schemaVersion = schemaVersionEntity.getMetainfoValue();
+ serverVersion = ambariMetaInfo.getServerVersion();
+ }
+
+ if (schemaVersionEntity==null || !schemaVersion.equals(serverVersion)) {
String error = "Current database store version is not compatible with " +
- "current server version"
- + ", serverVersion=" + serverVersion
- + ", schemaVersion=" + schemaVersion;
+ "current server version"
+ + ", serverVersion=" + serverVersion
+ + ", schemaVersion=" + schemaVersion;
LOG.warn(error);
throw new AmbariException(error);
}
+
LOG.info("DB store version is compatible");
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
index 135650f..b3776e7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
@@ -18,18 +18,12 @@
package org.apache.ambari.server.controller;
+import java.security.SecureRandom;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
-import org.apache.ambari.server.actionmanager.ActionDBAccessor;
-import org.apache.ambari.server.actionmanager.ActionDBAccessorImpl;
-import org.apache.ambari.server.actionmanager.CustomActionDBAccessor;
-import org.apache.ambari.server.actionmanager.CustomActionDBAccessorImpl;
-import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
-import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
-import org.apache.ambari.server.actionmanager.HostRoleCommandFactoryImpl;
-import org.apache.ambari.server.actionmanager.StageFactory;
+import org.apache.ambari.server.actionmanager.*;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.internal.ComponentResourceProvider;
import org.apache.ambari.server.controller.internal.HostComponentResourceProvider;
@@ -84,6 +78,7 @@ public class ControllerModule extends AbstractModule {
private final Configuration configuration;
private final HostsMap hostsMap;
+ private boolean dbInitNeeded;
public ControllerModule() throws Exception {
configuration = new Configuration();
@@ -106,12 +101,16 @@ public class ControllerModule extends AbstractModule {
install(buildJpaPersistModule());
bind(Gson.class).in(Scopes.SINGLETON);
+ bind(SecureRandom.class).in(Scopes.SINGLETON);
+
bind(Clusters.class).to(ClustersImpl.class);
bind(AmbariCustomCommandExecutionHelper.class).to(AmbariCustomCommandExecutionHelperImpl.class);
bind(ActionDBAccessor.class).to(ActionDBAccessorImpl.class);
bind(CustomActionDBAccessor.class).to(CustomActionDBAccessorImpl.class);
bindConstant().annotatedWith(Names.named("schedulerSleeptime")).to(10000L);
bindConstant().annotatedWith(Names.named("actionTimeout")).to(600000L);
+ bindConstant().annotatedWith(Names.named("dbInitNeeded")).to(dbInitNeeded);
+ bindConstant().annotatedWith(Names.named("statusCheckInterval")).to(5000L);
//ExecutionCommands cache size
@@ -167,9 +166,11 @@ public class ControllerModule extends AbstractModule {
switch (configuration.getJPATableGenerationStrategy()) {
case CREATE:
properties.setProperty("eclipselink.ddl-generation", "create-tables");
+ dbInitNeeded = true;
break;
case DROP_AND_CREATE:
properties.setProperty("eclipselink.ddl-generation", "drop-and-create-tables");
+ dbInitNeeded = true;
break;
default:
break;
@@ -213,6 +214,8 @@ public class ControllerModule extends AbstractModule {
install(new FactoryModuleBuilder().implement(RequestExecution.class,
RequestExecutionImpl.class).build(RequestExecutionFactory.class));
install(new FactoryModuleBuilder().build(StageFactory.class));
+ install(new FactoryModuleBuilder().build(RequestFactory.class));
+
bind(HostRoleCommandFactory.class).to(HostRoleCommandFactoryImpl.class);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
index d9f840c..fef258b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
@@ -56,6 +56,14 @@ class RequestResourceProvider extends AbstractControllerResourceProvider {
protected static final String REQUEST_ID_PROPERTY_ID = "Requests/id";
protected static final String REQUEST_STATUS_PROPERTY_ID = "Requests/request_status";
protected static final String REQUEST_CONTEXT_ID = "Requests/request_context";
+ protected static final String REQUEST_TYPE_ID = "Requests/type";
+ protected static final String REQUEST_INPUTS_ID = "Requests/inputs";
+ protected static final String REQUEST_TARGET_SERVICE_ID = "Requests/target_service";
+ protected static final String REQUEST_TARGET_COMPONENT_ID = "Requests/target_component";
+ protected static final String REQUEST_TARGET_HOSTS_ID = "Requests/target_hosts";
+ protected static final String REQUEST_CREATE_TIME_ID = "Requests/create_time";
+ protected static final String REQUEST_START_TIME_ID = "Requests/start_time";
+ protected static final String REQUEST_END_TIME_ID = "Requests/end_time";
protected static final String REQUEST_TASK_CNT_ID = "Requests/task_count";
protected static final String REQUEST_FAILED_TASK_CNT_ID = "Requests/failed_task_count";
protected static final String REQUEST_ABORTED_TASK_CNT_ID = "Requests/aborted_task_count";
@@ -254,33 +262,88 @@ class RequestResourceProvider extends AbstractControllerResourceProvider {
List<Long> requestIds,
Set<String> requestedPropertyIds) {
- List<HostRoleCommand> hostRoleCommands = actionManager.getAllTasksByRequestIds(requestIds);
- Map<Long, String> requestContexts = actionManager.getRequestContext(requestIds);
+ List<org.apache.ambari.server.actionmanager.Request> requests = actionManager.getRequests(requestIds);
+
Map<Long, Resource> resourceMap = new HashMap<Long, Resource>();
- // group by request id
- Map<Long, Set<HostRoleCommand>> commandMap = new HashMap<Long, Set<HostRoleCommand>>();
+ for (org.apache.ambari.server.actionmanager.Request request : requests) {
+ resourceMap.put(request.getRequestId(), getRequestResource(request, requestedPropertyIds));
+ }
+
+ return resourceMap.values();
+ }
+
+ /**
+ * Builds the API resource for a single request.
+ *
+ * Copies the request's descriptive fields onto the resource, then counts the
+ * request's commands by status to derive the task counters, the overall
+ * request status and the progress percentage. The status is the first match
+ * of: any FAILED task, any ABORTED task, any TIMEDOUT task, any task in
+ * progress, all tasks completed, otherwise PENDING. Progress weights a queued
+ * task as 0.09, an in-progress task as 0.35 and a completed task as 1.
+ *
+ * @param request the request to describe
+ * @param requestedPropertyIds ids passed through to {@code setResourceProperty} for each property
+ * @return the populated resource
+ */
+ private Resource getRequestResource(org.apache.ambari.server.actionmanager.Request request,
+ Set<String> requestedPropertyIds) {
+ Resource resource = new ResourceImpl(Resource.Type.Request);
+
+ setResourceProperty(resource, REQUEST_CLUSTER_NAME_PROPERTY_ID, request.getClusterName(), requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_ID_PROPERTY_ID, request.getRequestId(), requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_CONTEXT_ID, request.getRequestContext(), requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_TYPE_ID, request.getRequestType(), requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_INPUTS_ID, request.getInputs(), requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_TARGET_SERVICE_ID, request.getTargetService(), requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_TARGET_COMPONENT_ID, request.getTargetComponent(), requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_TARGET_HOSTS_ID, request.getTargetHosts(), requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_CREATE_TIME_ID, request.getCreateTime(), requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_START_TIME_ID, request.getStartTime(), requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_END_TIME_ID, request.getEndTime(), requestedPropertyIds);
+
+ List<HostRoleCommand> commands = request.getCommands();
+
+ int taskCount = commands.size();
+ int completedTaskCount = 0;
+ int queuedTaskCount = 0;
+ int pendingTaskCount = 0;
+ int failedTaskCount = 0;
+ int abortedTaskCount = 0;
+ int timedOutTaskCount = 0;
- for (HostRoleCommand hostRoleCommand : hostRoleCommands) {
- Long requestId = hostRoleCommand.getRequestId();
- Set<HostRoleCommand> commands = commandMap.get(requestId);
+ for (HostRoleCommand hostRoleCommand : commands) {
+ HostRoleStatus status = hostRoleCommand.getStatus();
+ if (status.isCompletedState()) {
+ completedTaskCount++;
- if (commands == null) {
- commands = new HashSet<HostRoleCommand>();
- commandMap.put(requestId, commands);
+ switch (status) {
+ case ABORTED:
+ abortedTaskCount++;
+ break;
+ case FAILED:
+ failedTaskCount++;
+ break;
+ case TIMEDOUT:
+ timedOutTaskCount++;
+ break;
+ }
+ } else if (status.equals(HostRoleStatus.QUEUED)) {
+ queuedTaskCount++;
+ } else if (status.equals(HostRoleStatus.PENDING)) {
+ pendingTaskCount++;
}
- commands.add(hostRoleCommand);
}
- for (Map.Entry<Long, Set<HostRoleCommand>> entry : commandMap.entrySet()) {
- Long requestId = entry.getKey();
- Set<HostRoleCommand> commands = entry.getValue();
- String context = requestContexts.get(requestId);
+ // Whatever is neither completed, queued nor pending is counted as in progress
+ int inProgressTaskCount = taskCount - completedTaskCount - queuedTaskCount - pendingTaskCount;
- resourceMap.put(requestId,
- getRequestResource(clusterName, requestId, context, commands, requestedPropertyIds));
- }
- return resourceMap.values();
+ // determine request status
+ HostRoleStatus requestStatus = failedTaskCount > 0 ? HostRoleStatus.FAILED :
+ abortedTaskCount > 0 ? HostRoleStatus.ABORTED :
+ timedOutTaskCount > 0 ? HostRoleStatus.TIMEDOUT :
+ inProgressTaskCount > 0 ? HostRoleStatus.IN_PROGRESS :
+ completedTaskCount == taskCount ? HostRoleStatus.COMPLETED :
+ HostRoleStatus.PENDING;
+ // A request without tasks is reported as COMPLETED above; report 100% for it
+ // instead of letting 0.0 / 0.0 produce NaN.
+ double progressPercent = taskCount == 0 ? 100.0 :
+ ((queuedTaskCount * 0.09 + inProgressTaskCount * 0.35 + completedTaskCount) / (double) taskCount) * 100.0;
+
+ setResourceProperty(resource, REQUEST_STATUS_PROPERTY_ID, requestStatus.toString(), requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_TASK_CNT_ID, taskCount, requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_FAILED_TASK_CNT_ID, failedTaskCount, requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_ABORTED_TASK_CNT_ID, abortedTaskCount, requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_TIMED_OUT_TASK_CNT_ID, timedOutTaskCount, requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_QUEUED_TASK_CNT_ID, queuedTaskCount, requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_COMPLETED_TASK_CNT_ID, completedTaskCount, requestedPropertyIds);
+ setResourceProperty(resource, REQUEST_PROGRESS_PERCENT_ID, progressPercent, requestedPropertyIds);
+
+ return resource;
}
// Get a request resource from the given set of host role commands.
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 5678887..e68b974 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -129,6 +129,17 @@ public class HostRoleCommandDAO {
}
@Transactional
+ public List<Long> findTaskIdsByStage(long requestId, long stageId) {
+ TypedQuery<Long> query = entityManagerProvider.get().createQuery("SELECT hostRoleCommand.taskId " +
+ "FROM HostRoleCommandEntity hostRoleCommand " +
+ "WHERE hostRoleCommand.stage.requestId=?1 " +
+ "AND hostRoleCommand.stage.stageId=?2 "+
+ "ORDER BY hostRoleCommand.taskId", Long.class);
+
+ return daoUtils.selectList(query, requestId, stageId);
+ }
+
+ @Transactional
public List<HostRoleCommandEntity> findByHostRole(String hostName, long requestId, long stageId, String role) {
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT command " +
"FROM HostRoleCommandEntity command " +
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
new file mode 100644
index 0000000..483550f
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.orm.dao;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.persist.Transactional;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+
+import javax.persistence.EntityManager;
+import javax.persistence.TypedQuery;
+import java.util.Collection;
+import java.util.List;
+
+public class RequestDAO {
+ @Inject
+ Provider<EntityManager> entityManagerProvider;
+ @Inject
+ DaoUtils daoUtils;
+
+ @Transactional
+ public RequestEntity findByPK(Long requestId) {
+ return entityManagerProvider.get().find(RequestEntity.class, requestId);
+ }
+
+ @Transactional
+ public List<RequestEntity> findByPks(Collection<Long> requestIds) {
+ TypedQuery<RequestEntity> query = entityManagerProvider.get().createQuery("SELECT request FROM RequestEntity request " +
+ "WHERE request.requestId IN ?1", RequestEntity.class);
+ return daoUtils.selectList(query, requestIds);
+ }
+
+ @Transactional
+ public void create(RequestEntity requestEntity) {
+ entityManagerProvider.get().persist(requestEntity);
+ }
+
+ @Transactional
+ public RequestEntity merge(RequestEntity requestEntity) {
+ return entityManagerProvider.get().merge(requestEntity);
+ }
+
+ @Transactional
+ public void remove(RequestEntity requestEntity) {
+ entityManagerProvider.get().remove(merge(requestEntity));
+ }
+
+ @Transactional
+ public void removeByPK(Long requestId) {
+ remove(findByPK(requestId));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ClusterEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ClusterEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ClusterEntity.java
index 55f7ee7..b9e18cf 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ClusterEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ClusterEntity.java
@@ -76,6 +76,9 @@ public class ClusterEntity {
@OneToMany(mappedBy = "cluster", cascade = {CascadeType.REMOVE, CascadeType.REFRESH})
private Collection<StageEntity> stages;
+ @OneToMany(mappedBy = "cluster", cascade = {CascadeType.REMOVE, CascadeType.REFRESH})
+ private Collection<RequestEntity> requests;
+
@OneToMany(mappedBy = "clusterEntity", cascade = CascadeType.ALL)
private Collection<ClusterConfigEntity> configEntities;
@@ -219,4 +222,12 @@ public class ClusterEntity {
public void setRequestScheduleEntities(Collection<RequestScheduleEntity> requestScheduleEntities) {
this.requestScheduleEntities = requestScheduleEntities;
}
+
+ /**
+ * Returns the request entities mapped to this cluster.
+ *
+ * @return the value of the {@code requests} field
+ */
+ public Collection<RequestEntity> getRequests() {
+ return this.requests;
+ }
+
+ /**
+ * Replaces the request entities mapped to this cluster.
+ *
+ * @param requests the new value for the {@code requests} field
+ */
+ public void setRequests(Collection<RequestEntity> requests) {
+ this.requests = requests;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
new file mode 100644
index 0000000..c8d7fb7
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.orm.entities;
+
+import org.apache.ambari.server.actionmanager.RequestType;
+
+import javax.persistence.*;
+import java.util.Collection;
+ /** JPA entity mapped to the "request" table; equality and hashing are based on requestId alone. */
+@Table(name = "request")
+@Entity
+public class RequestEntity {
+ /** Primary key, stored in column request_id. */
+ @Column(name = "request_id")
+ @Id
+ private Long requestId;
+ /** Read-only mapping of cluster_id (insertable/updatable = false); the column is also the join column of {@link #cluster}. */
+ @Column(name = "cluster_id", updatable = false, insertable = false)
+ @Basic
+ private Long clusterId;
+
+ @Column(name = "request_context")
+ @Basic
+ private String requestContext;
+
+ @Column(name = "command_name")
+ @Basic
+ private String commandName;
+
+ @Column(name = "inputs", length = 32000)
+ @Basic
+ private String inputs;
+
+ @Column(name = "target_service")
+ @Basic
+ private String targetService;
+
+ @Column(name = "target_component")
+ @Basic
+ private String targetComponent;
+ /** NOTE(review): a single String stored as a LOB; how multiple hosts are encoded is not visible here - confirm with the writer. */
+ @Column(name = "target_hosts")
+ @Lob
+ private String targetHosts;
+ /** Persisted by enum constant name (EnumType.STRING). */
+ @Column(name = "request_type")
+ @Enumerated(value = EnumType.STRING)
+ private RequestType requestType;
+ /** Kept as a plain String; no enum mapping is applied in this class. */
+ @Column(name = "status")
+ private String status;
+ /** Initialised to the wall-clock time (epoch milliseconds) at which this Java object is instantiated. */
+ @Basic
+ @Column(name = "create_time", nullable = false)
+ private Long createTime = System.currentTimeMillis();
+ /** Starts out as -1 until a start time is set. */
+ @Basic
+ @Column(name = "start_time", nullable = false)
+ private Long startTime = -1L;
+ /** Starts out as -1 until an end time is set. */
+ @Basic
+ @Column(name = "end_time", nullable = false)
+ private Long endTime = -1L;
+ /** Inverse side of the association mapped by the "request" attribute of StageEntity; no cascade is configured. */
+ @OneToMany(mappedBy = "request")
+ private Collection<StageEntity> stages;
+ /** Owning side of the cluster association (join column cluster_id); only MERGE is cascaded. */
+ @ManyToOne(cascade = {CascadeType.MERGE})
+ @JoinColumn(name = "cluster_id", referencedColumnName = "cluster_id")
+ private ClusterEntity cluster;
+
+ public Long getRequestId() {
+ return requestId;
+ }
+
+ public void setRequestId(Long id) {
+ this.requestId = id;
+ }
+
+ public String getRequestContext() {
+ return requestContext;
+ }
+
+ public void setRequestContext(String request_context) {
+ this.requestContext = request_context;
+ }
+
+ public Collection<StageEntity> getStages() {
+ return stages;
+ }
+
+ public void setStages(Collection<StageEntity> stages) {
+ this.stages = stages;
+ }
+
+ public ClusterEntity getCluster() {
+ return cluster;
+ }
+
+ public void setCluster(ClusterEntity cluster) {
+ this.cluster = cluster;
+ }
+
+ public Long getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(Long createTime) {
+ this.createTime = createTime;
+ }
+
+ public Long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(Long startTime) {
+ this.startTime = startTime;
+ }
+
+ public Long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(Long endTime) {
+ this.endTime = endTime;
+ }
+
+ public String getInputs() {
+ return inputs;
+ }
+
+ public void setInputs(String inputs) {
+ this.inputs = inputs;
+ }
+
+ public String getTargetService() {
+ return targetService;
+ }
+
+ public void setTargetService(String targetService) {
+ this.targetService = targetService;
+ }
+
+ public String getTargetComponent() {
+ return targetComponent;
+ }
+
+ public void setTargetComponent(String targetComponent) {
+ this.targetComponent = targetComponent;
+ }
+
+ public String getTargetHosts() {
+ return targetHosts;
+ }
+
+ public void setTargetHosts(String targetHosts) {
+ this.targetHosts = targetHosts;
+ }
+
+ public RequestType getRequestType() {
+ return requestType;
+ }
+
+ public void setRequestType(RequestType requestType) {
+ this.requestType = requestType;
+ }
+
+ public Long getClusterId() {
+ return clusterId;
+ }
+ /** Changes the in-memory value only: the clusterId mapping is declared insertable = false, updatable = false. */
+ public void setClusterId(Long clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ public String getCommandName() {
+ return commandName;
+ }
+
+ public void setCommandName(String commandName) {
+ this.commandName = commandName;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+ /** Equal when the other object has exactly the same class and an equal (or likewise null) requestId. */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ RequestEntity that = (RequestEntity) o;
+
+ if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) return false;
+
+ return true;
+ }
+ /** Hash of requestId, or 0 when it is null, matching the field used by equals. */
+ @Override
+ public int hashCode() {
+ return requestId != null ? requestId.hashCode() : 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleBatchRequestEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleBatchRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleBatchRequestEntity.java
index 79ee689..0775df6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleBatchRequestEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleBatchRequestEntity.java
@@ -50,10 +50,10 @@ public class RequestScheduleBatchRequestEntity {
@Column(name = "request_id")
private Long requestId;
- @Column(name = "request_type")
+ @Column(name = "request_type", length = 255)
private String requestType;
- @Column(name = "request_uri")
+ @Column(name = "request_uri", length = 1024)
private String requestUri;
@Lob
@@ -61,13 +61,13 @@ public class RequestScheduleBatchRequestEntity {
@Column(name = "request_body")
private byte[] requestBody;
- @Column(name = "request_status")
+ @Column(name = "request_status", length = 255)
private String requestStatus;
@Column(name = "return_code")
private Integer returnCode;
- @Column(name = "return_message")
+ @Column(name = "return_message", length = 2000)
private String returnMessage;
@ManyToOne
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
index c042dd0..2a353b4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
@@ -32,7 +32,7 @@ public class StageEntity {
@Basic
private Long clusterId;
- @Column(name = "request_id")
+ @Column(name = "request_id", insertable = false, updatable = false, nullable = false)
@Id
private Long requestId;
@@ -51,21 +51,16 @@ public class StageEntity {
@Column(name = "cluster_host_info")
@Basic
private byte[] clusterHostInfo;
-
-
- public String getClusterHostInfo() {
- return clusterHostInfo == null ? new String() : new String(clusterHostInfo);
- }
-
- public void setClusterHostInfo(String clusterHostInfo) {
- this.clusterHostInfo = clusterHostInfo.getBytes();
- }
+ @ManyToOne
+ @JoinColumn(name = "request_id", referencedColumnName = "request_id", nullable = false)
+ private RequestEntity request;
+
@ManyToOne(cascade = {CascadeType.MERGE})
@JoinColumn(name = "cluster_id", referencedColumnName = "cluster_id")
private ClusterEntity cluster;
- @OneToMany(mappedBy = "stage", cascade = CascadeType.REMOVE)
+ @OneToMany(mappedBy = "stage", cascade = CascadeType.REMOVE, fetch = FetchType.LAZY)
private Collection<HostRoleCommandEntity> hostRoleCommands;
@OneToMany(mappedBy = "stage", cascade = CascadeType.REMOVE)
@@ -107,6 +102,14 @@ public class StageEntity {
return defaultString(requestContext);
}
+ public String getClusterHostInfo() { // a null byte array maps to ""; decoding uses the platform default charset
+ return clusterHostInfo == null ? "" : new String(clusterHostInfo);
+ }
+ /** Stores the string as bytes (platform default charset); a null argument is ignored, as in setRequestContext. */
+ public void setClusterHostInfo(String clusterHostInfo) {
+ if (clusterHostInfo != null) { this.clusterHostInfo = clusterHostInfo.getBytes(); }
+ }
+
public void setRequestContext(String requestContext) {
if (requestContext != null) {
this.requestContext = requestContext;
@@ -163,4 +166,12 @@ public class StageEntity {
public void setRoleSuccessCriterias(Collection<RoleSuccessCriteriaEntity> roleSuccessCriterias) {
this.roleSuccessCriterias = roleSuccessCriterias;
}
+ /** Returns the request this stage belongs to (many-to-one, joined on request_id). */
+ public RequestEntity getRequest() {
+ return this.request;
+ }
+ /** Sets the owning request; the requestId field is mapped insertable = false, updatable = false on the same request_id column. */
+ public void setRequest(RequestEntity request) {
+ this.request = request;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8697655/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
index 72fb601..a68910a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
@@ -85,6 +85,8 @@ public abstract class AbstractLinearExecutionJob implements ExecutionJob {
throw new JobExecutionException(e);
}
+ LOG.debug("Finished linear job: " + jobKey);
+
JobDataMap jobDataMap = context.getMergedJobDataMap();
String nextJobName = jobDataMap.getString(NEXT_EXECUTION_JOB_NAME_KEY);
String nextJobGroup = jobDataMap.getString(NEXT_EXECUTION_JOB_GROUP_KEY);