You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by st...@apache.org on 2017/02/25 11:43:09 UTC

ambari git commit: AMBARI-19929. TopologyRequest/TopologyLogicalRequest/TopologyHostRequest database inconsistency. (stoader)

Repository: ambari
Updated Branches:
  refs/heads/trunk e7961091f -> 11a7651c8


AMBARI-19929. TopologyRequest/TopologyLogicalRequest/TopologyHostRequest database inconsistency. (stoader)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/11a7651c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/11a7651c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/11a7651c

Branch: refs/heads/trunk
Commit: 11a7651c8f06c05155ae0a5de249f0906bf370a4
Parents: e796109
Author: Toader, Sebastian <st...@hortonworks.com>
Authored: Sat Feb 25 12:42:31 2017 +0100
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Sat Feb 25 12:42:31 2017 +0100

----------------------------------------------------------------------
 .../apache/ambari/server/state/ServiceImpl.java |  10 --
 .../server/state/cluster/ClusterImpl.java       |   3 +-
 .../ambari/server/topology/TopologyManager.java | 131 ++++++++++++++-----
 3 files changed, 98 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/11a7651c/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
index 2cccb21..31c53d8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
@@ -418,7 +418,6 @@ public class ServiceImpl implements Service {
    */
   private void persist(ClusterServiceEntity serviceEntity) {
     persistEntities(serviceEntity);
-    refresh();
 
     // publish the service installed event
     StackId stackId = cluster.getDesiredStackVersion();
@@ -441,15 +440,6 @@ public class ServiceImpl implements Service {
     clusterServiceDAO.merge(serviceEntity);
   }
 
-  @Transactional
-  public void refresh() {
-    ClusterServiceEntityPK pk = new ClusterServiceEntityPK();
-    pk.setClusterId(getClusterId());
-    pk.setServiceName(getName());
-    ClusterServiceEntity serviceEntity = getServiceEntity();
-    clusterServiceDAO.refresh(serviceEntity);
-    serviceDesiredStateDAO.refresh(serviceEntity.getServiceDesiredStateEntity());
-  }
 
   @Override
   public boolean canBeRemoved() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/11a7651c/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index ab56844..739fe23 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -146,6 +146,7 @@ import org.apache.ambari.server.state.scheduler.RequestExecutionFactory;
 import org.apache.ambari.server.state.stack.upgrade.Direction;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostSummary;
 import org.apache.ambari.server.topology.TopologyRequest;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -447,7 +448,7 @@ public class ClusterImpl implements Cluster {
 
   private void loadServices() {
     ClusterEntity clusterEntity = getClusterEntity();
-    if (clusterEntity.getClusterServiceEntities().isEmpty()) {
+    if (CollectionUtils.isEmpty(clusterEntity.getClusterServiceEntities())) {
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/11a7651c/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index a26624e..f53f04a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -44,6 +44,7 @@ import org.apache.ambari.server.controller.AmbariServer;
 import org.apache.ambari.server.controller.RequestStatusResponse;
 import org.apache.ambari.server.controller.ShortTaskStatus;
 import org.apache.ambari.server.controller.internal.ArtifactResourceProvider;
+import org.apache.ambari.server.controller.internal.BaseClusterRequest;
 import org.apache.ambari.server.controller.internal.CalculatedStatus;
 import org.apache.ambari.server.controller.internal.CredentialResourceProvider;
 import org.apache.ambari.server.controller.internal.ProvisionClusterRequest;
@@ -77,6 +78,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Singleton;
+import com.google.inject.persist.Transactional;
 
 /**
  * Manages all cluster provisioning actions on the cluster topology.
@@ -246,20 +248,18 @@ public class TopologyManager {
   public RequestStatusResponse provisionCluster(final ProvisionClusterRequest request) throws InvalidTopologyException, AmbariException {
     ensureInitialized();
 
-    if (null != request.getQuickLinksProfileJson()) {
-      saveOrUpdateQuickLinksProfile(request.getQuickLinksProfileJson());
-    }
-
-    ClusterTopology topology = new ClusterTopologyImpl(ambariContext, request);
+    final ClusterTopology topology = new ClusterTopologyImpl(ambariContext, request);
     final String clusterName = request.getClusterName();
+    final Stack stack = topology.getBlueprint().getStack();
     final String repoVersion = request.getRepositoryVersion();
 
     // get the id prior to creating ambari resources which increments the counter
-    Long provisionId = ambariContext.getNextRequestId();
+    final Long provisionId = ambariContext.getNextRequestId();
 
-    final Stack stack = topology.getBlueprint().getStack();
     boolean configureSecurity = false;
+
     SecurityConfiguration securityConfiguration = processSecurityConfiguration(request);
+
     if (securityConfiguration != null && securityConfiguration.getType() == SecurityType.KERBEROS) {
       configureSecurity = true;
       addKerberosClient(topology);
@@ -291,18 +291,25 @@ public class TopologyManager {
     topology.setConfigRecommendationStrategy(request.getConfigRecommendationStrategy());
     // set provision action requested
     topology.setProvisionAction(request.getProvisionAction());
-    // persist request after it has successfully validated
-    PersistedTopologyRequest persistedRequest = RetryHelper.executeWithRetry(new Callable<PersistedTopologyRequest>() {
-      @Override
-      public PersistedTopologyRequest call() throws Exception {
-        return persistedState.persistTopologyRequest(request);
+
+    // persist request
+    LogicalRequest logicalRequest = RetryHelper.executeWithRetry(new Callable<LogicalRequest>() {
+        @Override
+        public LogicalRequest call() throws Exception {
+          LogicalRequest logicalRequest = processAndPersistProvisionClusterTopologyRequest(request, topology, provisionId);
+          return logicalRequest;
+        }
       }
-    });
+    );
+
 
     clusterTopologyMap.put(clusterId, topology);
 
     addClusterConfigRequest(topology, new ClusterConfigurationRequest(
       ambariContext, topology, true, stackAdvisorBlueprintProcessor, configureSecurity));
+
+
+    // Notify listeners that cluster configuration finished
     executor.submit(new Callable<Boolean>() {
       @Override
       public Boolean call() throws Exception {
@@ -310,7 +317,9 @@ public class TopologyManager {
         return Boolean.TRUE;
       }
     });
-    LogicalRequest logicalRequest = processRequest(persistedRequest, topology, provisionId);
+
+    // Process the logical request
+    processRequest(request, topology, logicalRequest);
 
     //todo: this should be invoked as part of a generic lifecycle event which could possibly
     //todo: be tied to cluster state
@@ -320,6 +329,7 @@ public class TopologyManager {
     return getRequestStatus(logicalRequest.getRequestId());
   }
 
+
   /**
    * Saves the quick links profile to the DB as an Ambari setting. Creates a new setting entity or updates the existing
    * one.
@@ -435,14 +445,14 @@ public class TopologyManager {
 
   }
 
-  public RequestStatusResponse scaleHosts(ScaleClusterRequest request)
+  public RequestStatusResponse scaleHosts(final ScaleClusterRequest request)
       throws InvalidTopologyException, AmbariException {
 
     ensureInitialized();
     LOG.info("TopologyManager.scaleHosts: Entering");
     String clusterName = request.getClusterName();
     long clusterId = ambariContext.getClusterId(clusterName);
-    ClusterTopology topology = clusterTopologyMap.get(clusterId);
+    final ClusterTopology topology = clusterTopologyMap.get(clusterId);
     if (topology == null) {
       throw new InvalidTopologyException("Unable to retrieve cluster topology for cluster. This is most likely a " +
           "result of trying to scale a cluster via the API which was created using " +
@@ -453,11 +463,64 @@ public class TopologyManager {
 
     hostNameCheck(request, topology);
     request.setClusterId(clusterId);
-    PersistedTopologyRequest persistedRequest = persistedState.persistTopologyRequest(request);
+
     // this registers/updates all request host groups
     topology.update(request);
-    return getRequestStatus(processRequest(persistedRequest, topology,
-      ambariContext.getNextRequestId()).getRequestId());
+
+    final Long requestId = ambariContext.getNextRequestId();
+    LogicalRequest logicalRequest = RetryHelper.executeWithRetry(new Callable<LogicalRequest>() {
+        @Override
+        public LogicalRequest call() throws Exception {
+          LogicalRequest logicalRequest = processAndPersistTopologyRequest(request, topology, requestId);
+
+          return logicalRequest;
+        }
+      }
+    );
+
+    processRequest(request, topology, logicalRequest);
+
+    return getRequestStatus(logicalRequest.getRequestId());
+  }
+
+  /**
+   * Creates and persists a {@link PersistedTopologyRequest} and a {@link LogicalRequest} for the provided
+   * provision cluster request and topology.
+   * @param request Provision cluster request to create a logical request for.
+   * @param topology Cluster topology
+   * @param logicalRequestId The Id for the created logical request
+   * @return Logical request created.
+   */
+  @Transactional
+  protected LogicalRequest processAndPersistProvisionClusterTopologyRequest(ProvisionClusterRequest request, ClusterTopology topology, Long logicalRequestId)
+    throws InvalidTopologyException, AmbariException {
+
+    if (null != request.getQuickLinksProfileJson()) {
+      saveOrUpdateQuickLinksProfile(request.getQuickLinksProfileJson());
+    }
+
+    LogicalRequest logicalRequest = processAndPersistTopologyRequest(request, topology, logicalRequestId);
+
+    return logicalRequest;
+
+  }
+
+
+  /**
+   * Creates and persists a {@link PersistedTopologyRequest} and a {@link LogicalRequest} for the provided request and topology.
+   * @param request {@link ProvisionClusterRequest} or {@link ScaleClusterRequest} to create a logical request for.
+   * @param topology Cluster topology
+   * @param logicalRequestId The Id for the created logical request
+   * @return Logical request created.
+   */
+  @Transactional
+  protected  LogicalRequest processAndPersistTopologyRequest(BaseClusterRequest request, ClusterTopology topology, Long logicalRequestId)
+    throws InvalidTopologyException, AmbariException {
+    PersistedTopologyRequest persistedRequest = persistedState.persistTopologyRequest(request);
+
+    LogicalRequest logicalRequest = createLogicalRequest(persistedRequest, topology, logicalRequestId);
+    
+    return logicalRequest;
   }
 
   private void hostNameCheck(ScaleClusterRequest request, ClusterTopology topology) throws InvalidTopologyException {
@@ -680,13 +743,12 @@ public class TopologyManager {
     return hostComponentMap;
   }
 
-  private LogicalRequest processRequest(PersistedTopologyRequest request, ClusterTopology topology, Long requestId)
+  private void processRequest(TopologyRequest request, ClusterTopology topology, final LogicalRequest logicalRequest)
     throws AmbariException {
 
     LOG.info("TopologyManager.processRequest: Entering");
 
-    finalizeTopology(request.getRequest(), topology);
-    LogicalRequest logicalRequest = createLogicalRequest(request, topology, requestId);
+    finalizeTopology(request, topology);
 
     boolean requestHostComplete = false;
     //todo: overall synchronization. Currently we have nested synchronization here
@@ -750,22 +812,16 @@ public class TopologyManager {
         }
       }
     }
-    return logicalRequest;
   }
 
-  private LogicalRequest createLogicalRequest(final PersistedTopologyRequest request, ClusterTopology topology, Long requestId)
+  @Transactional
+  protected LogicalRequest createLogicalRequest(final PersistedTopologyRequest request, ClusterTopology topology, Long requestId)
       throws AmbariException {
 
     final LogicalRequest logicalRequest = logicalRequestFactory.createRequest(
         requestId, request.getRequest(), topology);
 
-    RetryHelper.executeWithRetry(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        persistedState.persistLogicalRequest(logicalRequest, request.getId());
-        return null;
-      }
-    });
+    persistedState.persistLogicalRequest(logicalRequest, request.getId());
 
     allRequests.put(logicalRequest.getRequestId(), logicalRequest);
     LOG.info("TopologyManager.createLogicalRequest: created LogicalRequest with ID = {} and completed persistence of this request.",
@@ -796,11 +852,10 @@ public class TopologyManager {
 
     // persist the host request -> hostName association
     try {
-      RetryHelper.executeWithRetry(new Callable<Object>() {
+      RetryHelper.executeWithRetry(new Callable<Void>() {
         @Override
-        public Object call() throws Exception {
-          persistedState.registerHostName(response.getHostRequestId(), hostName);
-          persistedState.registerInTopologyHostInfo(host);
+        public Void call() throws Exception {
+          persistTopologyHostRegistration(response.getHostRequestId(), host);
           return null;
         }
       });
@@ -822,6 +877,12 @@ public class TopologyManager {
     }
   }
 
+  @Transactional
+  protected void persistTopologyHostRegistration(long hostRequestId, final HostImpl host) {
+    persistedState.registerHostName(hostRequestId, host.getHostName());
+    persistedState.registerInTopologyHostInfo(host);
+  }
+
   private void queueHostTasks(ClusterTopology topology, HostOfferResponse response, String hostName) {
     LOG.info("TopologyManager.processAcceptedHostOffer: queueing tasks for host = {}", hostName);
     response.executeTasks(taskExecutor, hostName, topology, ambariContext);