You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2015/11/04 14:37:12 UTC

[45/50] [abbrv] ambari git commit: AMBARI-13663. Add support of api operations retry. (mpapirkovskyy)

AMBARI-13663. Add support of api operations retry. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6bc870f6
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6bc870f6
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6bc870f6

Branch: refs/heads/branch-dev-patch-upgrade
Commit: 6bc870f6c955aec07a19c1b7d222c99a2701d999
Parents: b179d8d
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Wed Nov 4 01:54:35 2015 +0200
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Wed Nov 4 02:02:26 2015 +0200

----------------------------------------------------------------------
 .../ambari/server/api/services/BaseService.java |  2 +
 .../server/configuration/Configuration.java     | 43 +++++++++
 .../ambari/server/controller/AmbariServer.java  |  3 +
 .../internal/AbstractResourceProvider.java      | 34 +++++++-
 .../org/apache/ambari/server/state/Cluster.java |  5 ++
 .../server/state/cluster/ClusterImpl.java       | 92 +++++++++++++++++++-
 .../server/state/cluster/ClustersImpl.java      |  3 +-
 .../apache/ambari/server/utils/RetryHelper.java | 85 ++++++++++++++++++
 8 files changed, 261 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/api/services/BaseService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/BaseService.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/BaseService.java
index 1016ed7..7945599 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/BaseService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/BaseService.java
@@ -28,6 +28,7 @@ import org.apache.ambari.server.api.services.serializers.CsvSerializer;
 import org.apache.ambari.server.api.services.serializers.JsonSerializer;
 import org.apache.ambari.server.api.services.serializers.ResultSerializer;
 import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.utils.RetryHelper;
 import org.eclipse.jetty.util.ajax.JSON;
 
 import javax.ws.rs.core.HttpHeaders;
@@ -117,6 +118,7 @@ public abstract class BaseService {
       builder.type(mediaType);
     }
 
+    RetryHelper.clearAffectedClusters();
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 3a282ed..b4d5de8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -202,6 +202,12 @@ public class Configuration {
   public static final String SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_ATTEMPTS = "server.jdbc.connection-pool.acquisition-retry-attempts";
   public static final String SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_DELAY = "server.jdbc.connection-pool.acquisition-retry-delay";
 
+  public static final String API_OPERATIONS_RETRY_ATTEMPTS_KEY = "api.operations.retry-attempts";
+  public static final String BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_KEY = "blueprints.operations.retry-attempts";
+  public static final String API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT = "0";
+  public static final String BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT = "0";
+  public static final int RETRY_ATTEMPTS_LIMIT = 10;
+
   public static final String SERVER_JDBC_RCA_USER_NAME_KEY = "server.jdbc.rca.user.name";
   public static final String SERVER_JDBC_RCA_USER_PASSWD_KEY = "server.jdbc.rca.user.passwd";
   public static final String SERVER_JDBC_RCA_DRIVER_KEY = "server.jdbc.rca.driver";
@@ -2351,4 +2357,41 @@ public class Configuration {
   public int getAlertCacheSize() {
     return Integer.parseInt(properties.getProperty(ALERTS_CACHE_SIZE, ALERTS_CACHE_SIZE_DEFAULT));
   }
+
+  /** @return configured API retry attempts: default when negative, capped at RETRY_ATTEMPTS_LIMIT */
+  public int getApiOperationsRetryAttempts() {
+    final int configured = Integer.valueOf(
+        properties.getProperty(API_OPERATIONS_RETRY_ATTEMPTS_KEY, API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT));
+    if (configured < 0) {
+      LOG.warn("Invalid API retry attempts number ({}), should be [0,{}]. Value reset to default {}",
+          configured, RETRY_ATTEMPTS_LIMIT, API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
+      return Integer.valueOf(API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
+    }
+    if (configured > RETRY_ATTEMPTS_LIMIT) {
+      LOG.warn("Invalid API retry attempts number ({}), should be [0,{}]. Value set to {}",
+          configured, RETRY_ATTEMPTS_LIMIT, RETRY_ATTEMPTS_LIMIT);
+      return RETRY_ATTEMPTS_LIMIT;
+    }
+    // already within [0, RETRY_ATTEMPTS_LIMIT]
+    return configured;
+  }
+
+  /**
+   * @return configured blueprint retry attempts: default when negative, capped at RETRY_ATTEMPTS_LIMIT
+   */
+  public int getBlueprintsOperationsRetryAttempts() {
+    final int configured = Integer.valueOf(properties.getProperty(
+        BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_KEY, BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT));
+    if (configured < 0) {
+      LOG.warn("Invalid blueprint operations retry attempts number ({}), should be [0,{}]. Value reset to default {}",
+          configured, RETRY_ATTEMPTS_LIMIT, BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
+      return Integer.valueOf(BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT);
+    }
+    if (configured > RETRY_ATTEMPTS_LIMIT) {
+      LOG.warn("Invalid blueprint operations retry attempts number ({}), should be [0,{}]. Value set to {}",
+          configured, RETRY_ATTEMPTS_LIMIT, RETRY_ATTEMPTS_LIMIT);
+      return RETRY_ATTEMPTS_LIMIT;
+    }
+    return configured;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
index 177bbc8..ea178b5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
@@ -99,6 +99,7 @@ import org.apache.ambari.server.topology.BlueprintFactory;
 import org.apache.ambari.server.topology.SecurityConfigurationFactory;
 import org.apache.ambari.server.topology.TopologyManager;
 import org.apache.ambari.server.topology.TopologyRequestFactoryImpl;
+import org.apache.ambari.server.utils.RetryHelper;
 import org.apache.ambari.server.utils.StageUtils;
 import org.apache.ambari.server.view.ViewRegistry;
 import org.apache.velocity.app.Velocity;
@@ -708,6 +709,8 @@ public class AmbariServer {
     ActionManager.setTopologyManager(injector.getInstance(TopologyManager.class));
     TopologyManager.init(injector.getInstance(StackAdvisorBlueprintProcessor.class));
     StackAdvisorBlueprintProcessor.init(injector.getInstance(StackAdvisorHelper.class));
+
+    RetryHelper.init(configs.getApiOperationsRetryAttempts(), configs.getBlueprintsOperationsRetryAttempts());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
index 0c6a07c..2bf8fe3 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
@@ -38,6 +38,7 @@ import org.apache.ambari.server.controller.predicate.EqualsPredicate;
 import org.apache.ambari.server.controller.spi.*;
 import org.apache.ambari.server.controller.utilities.PredicateHelper;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.utils.RetryHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -269,7 +270,7 @@ public abstract class AbstractResourceProvider extends BaseProvider implements R
   protected <T> T createResources(Command<T> command)
       throws SystemException, ResourceAlreadyExistsException, NoSuchParentResourceException {
     try {
-      return command.invoke();
+      return invokeWithRetry(command);
     } catch (ParentObjectNotFoundException e) {
       throw new NoSuchParentResourceException(e.getMessage(), e);
     } catch (DuplicateResourceException e) {
@@ -327,7 +328,7 @@ public abstract class AbstractResourceProvider extends BaseProvider implements R
   protected <T> T modifyResources (Command<T> command)
       throws SystemException, NoSuchResourceException, NoSuchParentResourceException {
     try {
-      return command.invoke();
+      return invokeWithRetry(command);
     } catch (ParentObjectNotFoundException e) {
       throw new NoSuchParentResourceException(e.getMessage(), e);
     } catch (ObjectNotFoundException e) {
@@ -439,6 +440,35 @@ public abstract class AbstractResourceProvider extends BaseProvider implements R
     return predicates.size() == 1 && PredicateHelper.getPropertyIds(predicate).containsAll(getPKPropertyIds());
   }
 
+  // Invokes the command; on a database exception invalidates the affected clusters and retries while API retry attempts remain
+  private <T> T invokeWithRetry(Command<T> command) throws AmbariException {
+    RetryHelper.clearAffectedClusters(); // start with an empty per-thread set of affected clusters
+    int retryAttempts = RetryHelper.getApiOperationsRetryAttempts();
+    do {
+      // one pass per attempt; the loop is left only by the return or by a rethrow
+      try {
+        return command.invoke();
+      } catch (Exception e) {
+        if (RetryHelper.isDatabaseException(e)) {
+          // invalidated on every database failure, including the last one that is rethrown
+          RetryHelper.invalidateAffectedClusters();
+
+          if (retryAttempts > 0) {
+            LOG.error("Ignoring database exception to perform operation retry, attempts remaining: " + retryAttempts, e);
+            retryAttempts--;
+          } else {
+            RetryHelper.clearAffectedClusters();
+            throw e; // retries exhausted: propagate the database exception
+          }
+        } else {
+          RetryHelper.clearAffectedClusters();
+          throw e; // not a database failure: never retried
+        }
+      }
+
+    } while (true);
+  }
+
 
   // ----- Inner interface ---------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
index f32e552..2afba7e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
@@ -608,4 +608,9 @@ public interface Cluster {
    *          {@code null}).
    */
   void removeConfigurations(StackId stackId);
+
+  /**
+   * Clears the cluster's cached data so that it is re-read from the database.
+   */
+  void invalidateData();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index ef225b0..6af6a82 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -145,12 +145,14 @@ public class ClusterImpl implements Cluster {
 
   private StackId desiredStackVersion;
 
+  private volatile boolean desiredStackVersionSet = true;
+
   private volatile Map<String, Service> services = null;
 
   /**
    * [ Config Type -> [ Config Version Tag -> Config ] ]
    */
-  private Map<String, Map<String, Config>> allConfigs;
+  private volatile Map<String, Map<String, Config>> allConfigs;
 
   /**
    * [ ServiceName -> [ ServiceComponentName -> [ HostName -> [ ... ] ] ] ]
@@ -918,6 +920,7 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public StackId getDesiredStackVersion() {
+    loadStackVersion();
     clusterGlobalLock.readLock().lock();
     try {
       return desiredStackVersion;
@@ -1756,6 +1759,7 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public Map<String, Config> getConfigsByType(String configType) {
+    loadConfigurations();
     clusterGlobalLock.readLock().lock();
     try {
       if (!allConfigs.containsKey(configType)) {
@@ -1770,6 +1774,7 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public Config getConfig(String configType, String versionTag) {
+    loadConfigurations();
     clusterGlobalLock.readLock().lock();
     try {
       if (!allConfigs.containsKey(configType)
@@ -1801,6 +1806,7 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public void addConfig(Config config) {
+    loadConfigurations();
     clusterGlobalLock.writeLock().lock();
     try {
       if (config.getType() == null || config.getType().isEmpty()) {
@@ -1818,6 +1824,7 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public Collection<Config> getAllConfigs() {
+    loadConfigurations();
     clusterGlobalLock.readLock().lock();
     try {
       List<Config> list = new ArrayList<Config>();
@@ -1835,6 +1842,7 @@ public class ClusterImpl implements Cluster {
   @Override
   public ClusterResponse convertToResponse()
     throws AmbariException {
+    loadStackVersion();
     String clusterName = getClusterName();
     Map<String, Host> hosts = clusters.getHostsForCluster(clusterName);
     clusterGlobalLock.readLock().lock();
@@ -1851,6 +1859,7 @@ public class ClusterImpl implements Cluster {
   @Override
   public void debugDump(StringBuilder sb) {
     loadServices();
+    loadStackVersion();
     clusterGlobalLock.readLock().lock();
     try {
       sb.append("Cluster={ clusterName=").append(getClusterName()).append(
@@ -2022,6 +2031,7 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public Map<String, DesiredConfig> getDesiredConfigs() {
+    loadConfigurations();
     clusterGlobalLock.readLock().lock();
     try {
       Map<String, DesiredConfig> map = new HashMap<String, DesiredConfig>();
@@ -2205,8 +2215,8 @@ public class ClusterImpl implements Cluster {
   }
 
   @Override
-  @RequiresSession
   public List<ServiceConfigVersionResponse> getServiceConfigVersions() {
+    loadConfigurations();
     clusterGlobalLock.readLock().lock();
     try {
       List<ServiceConfigVersionResponse> serviceConfigVersionResponses = new ArrayList<ServiceConfigVersionResponse>();
@@ -2468,6 +2478,7 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public Config getDesiredConfigByType(String configType) {
+    loadConfigurations();
     clusterGlobalLock.readLock().lock();
     try {
       for (ClusterConfigMappingEntity e : clusterEntity.getConfigMappingEntities()) {
@@ -2873,7 +2884,6 @@ public class ClusterImpl implements Cluster {
    * {@inheritDoc}
    */
   @Override
-  @Transactional
   public void applyLatestConfigurations(StackId stackId) {
     clusterGlobalLock.writeLock().lock();
     try {
@@ -3036,4 +3046,80 @@ public class ClusterImpl implements Cluster {
       }
     }
   }
+
+  private void loadConfigurations() { // lazily loads allConfigs when it is null (invalidateData() resets it to null)
+    if (allConfigs != null) { // already loaded: return without taking the write lock
+      return;
+    }
+    clusterGlobalLock.writeLock().lock();
+    try {
+      if (allConfigs != null) { // checked again under the write lock in case another thread loaded it first
+        return;
+      }
+      cacheConfigurations(); // NOTE(review): presumably repopulates allConfigs; defined outside this diff - confirm
+
+    } finally {
+      clusterGlobalLock.writeLock().unlock();
+    }
+  }
+
+  private void loadStackVersion() { // lazily re-reads desiredStackVersion while desiredStackVersionSet is false
+    if (desiredStackVersionSet) { // already set: return without taking the write lock
+      return;
+    }
+    clusterGlobalLock.writeLock().lock();
+    try {
+      // checked again under the write lock in case another thread finished loading first
+      if (desiredStackVersionSet) {
+        return;
+      }
+
+      desiredStackVersion = new StackId(clusterEntity.getDesiredStack());
+
+      if (!StringUtils.isEmpty(desiredStackVersion.getStackName()) && !
+          StringUtils.isEmpty(desiredStackVersion.getStackVersion())) {
+        try {
+          loadServiceConfigTypes();
+        } catch (AmbariException e) {
+          //TODO recheck wrapping exception here, required for lazy loading after invalidation
+          throw new RuntimeException(e);
+        }
+      }
+      // set the flag last so it is only true once the stack version has been loaded
+      desiredStackVersionSet = true;
+
+    } finally {
+      clusterGlobalLock.writeLock().unlock();
+    }
+
+  }
+
+  /**
+   * Clears the cached cluster data, under the cluster write lock, so that it is re-read from the database.
+   * To be used when the cached state may be out of sync with the database.
+   */
+  @Override
+  public void invalidateData() {
+    clusterGlobalLock.writeLock().lock();
+    try {
+      allConfigs = null; // reloaded lazily by loadConfigurations()
+      services = null;
+      desiredStackVersionSet = false; // makes loadStackVersion() re-read the desired stack
+
+      serviceComponentHosts.clear();
+      serviceComponentHostsByHost.clear();
+      svcHostsLoaded = false;
+
+      clusterConfigGroups = null;
+
+      //TODO investigate reset request executions, it has separate api which is not too heavy
+
+      refresh(); // NOTE(review): presumably reloads clusterEntity; defined outside this diff - confirm
+
+    } finally {
+      clusterGlobalLock.writeLock().unlock();
+    }
+  }
 }
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
index a89fb91..9ea9581 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
@@ -66,6 +66,7 @@ import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.configgroup.ConfigGroup;
 import org.apache.ambari.server.state.host.HostFactory;
+import org.apache.ambari.server.utils.RetryHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.security.core.GrantedAuthority;
@@ -272,7 +273,7 @@ public class ClustersImpl implements Clusters {
     if (null == cluster) {
       throw new ClusterNotFoundException(clusterName);
     }
-
+    RetryHelper.addAffectedCluster(cluster);
     return cluster;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
new file mode 100644
index 0000000..9d2fe9e
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.state.Cluster;
+import org.eclipse.persistence.exceptions.DatabaseException;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Provides utility methods to support operations retry; tracks, per thread, the clusters to invalidate on failure.
+ * TODO injection as Guice singleton, static for now to avoid major modifications
+ */
+public class RetryHelper {
+  private static final ThreadLocal<Set<Cluster>> affectedClusters = new ThreadLocal<Set<Cluster>>() {
+    @Override
+    protected Set<Cluster> initialValue() {
+      return new HashSet<>();
+    }
+  };
+  private static int apiRetryAttempts = 0;
+  private static int blueprintsRetryAttempts = 0;
+
+  /** Stores the retry attempt counts for API and blueprint operations. */
+  public static void init(int apiOperationsRetryAttempts, int blueprintOperationsRetryAttempts) {
+    apiRetryAttempts = apiOperationsRetryAttempts;
+    blueprintsRetryAttempts = blueprintOperationsRetryAttempts;
+  }
+
+  /** Records a cluster used by the current thread; does nothing unless a retry count is positive. */
+  public static void addAffectedCluster(Cluster cluster) {
+    if (apiRetryAttempts > 0 || blueprintsRetryAttempts > 0) {
+      affectedClusters.get().add(cluster);
+    }
+  }
+
+  /** @return unmodifiable view of the clusters recorded for the current thread */
+  public static Set<Cluster> getAffectedClusters() {
+    return Collections.unmodifiableSet(affectedClusters.get());
+  }
+
+  /** Empties the current thread's set of recorded clusters (skipped while both retry counts are 0). */
+  public static void clearAffectedClusters() {
+    if (apiRetryAttempts > 0 || blueprintsRetryAttempts > 0) {
+      affectedClusters.get().clear();
+    }
+  }
+
+  public static int getApiOperationsRetryAttempts() {
+    return apiRetryAttempts; // value passed to init(); 0 until then
+  }
+
+  /** @return true if ex or any exception in its cause chain is a DatabaseException; false for null */
+  public static boolean isDatabaseException(Throwable ex) {
+    while (ex != null && !(ex instanceof DatabaseException)) {
+      ex = ex.getCause();
+    }
+    return ex != null;
+  }
+
+  /** Invalidates the cached data of every cluster recorded for the current thread. */
+  public static void invalidateAffectedClusters() {
+    for (Cluster cluster : affectedClusters.get()) {
+      cluster.invalidateData();
+    }
+  }
+}