You are viewing a plain-text version of this content. The hyperlink to the canonical version (originally labeled "here") was lost in this plain-text conversion.
Posted to commits@ambari.apache.org by mp...@apache.org on 2015/11/05 18:36:07 UTC

[1/2] ambari git commit: AMBARI-13744. Implement ability to retry operations after database exception for blueprint deploy. (mpapirkovskyy)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.1 8261a8d8c -> c13288fa9
  refs/heads/trunk 4db9a1eef -> b3a562c63


AMBARI-13744. Implement ability to retry operations after database exception for blueprint deploy. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c13288fa
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c13288fa
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c13288fa

Branch: refs/heads/branch-2.1
Commit: c13288fa9a290498eccd4b1eefc117e2b0e4d067
Parents: 8261a8d
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Thu Nov 5 11:06:40 2015 +0200
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Thu Nov 5 19:34:37 2015 +0200

----------------------------------------------------------------------
 .../server/configuration/Configuration.java     | 40 ++++---------
 .../ambari/server/controller/AmbariServer.java  |  2 +-
 .../internal/AbstractResourceProvider.java      |  2 +-
 .../ambari/server/topology/AmbariContext.java   | 53 +++++++++++++----
 .../ambari/server/topology/TopologyManager.java | 41 ++++++++++---
 .../apache/ambari/server/utils/RetryHelper.java | 51 ++++++++++++----
 .../internal/ClusterResourceProviderTest.java   | 61 ++++++++++++++++++++
 7 files changed, 191 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/c13288fa/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index f50aeda..07e0962 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -202,10 +202,8 @@ public class Configuration {
   public static final String SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_ATTEMPTS = "server.jdbc.connection-pool.acquisition-retry-attempts";
   public static final String SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_DELAY = "server.jdbc.connection-pool.acquisition-retry-delay";
 
-  public static final String API_OPERATIONS_RETRY_ATTEMPTS_KEY = "api.operations.retry-attempts";
-  public static final String BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_KEY = "blueprints.operations.retry-attempts";
-  public static final String API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT = "0";
-  public static final String BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT = "0";
+  public static final String OPERATIONS_RETRY_ATTEMPTS_KEY = "server.operations.retry-attempts";
+  public static final String OPERATIONS_RETRY_ATTEMPTS_DEFAULT = "0";
   public static final int RETRY_ATTEMPTS_LIMIT = 10;
 
   public static final String SERVER_JDBC_RCA_USER_NAME_KEY = "server.jdbc.rca.user.name";
@@ -2350,38 +2348,22 @@ public class Configuration {
   }
 
   /**
-   * @return number of retry attempts for API update requests
+   * @return number of retry attempts for api and blueprint operations
    */
-  public int getApiOperationsRetryAttempts() {
-    String property = properties.getProperty(API_OPERATIONS_RETRY_ATTEMPTS_KEY, API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
+  public int getOperationsRetryAttempts() {
+    String property = properties.getProperty(OPERATIONS_RETRY_ATTEMPTS_KEY, OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
     Integer attempts = Integer.valueOf(property);
     if (attempts < 0) {
-      LOG.warn("Invalid API retry attempts number ({}), should be [0,{}]. Value reset to default {}",
-          attempts, RETRY_ATTEMPTS_LIMIT, API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
-      attempts = Integer.valueOf(API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
+      LOG.warn("Invalid operations retry attempts number ({}), should be [0,{}]. Value reset to default {}",
+          attempts, RETRY_ATTEMPTS_LIMIT, OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
+      attempts = Integer.valueOf(OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
     } else if (attempts > RETRY_ATTEMPTS_LIMIT) {
-      LOG.warn("Invalid API retry attempts number ({}), should be [0,{}]. Value set to {}",
+      LOG.warn("Invalid operations retry attempts number ({}), should be [0,{}]. Value set to {}",
           attempts, RETRY_ATTEMPTS_LIMIT, RETRY_ATTEMPTS_LIMIT);
       attempts = RETRY_ATTEMPTS_LIMIT;
     }
-    return attempts;
-  }
-
-  /**
-   * @return number of retry attempts for blueprints operations
-   */
-  public int getBlueprintsOperationsRetryAttempts() {
-    String property = properties.getProperty(BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_KEY,
-            BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
-    Integer attempts = Integer.valueOf(property);
-    if (attempts < 0) {
-      LOG.warn("Invalid blueprint operations retry attempts number ({}), should be [0,{}]. Value reset to default {}",
-          attempts, RETRY_ATTEMPTS_LIMIT, BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
-      attempts = Integer.valueOf(BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
-    } else if (attempts > RETRY_ATTEMPTS_LIMIT) {
-      LOG.warn("Invalid blueprint operations retry attempts number ({}), should be [0,{}]. Value set to {}",
-          attempts, RETRY_ATTEMPTS_LIMIT, RETRY_ATTEMPTS_LIMIT);
-      attempts = RETRY_ATTEMPTS_LIMIT;
+    if (attempts > 0) {
+      LOG.info("Operations retry enabled. Number of retry attempts: {}", attempts);
     }
     return attempts;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/c13288fa/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
index a00e526..bdd82dd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
@@ -711,7 +711,7 @@ public class AmbariServer {
     TopologyManager.init(injector.getInstance(StackAdvisorBlueprintProcessor.class));
     StackAdvisorBlueprintProcessor.init(injector.getInstance(StackAdvisorHelper.class));
 
-    RetryHelper.init(configs.getApiOperationsRetryAttempts(), configs.getBlueprintsOperationsRetryAttempts());
+    RetryHelper.init(configs.getOperationsRetryAttempts());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/c13288fa/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
index 2bf8fe3..3464c19 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
@@ -443,7 +443,7 @@ public abstract class AbstractResourceProvider extends BaseProvider implements R
   //invoke command with retry support in case of database fail
   private <T> T invokeWithRetry(Command<T> command) throws AmbariException {
     RetryHelper.clearAffectedClusters();
-    int retryAttempts = RetryHelper.getApiOperationsRetryAttempts();
+    int retryAttempts = RetryHelper.getOperationsRetryAttempts();
     do {
 
       try {

http://git-wip-us.apache.org/repos/asf/ambari/blob/c13288fa/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
index 5e93aeb..4b0a1d1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
@@ -56,6 +56,7 @@ import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.configgroup.ConfigGroup;
+import org.apache.ambari.server.utils.RetryHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,6 +67,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -144,9 +146,16 @@ public class AmbariContext {
 
   public void createAmbariClusterResource(String clusterName, String stackName, String stackVersion, SecurityType securityType) {
     String stackInfo = String.format("%s-%s", stackName, stackVersion);
-    ClusterRequest clusterRequest = new ClusterRequest(null, clusterName, null, securityType, stackInfo, null);
+    final ClusterRequest clusterRequest = new ClusterRequest(null, clusterName, null, securityType, stackInfo, null);
     try {
-      getController().createCluster(clusterRequest);
+      RetryHelper.executeWithRetry(new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          getController().createCluster(clusterRequest);
+          return null;
+        }
+      });
+
     } catch (AmbariException e) {
       e.printStackTrace();
       throw new RuntimeException("Failed to create Cluster resource: " + e, e);
@@ -229,7 +238,7 @@ public class AmbariContext {
           hostName, e.toString()), e);
     }
 
-    Set<ServiceComponentHostRequest> requests = new HashSet<ServiceComponentHostRequest>();
+    final Set<ServiceComponentHostRequest> requests = new HashSet<ServiceComponentHostRequest>();
     for (Map.Entry<String, Collection<String>> entry : components.entrySet()) {
       String service = entry.getKey();
       for (String component : entry.getValue()) {
@@ -240,7 +249,13 @@ public class AmbariContext {
       }
     }
     try {
-      getController().createHostComponents(requests);
+      RetryHelper.executeWithRetry(new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          getController().createHostComponents(requests);
+          return null;
+        }
+      });
     } catch (AmbariException e) {
       e.printStackTrace();
       throw new RuntimeException(String.format("Unable to create host component resource for host '%s': %s",
@@ -286,9 +301,15 @@ public class AmbariContext {
     hostRoleCommandFactory = factory;
   }
 
-  public void registerHostWithConfigGroup(String hostName, ClusterTopology topology, String groupName) {
+  public void registerHostWithConfigGroup(final String hostName, final ClusterTopology topology, final String groupName) {
     try {
-      if (!addHostToExistingConfigGroups(hostName, topology, groupName)) {
+      boolean hostAdded = RetryHelper.executeWithRetry(new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          return addHostToExistingConfigGroups(hostName, topology, groupName);
+        }
+      });
+      if (!hostAdded) {
         createConfigGroupsAndRegisterHost(topology, groupName);
       }
     } catch (Exception e) {
@@ -325,10 +346,16 @@ public class AmbariContext {
    */
   public void persistInstallStateForUI(String clusterName, String stackName, String stackVersion) {
     String stackInfo = String.format("%s-%s", stackName, stackVersion);
-    ClusterRequest clusterRequest = new ClusterRequest(null, clusterName, "INSTALLED", null, stackInfo, null);
+    final ClusterRequest clusterRequest = new ClusterRequest(null, clusterName, "INSTALLED", null, stackInfo, null);
 
     try {
-      getController().updateClusters(Collections.singleton(clusterRequest), null);
+      RetryHelper.executeWithRetry(new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          getController().updateClusters(Collections.singleton(clusterRequest), null);
+          return null;
+        }
+      });
     } catch (AmbariException e) {
       LOG.error("Unable to set install state for UI", e);
     }
@@ -339,9 +366,15 @@ public class AmbariContext {
     return AbstractResourceProvider.getConfigurationRequests("Clusters", clusterProperties);
   }
 
-  public void setConfigurationOnCluster(ClusterRequest clusterRequest) {
+  public void setConfigurationOnCluster(final ClusterRequest clusterRequest) {
     try {
-      getController().updateClusters(Collections.singleton(clusterRequest), null);
+      RetryHelper.executeWithRetry(new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          getController().updateClusters(Collections.singleton(clusterRequest), null);
+          return null;
+        }
+      });
     } catch (AmbariException e) {
       e.printStackTrace();
       throw new RuntimeException("Failed to set configurations on cluster: " + e, e);

http://git-wip-us.apache.org/repos/asf/ambari/blob/c13288fa/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index 480c2ba..7f571ce 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -42,6 +42,7 @@ import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.security.encryption.CredentialStoreService;
 import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.host.HostImpl;
+import org.apache.ambari.server.utils.RetryHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +56,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -121,7 +123,7 @@ public class TopologyManager {
     }
   }
 
-  public RequestStatusResponse provisionCluster(ProvisionClusterRequest request) throws InvalidTopologyException, AmbariException {
+  public RequestStatusResponse provisionCluster(final ProvisionClusterRequest request) throws InvalidTopologyException, AmbariException {
     ensureInitialized();
     ClusterTopology topology = new ClusterTopologyImpl(ambariContext, request);
     final String clusterName = request.getClusterName();
@@ -153,7 +155,12 @@ public class TopologyManager {
     // set recommendation strategy
     topology.setConfigRecommendationStrategy(request.getConfigRecommendationStrategy());
     // persist request after it has successfully validated
-    PersistedTopologyRequest persistedRequest = persistedState.persistTopologyRequest(request);
+    PersistedTopologyRequest persistedRequest = RetryHelper.executeWithRetry(new Callable<PersistedTopologyRequest>() {
+      @Override
+      public PersistedTopologyRequest call() throws Exception {
+        return persistedState.persistTopologyRequest(request);
+      }
+    });
 
     clusterTopologyMap.put(clusterId, topology);
 
@@ -522,13 +529,19 @@ public class TopologyManager {
     return logicalRequest;
   }
 
-  private LogicalRequest createLogicalRequest(PersistedTopologyRequest request, ClusterTopology topology, Long requestId)
+  private LogicalRequest createLogicalRequest(final PersistedTopologyRequest request, ClusterTopology topology, Long requestId)
       throws AmbariException {
 
-    LogicalRequest logicalRequest = logicalRequestFactory.createRequest(
+    final LogicalRequest logicalRequest = logicalRequestFactory.createRequest(
         requestId, request.getRequest(), topology);
 
-    persistedState.persistLogicalRequest(logicalRequest, request.getId());
+    RetryHelper.executeWithRetry(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        persistedState.persistLogicalRequest(logicalRequest, request.getId());
+        return null;
+      }
+    });
 
     allRequests.put(logicalRequest.getRequestId(), logicalRequest);
     LOG.info("TopologyManager.createLogicalRequest: created LogicalRequest with ID = {} and completed persistence of this request.",
@@ -541,8 +554,8 @@ public class TopologyManager {
     return logicalRequest;
   }
 
-  private void processAcceptedHostOffer(ClusterTopology topology, HostOfferResponse response, HostImpl host) {
-    String hostName = host.getHostName();
+  private void processAcceptedHostOffer(ClusterTopology topology, final HostOfferResponse response, HostImpl host) {
+    final String hostName = host.getHostName();
     try {
       topology.addHostToTopology(response.getHostGroupName(), hostName);
     } catch (InvalidTopologyException e) {
@@ -554,7 +567,19 @@ public class TopologyManager {
     }
 
     // persist the host request -> hostName association
-    persistedState.registerHostName(response.getHostRequestId(), hostName);
+    try {
+      RetryHelper.executeWithRetry(new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          persistedState.registerHostName(response.getHostRequestId(), hostName);
+          return null;
+        }
+      });
+    } catch (AmbariException e) {
+      LOG.error("Exception ocurred while registering host name", e);
+      throw new RuntimeException(e);
+    }
+
 
     LOG.info("TopologyManager.processAcceptedHostOffer: about to execute tasks for host = {}",
         hostName);

http://git-wip-us.apache.org/repos/asf/ambari/blob/c13288fa/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
index 9d2fe9e..877e84d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
@@ -17,19 +17,24 @@
  */
 package org.apache.ambari.server.utils;
 
-import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.state.Cluster;
 import org.eclipse.persistence.exceptions.DatabaseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
 /**
  * Provides utility methods to support operations retry
  * TODO injection as Guice singleon, static for now to avoid major modifications
  */
 public class RetryHelper {
+  protected final static Logger LOG = LoggerFactory.getLogger(RetryHelper.class);
+
   private static ThreadLocal<Set<Cluster>> affectedClusters = new ThreadLocal<Set<Cluster>>(){
     @Override
     protected Set<Cluster> initialValue() {
@@ -37,16 +42,14 @@ public class RetryHelper {
     }
   };
 
-  private static int apiRetryAttempts = 0;
-  private static int blueprintsRetryAttempts = 0;
+  private static int operationsRetryAttempts = 0;
 
-  public static void init(int apiOperationsRetryAttempts, int blueprintOperationsRetryAttempts) {
-    apiRetryAttempts = apiOperationsRetryAttempts;
-    blueprintsRetryAttempts = blueprintOperationsRetryAttempts;
+  public static void init(int operationsRetryAttempts) {
+    RetryHelper.operationsRetryAttempts = operationsRetryAttempts;
   }
 
   public static void addAffectedCluster(Cluster cluster) {
-    if (apiRetryAttempts > 0 || blueprintsRetryAttempts > 0) {
+    if (operationsRetryAttempts > 0 ) {
       affectedClusters.get().add(cluster);
     }
   }
@@ -56,13 +59,13 @@ public class RetryHelper {
   }
 
   public static void clearAffectedClusters() {
-    if (apiRetryAttempts > 0 || blueprintsRetryAttempts > 0) {
+    if (operationsRetryAttempts > 0) {
       affectedClusters.get().clear();
     }
   }
 
-  public static int getApiOperationsRetryAttempts() {
-    return apiRetryAttempts;
+  public static int getOperationsRetryAttempts() {
+    return operationsRetryAttempts;
   }
 
   public static boolean isDatabaseException(Throwable ex) {
@@ -82,4 +85,32 @@ public class RetryHelper {
       cluster.invalidateData();
     }
   }
+
+  public static <T> T executeWithRetry(Callable<T> command) throws AmbariException {
+    RetryHelper.clearAffectedClusters();
+    int retryAttempts = RetryHelper.getOperationsRetryAttempts();
+    do {
+
+      try {
+        return command.call();
+      } catch (Exception e) {
+        if (RetryHelper.isDatabaseException(e)) {
+
+          RetryHelper.invalidateAffectedClusters();
+
+          if (retryAttempts > 0) {
+            LOG.error("Ignoring database exception to perform operation retry, attempts remaining: " + retryAttempts, e);
+            retryAttempts--;
+          } else {
+            RetryHelper.clearAffectedClusters();
+            throw new AmbariException(e.getMessage(), e);
+          }
+        } else {
+          RetryHelper.clearAffectedClusters();
+          throw new AmbariException(e.getMessage(), e);
+        }
+      }
+
+    } while (true);
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/c13288fa/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterResourceProviderTest.java
index f885a5b..84de604 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterResourceProviderTest.java
@@ -40,8 +40,10 @@ import org.apache.ambari.server.topology.SecurityConfiguration;
 import org.apache.ambari.server.topology.SecurityConfigurationFactory;
 import org.apache.ambari.server.topology.TopologyManager;
 import org.apache.ambari.server.topology.TopologyRequestFactory;
+import org.apache.ambari.server.utils.RetryHelper;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
+import org.eclipse.persistence.exceptions.DatabaseException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -65,6 +67,7 @@ import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.createStrictMock;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
@@ -280,6 +283,64 @@ public class ClusterResourceProviderTest {
   }
 
   @Test
+  public void testCreateResourcesWithRetry() throws Exception {
+    RetryHelper.init(3);
+    Resource.Type type = Resource.Type.Cluster;
+
+    AmbariManagementController managementController = createMock(AmbariManagementController.class);
+    RequestStatusResponse response = createNiceMock(RequestStatusResponse.class);
+
+    managementController.createCluster(
+        AbstractResourceProviderTest.Matcher.getClusterRequest(null, "Cluster100", "HDP-0.1", null));
+    expectLastCall().andThrow(new DatabaseException("test"){}).once().andVoid().atLeastOnce();
+
+    // replay
+    replay(managementController, response);
+
+    ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
+        type,
+        PropertyHelper.getPropertyIds(type),
+        PropertyHelper.getKeyPropertyIds(type),
+        managementController);
+
+    AbstractResourceProviderTest.TestObserver observer = new AbstractResourceProviderTest.TestObserver();
+
+    ((ObservableResourceProvider)provider).addObserver(observer);
+
+    // add the property map to a set for the request.  add more maps for multiple creates
+    Set<Map<String, Object>> propertySet = new LinkedHashSet<Map<String, Object>>();
+
+    // Cluster 1: create a map of properties for the request
+    Map<String, Object> properties = new LinkedHashMap<String, Object>();
+
+    // add the cluster name to the properties map
+    properties.put(ClusterResourceProvider.CLUSTER_NAME_PROPERTY_ID, "Cluster100");
+
+    // add the version to the properties map
+    properties.put(ClusterResourceProvider.CLUSTER_VERSION_PROPERTY_ID, "HDP-0.1");
+
+    propertySet.add(properties);
+
+    // create the request
+    Request request = PropertyHelper.getCreateRequest(propertySet, null);
+
+    provider.createResources(request);
+
+    ResourceProviderEvent lastEvent = observer.getLastEvent();
+    Assert.assertNotNull(lastEvent);
+    Assert.assertEquals(Resource.Type.Cluster, lastEvent.getResourceType());
+    Assert.assertEquals(ResourceProviderEvent.Type.Create, lastEvent.getType());
+    Assert.assertEquals(request, lastEvent.getRequest());
+    Assert.assertNull(lastEvent.getPredicate());
+
+    // verify
+    verify(managementController, response);
+
+    RetryHelper.init(0);
+
+  }
+
+  @Test
   public void testGetResources() throws Exception{
     Resource.Type type = Resource.Type.Cluster;
 


[2/2] ambari git commit: AMBARI-13744. Implement ability to retry operations after database exception for blueprint deploy. (mpapirkovskyy)

Posted by mp...@apache.org.
AMBARI-13744. Implement ability to retry operations after database exception for blueprint deploy. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b3a562c6
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b3a562c6
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b3a562c6

Branch: refs/heads/trunk
Commit: b3a562c6356a63d597bc1105886e7dcfc2d03607
Parents: 4db9a1e
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Thu Nov 5 11:06:40 2015 +0200
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Thu Nov 5 19:34:57 2015 +0200

----------------------------------------------------------------------
 .../server/configuration/Configuration.java     | 40 ++++---------
 .../ambari/server/controller/AmbariServer.java  |  2 +-
 .../internal/AbstractResourceProvider.java      |  2 +-
 .../ambari/server/topology/AmbariContext.java   | 53 +++++++++++++----
 .../ambari/server/topology/TopologyManager.java | 41 ++++++++++---
 .../apache/ambari/server/utils/RetryHelper.java | 51 ++++++++++++----
 .../internal/ClusterResourceProviderTest.java   | 61 ++++++++++++++++++++
 7 files changed, 191 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/b3a562c6/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index b4d5de8..e5e2c90 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -202,10 +202,8 @@ public class Configuration {
   public static final String SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_ATTEMPTS = "server.jdbc.connection-pool.acquisition-retry-attempts";
   public static final String SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_DELAY = "server.jdbc.connection-pool.acquisition-retry-delay";
 
-  public static final String API_OPERATIONS_RETRY_ATTEMPTS_KEY = "api.operations.retry-attempts";
-  public static final String BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_KEY = "blueprints.operations.retry-attempts";
-  public static final String API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT = "0";
-  public static final String BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT = "0";
+  public static final String OPERATIONS_RETRY_ATTEMPTS_KEY = "server.operations.retry-attempts";
+  public static final String OPERATIONS_RETRY_ATTEMPTS_DEFAULT = "0";
   public static final int RETRY_ATTEMPTS_LIMIT = 10;
 
   public static final String SERVER_JDBC_RCA_USER_NAME_KEY = "server.jdbc.rca.user.name";
@@ -2359,38 +2357,22 @@ public class Configuration {
   }
 
   /**
-   * @return number of retry attempts for API update requests
+   * @return number of retry attempts for api and blueprint operations
    */
-  public int getApiOperationsRetryAttempts() {
-    String property = properties.getProperty(API_OPERATIONS_RETRY_ATTEMPTS_KEY, API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
+  public int getOperationsRetryAttempts() {
+    String property = properties.getProperty(OPERATIONS_RETRY_ATTEMPTS_KEY, OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
     Integer attempts = Integer.valueOf(property);
     if (attempts < 0) {
-      LOG.warn("Invalid API retry attempts number ({}), should be [0,{}]. Value reset to default {}",
-          attempts, RETRY_ATTEMPTS_LIMIT, API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
-      attempts = Integer.valueOf(API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
+      LOG.warn("Invalid operations retry attempts number ({}), should be [0,{}]. Value reset to default {}",
+          attempts, RETRY_ATTEMPTS_LIMIT, OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
+      attempts = Integer.valueOf(OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
     } else if (attempts > RETRY_ATTEMPTS_LIMIT) {
-      LOG.warn("Invalid API retry attempts number ({}), should be [0,{}]. Value set to {}",
+      LOG.warn("Invalid operations retry attempts number ({}), should be [0,{}]. Value set to {}",
           attempts, RETRY_ATTEMPTS_LIMIT, RETRY_ATTEMPTS_LIMIT);
       attempts = RETRY_ATTEMPTS_LIMIT;
     }
-    return attempts;
-  }
-
-  /**
-   * @return number of retry attempts for blueprints operations
-   */
-  public int getBlueprintsOperationsRetryAttempts() {
-    String property = properties.getProperty(BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_KEY,
-            BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
-    Integer attempts = Integer.valueOf(property);
-    if (attempts < 0) {
-      LOG.warn("Invalid blueprint operations retry attempts number ({}), should be [0,{}]. Value reset to default {}",
-          attempts, RETRY_ATTEMPTS_LIMIT, BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
-      attempts = Integer.valueOf(BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
-    } else if (attempts > RETRY_ATTEMPTS_LIMIT) {
-      LOG.warn("Invalid blueprint operations retry attempts number ({}), should be [0,{}]. Value set to {}",
-          attempts, RETRY_ATTEMPTS_LIMIT, RETRY_ATTEMPTS_LIMIT);
-      attempts = RETRY_ATTEMPTS_LIMIT;
+    if (attempts > 0) {
+      LOG.info("Operations retry enabled. Number of retry attempts: {}", attempts);
     }
     return attempts;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/b3a562c6/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
index ea178b5..15cfb90 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
@@ -710,7 +710,7 @@ public class AmbariServer {
     TopologyManager.init(injector.getInstance(StackAdvisorBlueprintProcessor.class));
     StackAdvisorBlueprintProcessor.init(injector.getInstance(StackAdvisorHelper.class));
 
-    RetryHelper.init(configs.getApiOperationsRetryAttempts(), configs.getBlueprintsOperationsRetryAttempts());
+    RetryHelper.init(configs.getOperationsRetryAttempts());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/b3a562c6/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
index 2bf8fe3..3464c19 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
@@ -443,7 +443,7 @@ public abstract class AbstractResourceProvider extends BaseProvider implements R
   //invoke command with retry support in case of database fail
   private <T> T invokeWithRetry(Command<T> command) throws AmbariException {
     RetryHelper.clearAffectedClusters();
-    int retryAttempts = RetryHelper.getApiOperationsRetryAttempts();
+    int retryAttempts = RetryHelper.getOperationsRetryAttempts();
     do {
 
       try {

http://git-wip-us.apache.org/repos/asf/ambari/blob/b3a562c6/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
index 5e93aeb..4b0a1d1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
@@ -56,6 +56,7 @@ import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.configgroup.ConfigGroup;
+import org.apache.ambari.server.utils.RetryHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,6 +67,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -144,9 +146,16 @@ public class AmbariContext {
 
   public void createAmbariClusterResource(String clusterName, String stackName, String stackVersion, SecurityType securityType) {
     String stackInfo = String.format("%s-%s", stackName, stackVersion);
-    ClusterRequest clusterRequest = new ClusterRequest(null, clusterName, null, securityType, stackInfo, null);
+    final ClusterRequest clusterRequest = new ClusterRequest(null, clusterName, null, securityType, stackInfo, null);
     try {
-      getController().createCluster(clusterRequest);
+      RetryHelper.executeWithRetry(new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          getController().createCluster(clusterRequest);
+          return null;
+        }
+      });
+
     } catch (AmbariException e) {
       e.printStackTrace();
       throw new RuntimeException("Failed to create Cluster resource: " + e, e);
@@ -229,7 +238,7 @@ public class AmbariContext {
           hostName, e.toString()), e);
     }
 
-    Set<ServiceComponentHostRequest> requests = new HashSet<ServiceComponentHostRequest>();
+    final Set<ServiceComponentHostRequest> requests = new HashSet<ServiceComponentHostRequest>();
     for (Map.Entry<String, Collection<String>> entry : components.entrySet()) {
       String service = entry.getKey();
       for (String component : entry.getValue()) {
@@ -240,7 +249,13 @@ public class AmbariContext {
       }
     }
     try {
-      getController().createHostComponents(requests);
+      RetryHelper.executeWithRetry(new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          getController().createHostComponents(requests);
+          return null;
+        }
+      });
     } catch (AmbariException e) {
       e.printStackTrace();
       throw new RuntimeException(String.format("Unable to create host component resource for host '%s': %s",
@@ -286,9 +301,15 @@ public class AmbariContext {
     hostRoleCommandFactory = factory;
   }
 
-  public void registerHostWithConfigGroup(String hostName, ClusterTopology topology, String groupName) {
+  public void registerHostWithConfigGroup(final String hostName, final ClusterTopology topology, final String groupName) {
     try {
-      if (!addHostToExistingConfigGroups(hostName, topology, groupName)) {
+      boolean hostAdded = RetryHelper.executeWithRetry(new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          return addHostToExistingConfigGroups(hostName, topology, groupName);
+        }
+      });
+      if (!hostAdded) {
         createConfigGroupsAndRegisterHost(topology, groupName);
       }
     } catch (Exception e) {
@@ -325,10 +346,16 @@ public class AmbariContext {
    */
   public void persistInstallStateForUI(String clusterName, String stackName, String stackVersion) {
     String stackInfo = String.format("%s-%s", stackName, stackVersion);
-    ClusterRequest clusterRequest = new ClusterRequest(null, clusterName, "INSTALLED", null, stackInfo, null);
+    final ClusterRequest clusterRequest = new ClusterRequest(null, clusterName, "INSTALLED", null, stackInfo, null);
 
     try {
-      getController().updateClusters(Collections.singleton(clusterRequest), null);
+      RetryHelper.executeWithRetry(new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          getController().updateClusters(Collections.singleton(clusterRequest), null);
+          return null;
+        }
+      });
     } catch (AmbariException e) {
       LOG.error("Unable to set install state for UI", e);
     }
@@ -339,9 +366,15 @@ public class AmbariContext {
     return AbstractResourceProvider.getConfigurationRequests("Clusters", clusterProperties);
   }
 
-  public void setConfigurationOnCluster(ClusterRequest clusterRequest) {
+  public void setConfigurationOnCluster(final ClusterRequest clusterRequest) {
     try {
-      getController().updateClusters(Collections.singleton(clusterRequest), null);
+      RetryHelper.executeWithRetry(new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          getController().updateClusters(Collections.singleton(clusterRequest), null);
+          return null;
+        }
+      });
     } catch (AmbariException e) {
       e.printStackTrace();
       throw new RuntimeException("Failed to set configurations on cluster: " + e, e);

http://git-wip-us.apache.org/repos/asf/ambari/blob/b3a562c6/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index 480c2ba..7f571ce 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -42,6 +42,7 @@ import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.security.encryption.CredentialStoreService;
 import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.host.HostImpl;
+import org.apache.ambari.server.utils.RetryHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +56,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -121,7 +123,7 @@ public class TopologyManager {
     }
   }
 
-  public RequestStatusResponse provisionCluster(ProvisionClusterRequest request) throws InvalidTopologyException, AmbariException {
+  public RequestStatusResponse provisionCluster(final ProvisionClusterRequest request) throws InvalidTopologyException, AmbariException {
     ensureInitialized();
     ClusterTopology topology = new ClusterTopologyImpl(ambariContext, request);
     final String clusterName = request.getClusterName();
@@ -153,7 +155,12 @@ public class TopologyManager {
     // set recommendation strategy
     topology.setConfigRecommendationStrategy(request.getConfigRecommendationStrategy());
     // persist request after it has successfully validated
-    PersistedTopologyRequest persistedRequest = persistedState.persistTopologyRequest(request);
+    PersistedTopologyRequest persistedRequest = RetryHelper.executeWithRetry(new Callable<PersistedTopologyRequest>() {
+      @Override
+      public PersistedTopologyRequest call() throws Exception {
+        return persistedState.persistTopologyRequest(request);
+      }
+    });
 
     clusterTopologyMap.put(clusterId, topology);
 
@@ -522,13 +529,19 @@ public class TopologyManager {
     return logicalRequest;
   }
 
-  private LogicalRequest createLogicalRequest(PersistedTopologyRequest request, ClusterTopology topology, Long requestId)
+  private LogicalRequest createLogicalRequest(final PersistedTopologyRequest request, ClusterTopology topology, Long requestId)
       throws AmbariException {
 
-    LogicalRequest logicalRequest = logicalRequestFactory.createRequest(
+    final LogicalRequest logicalRequest = logicalRequestFactory.createRequest(
         requestId, request.getRequest(), topology);
 
-    persistedState.persistLogicalRequest(logicalRequest, request.getId());
+    RetryHelper.executeWithRetry(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        persistedState.persistLogicalRequest(logicalRequest, request.getId());
+        return null;
+      }
+    });
 
     allRequests.put(logicalRequest.getRequestId(), logicalRequest);
     LOG.info("TopologyManager.createLogicalRequest: created LogicalRequest with ID = {} and completed persistence of this request.",
@@ -541,8 +554,8 @@ public class TopologyManager {
     return logicalRequest;
   }
 
-  private void processAcceptedHostOffer(ClusterTopology topology, HostOfferResponse response, HostImpl host) {
-    String hostName = host.getHostName();
+  private void processAcceptedHostOffer(ClusterTopology topology, final HostOfferResponse response, HostImpl host) {
+    final String hostName = host.getHostName();
     try {
       topology.addHostToTopology(response.getHostGroupName(), hostName);
     } catch (InvalidTopologyException e) {
@@ -554,7 +567,19 @@ public class TopologyManager {
     }
 
     // persist the host request -> hostName association
-    persistedState.registerHostName(response.getHostRequestId(), hostName);
+    try {
+      RetryHelper.executeWithRetry(new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          persistedState.registerHostName(response.getHostRequestId(), hostName);
+          return null;
+        }
+      });
+    } catch (AmbariException e) {
+      LOG.error("Exception ocurred while registering host name", e);
+      throw new RuntimeException(e);
+    }
+
 
     LOG.info("TopologyManager.processAcceptedHostOffer: about to execute tasks for host = {}",
         hostName);

http://git-wip-us.apache.org/repos/asf/ambari/blob/b3a562c6/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
index 9d2fe9e..877e84d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
@@ -17,19 +17,24 @@
  */
 package org.apache.ambari.server.utils;
 
-import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.state.Cluster;
 import org.eclipse.persistence.exceptions.DatabaseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
 /**
  * Provides utility methods to support operations retry
  * TODO injection as Guice singleon, static for now to avoid major modifications
  */
 public class RetryHelper {
+  protected final static Logger LOG = LoggerFactory.getLogger(RetryHelper.class);
+
   private static ThreadLocal<Set<Cluster>> affectedClusters = new ThreadLocal<Set<Cluster>>(){
     @Override
     protected Set<Cluster> initialValue() {
@@ -37,16 +42,14 @@ public class RetryHelper {
     }
   };
 
-  private static int apiRetryAttempts = 0;
-  private static int blueprintsRetryAttempts = 0;
+  private static int operationsRetryAttempts = 0;
 
-  public static void init(int apiOperationsRetryAttempts, int blueprintOperationsRetryAttempts) {
-    apiRetryAttempts = apiOperationsRetryAttempts;
-    blueprintsRetryAttempts = blueprintOperationsRetryAttempts;
+  public static void init(int operationsRetryAttempts) {
+    RetryHelper.operationsRetryAttempts = operationsRetryAttempts;
   }
 
   public static void addAffectedCluster(Cluster cluster) {
-    if (apiRetryAttempts > 0 || blueprintsRetryAttempts > 0) {
+    if (operationsRetryAttempts > 0) {
       affectedClusters.get().add(cluster);
     }
   }
@@ -56,13 +59,13 @@ public class RetryHelper {
   }
 
   public static void clearAffectedClusters() {
-    if (apiRetryAttempts > 0 || blueprintsRetryAttempts > 0) {
+    if (operationsRetryAttempts > 0) {
       affectedClusters.get().clear();
     }
   }
 
-  public static int getApiOperationsRetryAttempts() {
-    return apiRetryAttempts;
+  public static int getOperationsRetryAttempts() {
+    return operationsRetryAttempts;
   }
 
   public static boolean isDatabaseException(Throwable ex) {
@@ -82,4 +85,50 @@ public class RetryHelper {
       cluster.invalidateData();
     }
   }
+
+  /**
+   * Executes {@code command}, retrying it when it fails with a database exception.
+   * <p>
+   * The command is attempted once plus up to {@link #getOperationsRetryAttempts()} more times.
+   * Every database failure triggers {@link #invalidateAffectedClusters()}; once the attempts are
+   * exhausted, or on any non-database failure, the affected clusters are cleared and the error
+   * is propagated.
+   *
+   * @param command operation to execute
+   * @param <T>     result type of the operation
+   * @return the value returned by {@code command}
+   * @throws AmbariException when the command fails with a non-database exception or keeps failing
+   *                         with a database exception after all retries; an AmbariException thrown
+   *                         by the command itself is rethrown unchanged, anything else is wrapped
+   */
+  public static <T> T executeWithRetry(Callable<T> command) throws AmbariException {
+    clearAffectedClusters();
+    int retryAttempts = getOperationsRetryAttempts();
+    while (true) {
+      try {
+        return command.call();
+      } catch (Exception e) {
+        boolean databaseFailure = isDatabaseException(e);
+        if (databaseFailure) {
+          invalidateAffectedClusters();
+        }
+        if (databaseFailure && retryAttempts > 0) {
+          LOG.warn("Ignoring database exception to perform operation retry, attempts remaining: {}",
+              retryAttempts, e);
+          retryAttempts--;
+          continue;
+        }
+        clearAffectedClusters();
+        if (e instanceof InterruptedException) {
+          // restore the interrupt flag, which was cleared when InterruptedException was thrown
+          Thread.currentThread().interrupt();
+        }
+        if (e instanceof AmbariException) {
+          // keep the concrete AmbariException subtype visible to callers instead of wrapping it
+          throw (AmbariException) e;
+        }
+        throw new AmbariException(e.getMessage(), e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/b3a562c6/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterResourceProviderTest.java
index f885a5b..84de604 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterResourceProviderTest.java
@@ -40,8 +40,10 @@ import org.apache.ambari.server.topology.SecurityConfiguration;
 import org.apache.ambari.server.topology.SecurityConfigurationFactory;
 import org.apache.ambari.server.topology.TopologyManager;
 import org.apache.ambari.server.topology.TopologyRequestFactory;
+import org.apache.ambari.server.utils.RetryHelper;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
+import org.eclipse.persistence.exceptions.DatabaseException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -65,6 +67,7 @@ import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.createStrictMock;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
@@ -280,6 +283,68 @@ public class ClusterResourceProviderTest {
   }
 
   @Test
+  public void testCreateResourcesWithRetry() throws Exception {
+    // RetryHelper holds the retry count in static state; restore it in finally so a
+    // failing assertion cannot leak retries into other tests.
+    RetryHelper.init(3);
+    try {
+      Resource.Type type = Resource.Type.Cluster;
+
+      AmbariManagementController managementController = createMock(AmbariManagementController.class);
+      RequestStatusResponse response = createNiceMock(RequestStatusResponse.class);
+
+      // first createCluster call fails with a database error, later calls succeed
+      managementController.createCluster(
+          AbstractResourceProviderTest.Matcher.getClusterRequest(null, "Cluster100", "HDP-0.1", null));
+      expectLastCall().andThrow(new DatabaseException("test"){}).once().andVoid().atLeastOnce();
+
+      // replay
+      replay(managementController, response);
+
+      ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
+          type,
+          PropertyHelper.getPropertyIds(type),
+          PropertyHelper.getKeyPropertyIds(type),
+          managementController);
+
+      AbstractResourceProviderTest.TestObserver observer = new AbstractResourceProviderTest.TestObserver();
+
+      ((ObservableResourceProvider)provider).addObserver(observer);
+
+      // add the property map to a set for the request.  add more maps for multiple creates
+      Set<Map<String, Object>> propertySet = new LinkedHashSet<Map<String, Object>>();
+
+      // Cluster 1: create a map of properties for the request
+      Map<String, Object> properties = new LinkedHashMap<String, Object>();
+
+      // add the cluster name to the properties map
+      properties.put(ClusterResourceProvider.CLUSTER_NAME_PROPERTY_ID, "Cluster100");
+
+      // add the version to the properties map
+      properties.put(ClusterResourceProvider.CLUSTER_VERSION_PROPERTY_ID, "HDP-0.1");
+
+      propertySet.add(properties);
+
+      // create the request
+      Request request = PropertyHelper.getCreateRequest(propertySet, null);
+
+      provider.createResources(request);
+
+      ResourceProviderEvent lastEvent = observer.getLastEvent();
+      Assert.assertNotNull(lastEvent);
+      Assert.assertEquals(Resource.Type.Cluster, lastEvent.getResourceType());
+      Assert.assertEquals(ResourceProviderEvent.Type.Create, lastEvent.getType());
+      Assert.assertEquals(request, lastEvent.getRequest());
+      Assert.assertNull(lastEvent.getPredicate());
+
+      // verify
+      verify(managementController, response);
+    } finally {
+      RetryHelper.init(0);
+    }
+  }
+
+  @Test
   public void testGetResources() throws Exception{
     Resource.Type type = Resource.Type.Cluster;