You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2015/11/04 14:37:12 UTC
[45/50] [abbrv] ambari git commit: AMBARI-13663. Add support of api
operations retry. (mpapirkovskyy)
AMBARI-13663. Add support of api operations retry. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6bc870f6
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6bc870f6
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6bc870f6
Branch: refs/heads/branch-dev-patch-upgrade
Commit: 6bc870f6c955aec07a19c1b7d222c99a2701d999
Parents: b179d8d
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Wed Nov 4 01:54:35 2015 +0200
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Wed Nov 4 02:02:26 2015 +0200
----------------------------------------------------------------------
.../ambari/server/api/services/BaseService.java | 2 +
.../server/configuration/Configuration.java | 43 +++++++++
.../ambari/server/controller/AmbariServer.java | 3 +
.../internal/AbstractResourceProvider.java | 34 +++++++-
.../org/apache/ambari/server/state/Cluster.java | 5 ++
.../server/state/cluster/ClusterImpl.java | 92 +++++++++++++++++++-
.../server/state/cluster/ClustersImpl.java | 3 +-
.../apache/ambari/server/utils/RetryHelper.java | 85 ++++++++++++++++++
8 files changed, 261 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/api/services/BaseService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/BaseService.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/BaseService.java
index 1016ed7..7945599 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/BaseService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/BaseService.java
@@ -28,6 +28,7 @@ import org.apache.ambari.server.api.services.serializers.CsvSerializer;
import org.apache.ambari.server.api.services.serializers.JsonSerializer;
import org.apache.ambari.server.api.services.serializers.ResultSerializer;
import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.utils.RetryHelper;
import org.eclipse.jetty.util.ajax.JSON;
import javax.ws.rs.core.HttpHeaders;
@@ -117,6 +118,7 @@ public abstract class BaseService {
builder.type(mediaType);
}
+ RetryHelper.clearAffectedClusters();
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 3a282ed..b4d5de8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -202,6 +202,12 @@ public class Configuration {
public static final String SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_ATTEMPTS = "server.jdbc.connection-pool.acquisition-retry-attempts";
public static final String SERVER_JDBC_CONNECTION_POOL_ACQUISITION_RETRY_DELAY = "server.jdbc.connection-pool.acquisition-retry-delay";
+ public static final String API_OPERATIONS_RETRY_ATTEMPTS_KEY = "api.operations.retry-attempts";
+ public static final String BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_KEY = "blueprints.operations.retry-attempts";
+ public static final String API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT = "0";
+ public static final String BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT = "0";
+ public static final int RETRY_ATTEMPTS_LIMIT = 10;
+
public static final String SERVER_JDBC_RCA_USER_NAME_KEY = "server.jdbc.rca.user.name";
public static final String SERVER_JDBC_RCA_USER_PASSWD_KEY = "server.jdbc.rca.user.passwd";
public static final String SERVER_JDBC_RCA_DRIVER_KEY = "server.jdbc.rca.driver";
@@ -2351,4 +2357,41 @@ public class Configuration {
public int getAlertCacheSize() {
return Integer.parseInt(properties.getProperty(ALERTS_CACHE_SIZE, ALERTS_CACHE_SIZE_DEFAULT));
}
+
+  /**
+   * Reads {@code api.operations.retry-attempts}, limited to [0, {@link #RETRY_ATTEMPTS_LIMIT}].
+   *
+   * @return number of retry attempts for API update requests
+   */
+  public int getApiOperationsRetryAttempts() {
+    return parseOperationsRetryAttempts(API_OPERATIONS_RETRY_ATTEMPTS_KEY,
+        API_OPERATIONS_RETRY_ATTEMPTS_DEFAULT, "API");
+  }
+
+  /**
+   * Reads {@code blueprints.operations.retry-attempts}, limited to [0, {@link #RETRY_ATTEMPTS_LIMIT}].
+   *
+   * @return number of retry attempts for blueprints operations
+   */
+  public int getBlueprintsOperationsRetryAttempts() {
+    return parseOperationsRetryAttempts(BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_KEY,
+        BLUEPRINTS_OPERATIONS_RETRY_ATTEMPTS_DEFAULT, "blueprint operations");
+  }
+
+  /**
+   * Reads a retry-attempts property and validates it.
+   * <p>
+   * A value that is not an integer, or is negative, is replaced by {@code defaultValue}; a value
+   * above {@link #RETRY_ATTEMPTS_LIMIT} is capped at the limit. Each correction is logged as a
+   * warning instead of failing, because these getters are read during server set-up.
+   *
+   * @param key          configuration property name
+   * @param defaultValue default used when the property is absent or invalid (must be an integer)
+   * @param description  human-readable operation kind used in warning messages
+   * @return validated number of retry attempts
+   */
+  private int parseOperationsRetryAttempts(String key, String defaultValue, String description) {
+    String property = properties.getProperty(key, defaultValue);
+    int attempts;
+    try {
+      // trim() tolerates stray whitespace around the value in ambari.properties
+      attempts = Integer.parseInt(property.trim());
+    } catch (NumberFormatException e) {
+      LOG.warn("Invalid {} retry attempts number ({}), should be [0,{}]. Value reset to default {}",
+          description, property, RETRY_ATTEMPTS_LIMIT, defaultValue);
+      return Integer.parseInt(defaultValue);
+    }
+    if (attempts < 0) {
+      LOG.warn("Invalid {} retry attempts number ({}), should be [0,{}]. Value reset to default {}",
+          description, attempts, RETRY_ATTEMPTS_LIMIT, defaultValue);
+      return Integer.parseInt(defaultValue);
+    }
+    if (attempts > RETRY_ATTEMPTS_LIMIT) {
+      LOG.warn("Invalid {} retry attempts number ({}), should be [0,{}]. Value set to {}",
+          description, attempts, RETRY_ATTEMPTS_LIMIT, RETRY_ATTEMPTS_LIMIT);
+      return RETRY_ATTEMPTS_LIMIT;
+    }
+    return attempts;
+  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
index 177bbc8..ea178b5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
@@ -99,6 +99,7 @@ import org.apache.ambari.server.topology.BlueprintFactory;
import org.apache.ambari.server.topology.SecurityConfigurationFactory;
import org.apache.ambari.server.topology.TopologyManager;
import org.apache.ambari.server.topology.TopologyRequestFactoryImpl;
+import org.apache.ambari.server.utils.RetryHelper;
import org.apache.ambari.server.utils.StageUtils;
import org.apache.ambari.server.view.ViewRegistry;
import org.apache.velocity.app.Velocity;
@@ -708,6 +709,8 @@ public class AmbariServer {
ActionManager.setTopologyManager(injector.getInstance(TopologyManager.class));
TopologyManager.init(injector.getInstance(StackAdvisorBlueprintProcessor.class));
StackAdvisorBlueprintProcessor.init(injector.getInstance(StackAdvisorHelper.class));
+
+ RetryHelper.init(configs.getApiOperationsRetryAttempts(), configs.getBlueprintsOperationsRetryAttempts());
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
index 0c6a07c..2bf8fe3 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractResourceProvider.java
@@ -38,6 +38,7 @@ import org.apache.ambari.server.controller.predicate.EqualsPredicate;
import org.apache.ambari.server.controller.spi.*;
import org.apache.ambari.server.controller.utilities.PredicateHelper;
import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.utils.RetryHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -269,7 +270,7 @@ public abstract class AbstractResourceProvider extends BaseProvider implements R
protected <T> T createResources(Command<T> command)
throws SystemException, ResourceAlreadyExistsException, NoSuchParentResourceException {
try {
- return command.invoke();
+ return invokeWithRetry(command);
} catch (ParentObjectNotFoundException e) {
throw new NoSuchParentResourceException(e.getMessage(), e);
} catch (DuplicateResourceException e) {
@@ -327,7 +328,7 @@ public abstract class AbstractResourceProvider extends BaseProvider implements R
protected <T> T modifyResources (Command<T> command)
throws SystemException, NoSuchResourceException, NoSuchParentResourceException {
try {
- return command.invoke();
+ return invokeWithRetry(command);
} catch (ParentObjectNotFoundException e) {
throw new NoSuchParentResourceException(e.getMessage(), e);
} catch (ObjectNotFoundException e) {
@@ -439,6 +440,35 @@ public abstract class AbstractResourceProvider extends BaseProvider implements R
return predicates.size() == 1 && PredicateHelper.getPropertyIds(predicate).containsAll(getPKPropertyIds());
}
+  /**
+   * Invokes {@code command}, retrying it when it fails because of a database exception.
+   * <p>
+   * The clusters tracked by {@link RetryHelper} are reset before the first attempt. After every
+   * database failure they are invalidated so that the next attempt re-reads their state. At most
+   * {@link RetryHelper#getApiOperationsRetryAttempts()} retries are made. A non-database failure,
+   * or a database failure once the retries are used up, is rethrown after the tracked clusters
+   * have been cleared.
+   *
+   * @param command operation to execute
+   * @param <T>     result type of the command
+   * @return the command's result
+   * @throws AmbariException propagated from the command when it is not retried
+   */
+  private <T> T invokeWithRetry(Command<T> command) throws AmbariException {
+    RetryHelper.clearAffectedClusters();
+    int retriesLeft = RetryHelper.getApiOperationsRetryAttempts();
+    while (true) {
+      try {
+        return command.invoke();
+      } catch (Exception e) {
+        boolean databaseFailure = RetryHelper.isDatabaseException(e);
+        if (databaseFailure) {
+          // cached cluster state may no longer match the database; force a re-read
+          RetryHelper.invalidateAffectedClusters();
+        }
+        if (!databaseFailure || retriesLeft <= 0) {
+          RetryHelper.clearAffectedClusters();
+          throw e; // precise rethrow: only AmbariException or unchecked exceptions reach here
+        }
+        LOG.error("Ignoring database exception to perform operation retry, attempts remaining: " + retriesLeft, e);
+        retriesLeft--;
+      }
+    }
+  }
+
// ----- Inner interface ---------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
index f32e552..2afba7e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
@@ -608,4 +608,9 @@ public interface Cluster {
* {@code null}).
*/
void removeConfigurations(StackId stackId);
+
+ /**
+ * Clears this cluster's cached state so that it is re-read from the database.
+ * <p>
+ * Intended for use when the in-memory state may have diverged from the database, for
+ * example after an API operation failed with a database exception and is about to be retried.
+ */
+ void invalidateData();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index ef225b0..6af6a82 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -145,12 +145,14 @@ public class ClusterImpl implements Cluster {
private StackId desiredStackVersion;
+ private volatile boolean desiredStackVersionSet = true;
+
private volatile Map<String, Service> services = null;
/**
* [ Config Type -> [ Config Version Tag -> Config ] ]
*/
- private Map<String, Map<String, Config>> allConfigs;
+ private volatile Map<String, Map<String, Config>> allConfigs;
/**
* [ ServiceName -> [ ServiceComponentName -> [ HostName -> [ ... ] ] ] ]
@@ -918,6 +920,7 @@ public class ClusterImpl implements Cluster {
@Override
public StackId getDesiredStackVersion() {
+ loadStackVersion();
clusterGlobalLock.readLock().lock();
try {
return desiredStackVersion;
@@ -1756,6 +1759,7 @@ public class ClusterImpl implements Cluster {
@Override
public Map<String, Config> getConfigsByType(String configType) {
+ loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
if (!allConfigs.containsKey(configType)) {
@@ -1770,6 +1774,7 @@ public class ClusterImpl implements Cluster {
@Override
public Config getConfig(String configType, String versionTag) {
+ loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
if (!allConfigs.containsKey(configType)
@@ -1801,6 +1806,7 @@ public class ClusterImpl implements Cluster {
@Override
public void addConfig(Config config) {
+ loadConfigurations();
clusterGlobalLock.writeLock().lock();
try {
if (config.getType() == null || config.getType().isEmpty()) {
@@ -1818,6 +1824,7 @@ public class ClusterImpl implements Cluster {
@Override
public Collection<Config> getAllConfigs() {
+ loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
List<Config> list = new ArrayList<Config>();
@@ -1835,6 +1842,7 @@ public class ClusterImpl implements Cluster {
@Override
public ClusterResponse convertToResponse()
throws AmbariException {
+ loadStackVersion();
String clusterName = getClusterName();
Map<String, Host> hosts = clusters.getHostsForCluster(clusterName);
clusterGlobalLock.readLock().lock();
@@ -1851,6 +1859,7 @@ public class ClusterImpl implements Cluster {
@Override
public void debugDump(StringBuilder sb) {
loadServices();
+ loadStackVersion();
clusterGlobalLock.readLock().lock();
try {
sb.append("Cluster={ clusterName=").append(getClusterName()).append(
@@ -2022,6 +2031,7 @@ public class ClusterImpl implements Cluster {
@Override
public Map<String, DesiredConfig> getDesiredConfigs() {
+ loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
Map<String, DesiredConfig> map = new HashMap<String, DesiredConfig>();
@@ -2205,8 +2215,8 @@ public class ClusterImpl implements Cluster {
}
@Override
- @RequiresSession
public List<ServiceConfigVersionResponse> getServiceConfigVersions() {
+ loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
List<ServiceConfigVersionResponse> serviceConfigVersionResponses = new ArrayList<ServiceConfigVersionResponse>();
@@ -2468,6 +2478,7 @@ public class ClusterImpl implements Cluster {
@Override
public Config getDesiredConfigByType(String configType) {
+ loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
for (ClusterConfigMappingEntity e : clusterEntity.getConfigMappingEntities()) {
@@ -2873,7 +2884,6 @@ public class ClusterImpl implements Cluster {
* {@inheritDoc}
*/
@Override
- @Transactional
public void applyLatestConfigurations(StackId stackId) {
clusterGlobalLock.writeLock().lock();
try {
@@ -3036,4 +3046,80 @@ public class ClusterImpl implements Cluster {
}
}
}
+
+  /**
+   * Populates the configuration cache when it is empty ({@code allConfigs == null}), for
+   * instance after {@link #invalidateData()} discarded it.
+   * <p>
+   * The first null check runs without the lock (allConfigs is volatile). The check is repeated
+   * under the write lock so that a thread which waited for the lock does not rebuild a cache
+   * another thread has just populated (assuming cacheConfigurations() assigns allConfigs).
+   * NOTE(review): if clusterGlobalLock is a ReentrantReadWriteLock, this must not be called while
+   * the current thread holds its read lock (read-to-write upgrade is not supported) — confirm.
+   */
+  private void loadConfigurations() {
+    if (allConfigs == null) {
+      clusterGlobalLock.writeLock().lock();
+      try {
+        if (allConfigs == null) {
+          cacheConfigurations();
+        }
+      } finally {
+        clusterGlobalLock.writeLock().unlock();
+      }
+    }
+  }
+
+  /**
+   * Re-reads the desired stack version from the cluster entity when it has been marked stale
+   * ({@code desiredStackVersionSet == false}, as done by {@link #invalidateData()}).
+   * <p>
+   * When both the stack name and the stack version are non-empty, the service config types are
+   * reloaded too. The flag is only set after that succeeds, so a failed reload is attempted
+   * again on the next call.
+   *
+   * @throws RuntimeException wrapping the AmbariException raised by loadServiceConfigTypes()
+   */
+  private void loadStackVersion() {
+    if (!desiredStackVersionSet) {
+      clusterGlobalLock.writeLock().lock();
+      try {
+        // another thread may have finished the reload while this one waited for the lock
+        if (!desiredStackVersionSet) {
+          desiredStackVersion = new StackId(clusterEntity.getDesiredStack());
+          boolean stackDefined = !StringUtils.isEmpty(desiredStackVersion.getStackName())
+              && !StringUtils.isEmpty(desiredStackVersion.getStackVersion());
+          if (stackDefined) {
+            try {
+              loadServiceConfigTypes();
+            } catch (AmbariException e) {
+              //TODO recheck wrapping exception here, required for lazy loading after invalidation
+              throw new RuntimeException(e);
+            }
+          }
+          desiredStackVersionSet = true;
+        }
+      } finally {
+        clusterGlobalLock.writeLock().unlock();
+      }
+    }
+  }
+
+ /**
+ * Clears this cluster's cached data so that it is re-read from the database on next access.
+ * To be used when the in-memory state may be out of sync with the database.
+ * <p>
+ * Resets the configuration cache, services, desired stack version, service component hosts
+ * and config groups, then calls {@code refresh()}. All of this happens under the cluster
+ * write lock.
+ */
+ @Override
+ public void invalidateData() {
+ clusterGlobalLock.writeLock().lock();
+ try {
+ // allConfigs is rebuilt lazily by loadConfigurations(); services presumably by loadServices()
+ allConfigs = null;
+ services = null;
+ // makes loadStackVersion() re-read the desired stack from the cluster entity
+ desiredStackVersionSet = false;
+
+ // NOTE(review): presumably reloaded on demand once svcHostsLoaded is false — confirm
+ serviceComponentHosts.clear();
+ serviceComponentHostsByHost.clear();
+ svcHostsLoaded = false;
+
+ // NOTE(review): assumes config groups are reloaded lazily when null — confirm
+ clusterConfigGroups = null;
+
+ //TODO investigate resetting request executions; they have a separate, lightweight API
+
+ // NOTE(review): presumably reloads clusterEntity from the database, which loadStackVersion()
+ // reads — confirm
+ refresh();
+
+ } finally {
+ clusterGlobalLock.writeLock().unlock();
+ }
+ }
}
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
index a89fb91..9ea9581 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
@@ -66,6 +66,7 @@ import org.apache.ambari.server.state.SecurityType;
import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.state.configgroup.ConfigGroup;
import org.apache.ambari.server.state.host.HostFactory;
+import org.apache.ambari.server.utils.RetryHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.core.GrantedAuthority;
@@ -272,7 +273,7 @@ public class ClustersImpl implements Clusters {
if (null == cluster) {
throw new ClusterNotFoundException(clusterName);
}
-
+ RetryHelper.addAffectedCluster(cluster);
return cluster;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bc870f6/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
new file mode 100644
index 0000000..9d2fe9e
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.state.Cluster;
+import org.eclipse.persistence.exceptions.DatabaseException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Set;
+
+/**
+ * Provides utility methods to support operations retry
+ * TODO injection as Guice singleon, static for now to avoid major modifications
+ */
+public class RetryHelper {
+ private static ThreadLocal<Set<Cluster>> affectedClusters = new ThreadLocal<Set<Cluster>>(){
+ @Override
+ protected Set<Cluster> initialValue() {
+ return new HashSet<>();
+ }
+ };
+
+ private static int apiRetryAttempts = 0;
+ private static int blueprintsRetryAttempts = 0;
+
+ public static void init(int apiOperationsRetryAttempts, int blueprintOperationsRetryAttempts) {
+ apiRetryAttempts = apiOperationsRetryAttempts;
+ blueprintsRetryAttempts = blueprintOperationsRetryAttempts;
+ }
+
+ public static void addAffectedCluster(Cluster cluster) {
+ if (apiRetryAttempts > 0 || blueprintsRetryAttempts > 0) {
+ affectedClusters.get().add(cluster);
+ }
+ }
+
+ public static Set<Cluster> getAffectedClusters() {
+ return Collections.unmodifiableSet(affectedClusters.get());
+ }
+
+ public static void clearAffectedClusters() {
+ if (apiRetryAttempts > 0 || blueprintsRetryAttempts > 0) {
+ affectedClusters.get().clear();
+ }
+ }
+
+ public static int getApiOperationsRetryAttempts() {
+ return apiRetryAttempts;
+ }
+
+ public static boolean isDatabaseException(Throwable ex) {
+ do {
+ if (ex instanceof DatabaseException) {
+ return true;
+ }
+ ex = ex.getCause();
+
+ } while (ex != null);
+
+ return false;
+ }
+
+ public static void invalidateAffectedClusters() {
+ for (Cluster cluster : affectedClusters.get()) {
+ cluster.invalidateData();
+ }
+ }
+}