You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by st...@apache.org on 2017/02/25 11:43:09 UTC
ambari git commit: AMBARI-19929.
TopologyRequest/TopologyLogicalRequest/TopologyHostRequest database
inconsistency. (stoader)
Repository: ambari
Updated Branches:
refs/heads/trunk e7961091f -> 11a7651c8
AMBARI-19929. TopologyRequest/TopologyLogicalRequest/TopologyHostRequest database inconsistency. (stoader)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/11a7651c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/11a7651c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/11a7651c
Branch: refs/heads/trunk
Commit: 11a7651c8f06c05155ae0a5de249f0906bf370a4
Parents: e796109
Author: Toader, Sebastian <st...@hortonworks.com>
Authored: Sat Feb 25 12:42:31 2017 +0100
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Sat Feb 25 12:42:31 2017 +0100
----------------------------------------------------------------------
.../apache/ambari/server/state/ServiceImpl.java | 10 --
.../server/state/cluster/ClusterImpl.java | 3 +-
.../ambari/server/topology/TopologyManager.java | 131 ++++++++++++++-----
3 files changed, 98 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/11a7651c/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
index 2cccb21..31c53d8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
@@ -418,7 +418,6 @@ public class ServiceImpl implements Service {
*/
private void persist(ClusterServiceEntity serviceEntity) {
persistEntities(serviceEntity);
- refresh();
// publish the service installed event
StackId stackId = cluster.getDesiredStackVersion();
@@ -441,15 +440,6 @@ public class ServiceImpl implements Service {
clusterServiceDAO.merge(serviceEntity);
}
- @Transactional
- public void refresh() {
- ClusterServiceEntityPK pk = new ClusterServiceEntityPK();
- pk.setClusterId(getClusterId());
- pk.setServiceName(getName());
- ClusterServiceEntity serviceEntity = getServiceEntity();
- clusterServiceDAO.refresh(serviceEntity);
- serviceDesiredStateDAO.refresh(serviceEntity.getServiceDesiredStateEntity());
- }
@Override
public boolean canBeRemoved() {
http://git-wip-us.apache.org/repos/asf/ambari/blob/11a7651c/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index ab56844..739fe23 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -146,6 +146,7 @@ import org.apache.ambari.server.state.scheduler.RequestExecutionFactory;
import org.apache.ambari.server.state.stack.upgrade.Direction;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostSummary;
import org.apache.ambari.server.topology.TopologyRequest;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -447,7 +448,7 @@ public class ClusterImpl implements Cluster {
private void loadServices() {
ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity.getClusterServiceEntities().isEmpty()) {
+ if (CollectionUtils.isEmpty(clusterEntity.getClusterServiceEntities())) {
return;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/11a7651c/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index a26624e..f53f04a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -44,6 +44,7 @@ import org.apache.ambari.server.controller.AmbariServer;
import org.apache.ambari.server.controller.RequestStatusResponse;
import org.apache.ambari.server.controller.ShortTaskStatus;
import org.apache.ambari.server.controller.internal.ArtifactResourceProvider;
+import org.apache.ambari.server.controller.internal.BaseClusterRequest;
import org.apache.ambari.server.controller.internal.CalculatedStatus;
import org.apache.ambari.server.controller.internal.CredentialResourceProvider;
import org.apache.ambari.server.controller.internal.ProvisionClusterRequest;
@@ -77,6 +78,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Singleton;
+import com.google.inject.persist.Transactional;
/**
* Manages all cluster provisioning actions on the cluster topology.
@@ -246,20 +248,18 @@ public class TopologyManager {
public RequestStatusResponse provisionCluster(final ProvisionClusterRequest request) throws InvalidTopologyException, AmbariException {
ensureInitialized();
- if (null != request.getQuickLinksProfileJson()) {
- saveOrUpdateQuickLinksProfile(request.getQuickLinksProfileJson());
- }
-
- ClusterTopology topology = new ClusterTopologyImpl(ambariContext, request);
+ final ClusterTopology topology = new ClusterTopologyImpl(ambariContext, request);
final String clusterName = request.getClusterName();
+ final Stack stack = topology.getBlueprint().getStack();
final String repoVersion = request.getRepositoryVersion();
// get the id prior to creating ambari resources which increments the counter
- Long provisionId = ambariContext.getNextRequestId();
+ final Long provisionId = ambariContext.getNextRequestId();
- final Stack stack = topology.getBlueprint().getStack();
boolean configureSecurity = false;
+
SecurityConfiguration securityConfiguration = processSecurityConfiguration(request);
+
if (securityConfiguration != null && securityConfiguration.getType() == SecurityType.KERBEROS) {
configureSecurity = true;
addKerberosClient(topology);
@@ -291,18 +291,25 @@ public class TopologyManager {
topology.setConfigRecommendationStrategy(request.getConfigRecommendationStrategy());
// set provision action requested
topology.setProvisionAction(request.getProvisionAction());
- // persist request after it has successfully validated
- PersistedTopologyRequest persistedRequest = RetryHelper.executeWithRetry(new Callable<PersistedTopologyRequest>() {
- @Override
- public PersistedTopologyRequest call() throws Exception {
- return persistedState.persistTopologyRequest(request);
+
+ // persist request
+ LogicalRequest logicalRequest = RetryHelper.executeWithRetry(new Callable<LogicalRequest>() {
+ @Override
+ public LogicalRequest call() throws Exception {
+ LogicalRequest logicalRequest = processAndPersistProvisionClusterTopologyRequest(request, topology, provisionId);
+ return logicalRequest;
+ }
}
- });
+ );
+
clusterTopologyMap.put(clusterId, topology);
addClusterConfigRequest(topology, new ClusterConfigurationRequest(
ambariContext, topology, true, stackAdvisorBlueprintProcessor, configureSecurity));
+
+
+ // Notify listeners that cluster configuration finished
executor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
@@ -310,7 +317,9 @@ public class TopologyManager {
return Boolean.TRUE;
}
});
- LogicalRequest logicalRequest = processRequest(persistedRequest, topology, provisionId);
+
+ // Process the logical request
+ processRequest(request, topology, logicalRequest);
//todo: this should be invoked as part of a generic lifecycle event which could possibly
//todo: be tied to cluster state
@@ -320,6 +329,7 @@ public class TopologyManager {
return getRequestStatus(logicalRequest.getRequestId());
}
+
/**
* Saves the quick links profile to the DB as an Ambari setting. Creates a new setting entity or updates the existing
* one.
@@ -435,14 +445,14 @@ public class TopologyManager {
}
- public RequestStatusResponse scaleHosts(ScaleClusterRequest request)
+ public RequestStatusResponse scaleHosts(final ScaleClusterRequest request)
throws InvalidTopologyException, AmbariException {
ensureInitialized();
LOG.info("TopologyManager.scaleHosts: Entering");
String clusterName = request.getClusterName();
long clusterId = ambariContext.getClusterId(clusterName);
- ClusterTopology topology = clusterTopologyMap.get(clusterId);
+ final ClusterTopology topology = clusterTopologyMap.get(clusterId);
if (topology == null) {
throw new InvalidTopologyException("Unable to retrieve cluster topology for cluster. This is most likely a " +
"result of trying to scale a cluster via the API which was created using " +
@@ -453,11 +463,64 @@ public class TopologyManager {
hostNameCheck(request, topology);
request.setClusterId(clusterId);
- PersistedTopologyRequest persistedRequest = persistedState.persistTopologyRequest(request);
+
// this registers/updates all request host groups
topology.update(request);
- return getRequestStatus(processRequest(persistedRequest, topology,
- ambariContext.getNextRequestId()).getRequestId());
+
+ final Long requestId = ambariContext.getNextRequestId();
+ LogicalRequest logicalRequest = RetryHelper.executeWithRetry(new Callable<LogicalRequest>() {
+ @Override
+ public LogicalRequest call() throws Exception {
+ LogicalRequest logicalRequest = processAndPersistTopologyRequest(request, topology, requestId);
+
+ return logicalRequest;
+ }
+ }
+ );
+
+ processRequest(request, topology, logicalRequest);
+
+ return getRequestStatus(logicalRequest.getRequestId());
+ }
+
+ /**
+ * Creates and persists a {@see PersistedTopologyRequest} and a {@see LogicalRequest} for the provided
+ * provision cluster request and topology.
+ * @param request Provision cluster request to create a logical request for.
+ * @param topology Cluster topology
+ * @param logicalRequestId The Id for the created logical request
+ * @return Logical request created.
+ */
+ @Transactional
+ protected LogicalRequest processAndPersistProvisionClusterTopologyRequest(ProvisionClusterRequest request, ClusterTopology topology, Long logicalRequestId)
+ throws InvalidTopologyException, AmbariException {
+
+ if (null != request.getQuickLinksProfileJson()) {
+ saveOrUpdateQuickLinksProfile(request.getQuickLinksProfileJson());
+ }
+
+ LogicalRequest logicalRequest = processAndPersistTopologyRequest(request, topology, logicalRequestId);
+
+ return logicalRequest;
+
+ }
+
+
+ /**
+ * Creates and persists a {@see PersistedTopologyRequest} and a {@see LogicalRequest} for the provided request and topology.
+ * @param request {@see ProvisionClusterRequest} or {@see ScaleClusterRequest} to create a logical request for.
+ * @param topology Cluster topology
+ * @param logicalRequestId The Id for the created logical request
+ * @return Logical request created.
+ */
+ @Transactional
+ protected LogicalRequest processAndPersistTopologyRequest(BaseClusterRequest request, ClusterTopology topology, Long logicalRequestId)
+ throws InvalidTopologyException, AmbariException {
+ PersistedTopologyRequest persistedRequest = persistedState.persistTopologyRequest(request);
+
+ LogicalRequest logicalRequest = createLogicalRequest(persistedRequest, topology, logicalRequestId);
+
+ return logicalRequest;
}
private void hostNameCheck(ScaleClusterRequest request, ClusterTopology topology) throws InvalidTopologyException {
@@ -680,13 +743,12 @@ public class TopologyManager {
return hostComponentMap;
}
- private LogicalRequest processRequest(PersistedTopologyRequest request, ClusterTopology topology, Long requestId)
+ private void processRequest(TopologyRequest request, ClusterTopology topology, final LogicalRequest logicalRequest)
throws AmbariException {
LOG.info("TopologyManager.processRequest: Entering");
- finalizeTopology(request.getRequest(), topology);
- LogicalRequest logicalRequest = createLogicalRequest(request, topology, requestId);
+ finalizeTopology(request, topology);
boolean requestHostComplete = false;
//todo: overall synchronization. Currently we have nested synchronization here
@@ -750,22 +812,16 @@ public class TopologyManager {
}
}
}
- return logicalRequest;
}
- private LogicalRequest createLogicalRequest(final PersistedTopologyRequest request, ClusterTopology topology, Long requestId)
+ @Transactional
+ protected LogicalRequest createLogicalRequest(final PersistedTopologyRequest request, ClusterTopology topology, Long requestId)
throws AmbariException {
final LogicalRequest logicalRequest = logicalRequestFactory.createRequest(
requestId, request.getRequest(), topology);
- RetryHelper.executeWithRetry(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- persistedState.persistLogicalRequest(logicalRequest, request.getId());
- return null;
- }
- });
+ persistedState.persistLogicalRequest(logicalRequest, request.getId());
allRequests.put(logicalRequest.getRequestId(), logicalRequest);
LOG.info("TopologyManager.createLogicalRequest: created LogicalRequest with ID = {} and completed persistence of this request.",
@@ -796,11 +852,10 @@ public class TopologyManager {
// persist the host request -> hostName association
try {
- RetryHelper.executeWithRetry(new Callable<Object>() {
+ RetryHelper.executeWithRetry(new Callable<Void>() {
@Override
- public Object call() throws Exception {
- persistedState.registerHostName(response.getHostRequestId(), hostName);
- persistedState.registerInTopologyHostInfo(host);
+ public Void call() throws Exception {
+ persistTopologyHostRegistration(response.getHostRequestId(), host);
return null;
}
});
@@ -822,6 +877,12 @@ public class TopologyManager {
}
}
+ @Transactional
+ protected void persistTopologyHostRegistration(long hostRequestId, final HostImpl host) {
+ persistedState.registerHostName(hostRequestId, host.getHostName());
+ persistedState.registerInTopologyHostInfo(host);
+ }
+
private void queueHostTasks(ClusterTopology topology, HostOfferResponse response, String hostName) {
LOG.info("TopologyManager.processAcceptedHostOffer: queueing tasks for host = {}", hostName);
response.executeTasks(taskExecutor, hostName, topology, ambariContext);