You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2017/06/29 20:59:25 UTC
ambari git commit: AMBARI-21361 - Finalization Can Fail When Host
Versions Changed on Stack Distribution (jonathanhurley)
Repository: ambari
Updated Branches:
refs/heads/branch-feature-AMBARI-21348 98f77089c -> 6bab5a537
AMBARI-21361 - Finalization Can Fail When Host Versions Changed on Stack Distribution (jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6bab5a53
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6bab5a53
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6bab5a53
Branch: refs/heads/branch-feature-AMBARI-21348
Commit: 6bab5a53789d0c50cc75b1e1397cabcc0630f385
Parents: 98f7708
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Thu Jun 29 09:26:33 2017 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Thu Jun 29 16:53:51 2017 -0400
----------------------------------------------------------------------
.../actionmanager/ExecutionCommandWrapper.java | 32 ++-
.../AmbariCustomCommandExecutionHelper.java | 4 -
.../internal/UpgradeResourceProvider.java | 23 --
.../listeners/upgrade/StackVersionListener.java | 227 +++++++++++--------
4 files changed, 159 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bab5a53/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java
index 4773c75..fc66f53 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java
@@ -17,6 +17,9 @@
*/
package org.apache.ambari.server.actionmanager;
+import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.HOOKS_FOLDER;
+import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.SERVICE_PACKAGE_FOLDER;
+
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -27,12 +30,17 @@ import org.apache.ambari.server.ClusterNotFoundException;
import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.agent.ExecutionCommand.KeyNames;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.entities.ClusterVersionEntity;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.ConfigHelper;
import org.apache.ambari.server.state.DesiredConfig;
+import org.apache.ambari.server.state.ServiceInfo;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.StackInfo;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +67,12 @@ public class ExecutionCommandWrapper {
@Inject
private Gson gson;
+ /**
+ * Used for injecting hooks and common-services into the command.
+ */
+ @Inject
+ private AmbariMetaInfo ambariMetaInfo;
+
@AssistedInject
public ExecutionCommandWrapper(@Assisted String jsonExecutionCommand) {
this.jsonExecutionCommand = jsonExecutionCommand;
@@ -182,12 +196,28 @@ public class ExecutionCommandWrapper {
}
}
+ Map<String,String> commandParams = executionCommand.getCommandParams();
+
ClusterVersionEntity effectiveClusterVersion = cluster.getEffectiveClusterVersion();
if (null != effectiveClusterVersion) {
- executionCommand.getCommandParams().put(KeyNames.VERSION,
+ commandParams.put(KeyNames.VERSION,
effectiveClusterVersion.getRepositoryVersion().getVersion());
}
+ // add the stack and common-services folders to the command
+ StackId stackId = cluster.getDesiredStackVersion();
+ StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(),
+ stackId.getStackVersion());
+
+ commandParams.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder());
+
+ String serviceName = executionCommand.getServiceName();
+ if (!StringUtils.isEmpty(serviceName)) {
+ ServiceInfo serviceInfo = ambariMetaInfo.getService(stackId.getStackName(),
+ stackId.getStackVersion(), serviceName);
+
+ commandParams.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder());
+ }
} catch (ClusterNotFoundException cnfe) {
// it's possible that there are commands without clusters; in such cases,
// just return the de-serialized command and don't try to read configs
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bab5a53/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
index 3a672b6..6f92707 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
@@ -329,8 +329,6 @@ public class AmbariCustomCommandExecutionHelper {
AmbariMetaInfo ambariMetaInfo = managementController.getAmbariMetaInfo();
ServiceInfo serviceInfo = ambariMetaInfo.getService(
stackId.getStackName(), stackId.getStackVersion(), serviceName);
- StackInfo stackInfo = ambariMetaInfo.getStack
- (stackId.getStackName(), stackId.getStackVersion());
CustomCommandDefinition customCommandDefinition = null;
ComponentInfo ci = serviceInfo.getComponentByName(componentName);
@@ -474,8 +472,6 @@ public class AmbariCustomCommandExecutionHelper {
}
commandParams.put(COMMAND_TIMEOUT, "" + commandTimeout);
- commandParams.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder());
- commandParams.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder());
Map<String, String> roleParams = execCmd.getRoleParams();
if (roleParams == null) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bab5a53/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
index d2573e1..822f94d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
@@ -17,9 +17,6 @@
*/
package org.apache.ambari.server.controller.internal;
-import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.HOOKS_FOLDER;
-import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.SERVICE_PACKAGE_FOLDER;
-
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -95,7 +92,6 @@ import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceInfo;
import org.apache.ambari.server.state.StackId;
-import org.apache.ambari.server.state.StackInfo;
import org.apache.ambari.server.state.UpgradeContext;
import org.apache.ambari.server.state.UpgradeContextFactory;
import org.apache.ambari.server.state.UpgradeHelper;
@@ -1312,25 +1308,6 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
// Apply additional parameters to the command that come from the stage.
applyAdditionalParameters(wrapper, params);
- // Because custom task may end up calling a script/function inside a
- // service, it is necessary to set the
- // service_package_folder and hooks_folder params.
- AmbariMetaInfo ambariMetaInfo = s_metaProvider.get();
- StackId stackId = context.getEffectiveStackId();
-
- StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(),
- stackId.getStackVersion());
-
- if (wrapper.getTasks() != null && wrapper.getTasks().size() > 0
- && wrapper.getTasks().get(0).getService() != null) {
- String serviceName = wrapper.getTasks().get(0).getService();
- ServiceInfo serviceInfo = ambariMetaInfo.getService(stackId.getStackName(),
- stackId.getStackVersion(), serviceName);
-
- params.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder());
- params.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder());
- }
-
ActionExecutionContext actionContext = new ActionExecutionContext(cluster.getClusterName(),
"ru_execute_tasks", Collections.singletonList(filter), params);
http://git-wip-us.apache.org/repos/asf/ambari/blob/6bab5a53/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
index 22d7f2e..4600912 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,18 +17,15 @@
*/
package org.apache.ambari.server.events.listeners.upgrade;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.EagerSingleton;
-import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.events.HostComponentVersionAdvertisedEvent;
import org.apache.ambari.server.events.publishers.VersionEventPublisher;
import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
+import org.apache.ambari.server.orm.entities.ClusterVersionEntity;
+import org.apache.ambari.server.orm.entities.HostVersionEntity;
import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.ComponentInfo;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.State;
@@ -37,19 +34,15 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
-import com.google.inject.Provider;
import com.google.inject.Singleton;
/**
* The {@link StackVersionListener} class handles the propagation of versions
* advertised by the {@link org.apache.ambari.server.state.ServiceComponentHost}
* that bubble up to the
- * {@link org.apache.ambari.server.orm.entities.HostVersionEntity} and
- * eventually the
- * {@link org.apache.ambari.server.orm.entities.ClusterVersionEntity}
+ * {@link org.apache.ambari.server.orm.entities.HostVersionEntity}.
*/
@Singleton
@EagerSingleton
@@ -60,18 +53,9 @@ public class StackVersionListener {
private final static Logger LOG = LoggerFactory.getLogger(StackVersionListener.class);
public static final String UNKNOWN_VERSION = State.UNKNOWN.toString();
- /**
- * Used to prevent multiple threads from trying to create host alerts
- * simultaneously.
- */
- private Lock m_stackVersionLock = new ReentrantLock();
-
@Inject
private RepositoryVersionDAO repositoryVersionDAO;
- @Inject
- Provider<AmbariMetaInfo> ambariMetaInfo;
-
/**
* Constructor.
*
@@ -83,7 +67,6 @@ public class StackVersionListener {
}
@Subscribe
- @AllowConcurrentEvents
public void onAmbariEvent(HostComponentVersionAdvertisedEvent event) {
LOG.debug("Received event {}", event);
@@ -96,8 +79,6 @@ public class StackVersionListener {
return;
}
- m_stackVersionLock.lock();
-
// if the cluster is upgrading, there's no need to update the repo version -
// it better be right
if (null != event.getRepositoryVersionId() && null == cluster.getUpgradeInProgress()) {
@@ -116,70 +97,113 @@ public class StackVersionListener {
// Update host component version value if needed
try {
- AmbariMetaInfo metaInfo = ambariMetaInfo.get();
- ComponentInfo componentInfo = metaInfo.getComponent(cluster.getDesiredStackVersion().getStackName(),
- cluster.getDesiredStackVersion().getStackVersion(), sch.getServiceName(), sch.getServiceComponentName());
- ServiceComponent sc = cluster.getService(sch.getServiceName()).getServiceComponent(sch.getServiceComponentName());
- if (componentInfo.isVersionAdvertised() && StringUtils.isNotBlank(newVersion)
- && !UNKNOWN_VERSION.equalsIgnoreCase(newVersion)) {
- processComponentAdvertisedVersion(cluster, sch, newVersion, sc);
- } else if(!sc.isVersionAdvertised() && StringUtils.isNotBlank(newVersion)
- && !UNKNOWN_VERSION.equalsIgnoreCase(newVersion)) {
- LOG.debug("ServiceComponent {} doesn't advertise version, " +
- "however ServiceHostComponent {} on host {} advertised version as {}. Skipping version update",
- sc.getName(), sch.getServiceComponentName(), sch.getHostName(), newVersion);
- } else {
- if (UNKNOWN_VERSION.equals(sc.getDesiredVersion())) {
- processUnknownDesiredVersion(cluster, sc, sch, newVersion);
- } else {
- processComponentAdvertisedVersion(cluster, sch, newVersion, sc);
+ ServiceComponent sc = cluster.getService(sch.getServiceName()).getServiceComponent(
+ sch.getServiceComponentName());
+
+ // not advertising a version, do nothing
+ if (!sc.isVersionAdvertised()) {
+ // that's odd; a version came back - log it and still do nothing
+ if (!StringUtils.equalsIgnoreCase(UNKNOWN_VERSION, newVersion)) {
+ LOG.debug(
+ "ServiceComponent {} doesn't advertise version, however ServiceHostComponent {} on host {} advertised version as {}. Skipping version update",
+ sc.getName(), sch.getServiceComponentName(), sch.getHostName(), newVersion);
}
+ return;
}
+
+ boolean desiredVersionIsCurrentlyUnknown = StringUtils.equalsIgnoreCase(UNKNOWN_VERSION,
+ sc.getDesiredVersion());
+
+ // proces the UNKNOWN version being received or currently desired
+ if (StringUtils.equalsIgnoreCase(UNKNOWN_VERSION, newVersion)
+ || desiredVersionIsCurrentlyUnknown) {
+ processUnknownDesiredVersion(cluster, sc, sch, newVersion);
+ return;
+ }
+
+ processComponentAdvertisedVersion(cluster, sc, sch, newVersion);
} catch (Exception e) {
LOG.error(
"Unable to propagate version for ServiceHostComponent on component: {}, host: {}. Error: {}",
sch.getServiceComponentName(), sch.getHostName(), e.getMessage());
- } finally {
- m_stackVersionLock.unlock();
}
}
+
/**
- * Update host component version
- * or
- * Bootstrap cluster/repo version when version is reported for the first time
- * @param cluster target cluster
- * @param sch target host component
- * @param newVersion advertised version
- * @param sc target service component
+ * Updates the version and {@link UpgradeState} for the specified
+ * {@link ServiceComponentHost} if necessary. If the version or the upgrade
+ * state changes, then this method will call
+ * {@link ServiceComponentHost#recalculateHostVersionState()} in order to
+ * ensure that the host version state is properly updated.
+ * <p/>
+ *
+ *
+ * @param cluster
+ * @param sc
+ * @param sch
+ * @param newVersion
* @throws AmbariException
*/
- private void processComponentAdvertisedVersion(Cluster cluster, ServiceComponentHost sch, String newVersion, ServiceComponent sc) throws AmbariException {
+ private void processComponentAdvertisedVersion(Cluster cluster, ServiceComponent sc,
+ ServiceComponentHost sch, String newVersion) throws AmbariException {
if (StringUtils.isBlank(newVersion)) {
return;
}
+
String previousVersion = sch.getVersion();
- if (previousVersion == null || UNKNOWN_VERSION.equalsIgnoreCase(previousVersion)) {
- // value may be "UNKNOWN" when upgrading from older Ambari versions
- // or if host component reports it's version for the first time
- sch.setUpgradeState(UpgradeState.NONE);
+ String desiredVersion = sc.getDesiredVersion();
+ UpgradeState upgradeState = sch.getUpgradeState();
+
+ // was this version expected
+ boolean newVersionMatchesDesired = StringUtils.equals(desiredVersion, newVersion);
+
+ // was the prior version UNKNOWN
+ boolean previousVersionIsUnknown = StringUtils.equalsIgnoreCase(UNKNOWN_VERSION, previousVersion);
+
+ boolean desiredVersionIsUnknown = StringUtils.equalsIgnoreCase(UNKNOWN_VERSION, desiredVersion);
+
+ // is there an upgrade in progress for this component
+ boolean isUpgradeInProgressForThisComponent = null != cluster.getUpgradeInProgress()
+ && upgradeState != UpgradeState.NONE;
+
+ // if the current version is an actual value (ie 2.2.0.0-1234 and not
+ // UNKNOWN), and the newly received version is unexpected, and we are not in
+ // an upgrade - then we really should not be changing the reported version
+ if (!previousVersionIsUnknown && !desiredVersionIsUnknown && !newVersionMatchesDesired
+ && !isUpgradeInProgressForThisComponent) {
+ LOG.warn(
+ "Received a reported version of {} for {} on {}. This was not expected since the desired version is {} and the cluster is not upgrading this component. The version will not be changed.",
+ newVersion, sc.getName(), sch.getHostName(), desiredVersion);
+
+ return;
+ }
+
+ // update the SCH to the new version reported
+ if (!StringUtils.equals(previousVersion, newVersion)) {
sch.setVersion(newVersion);
- bootstrapVersion(cluster, sch);
- } else if (!StringUtils.equals(previousVersion, newVersion)) {
- processComponentVersionChange(cluster, sc, sch, newVersion);
}
- }
- /**
- * Bootstrap cluster/repo version when version is reported for the first time
- * @param cluster target cluster
- * @param sch target host component
- * @throws AmbariException
- */
- private void bootstrapVersion(Cluster cluster, ServiceComponentHost sch) throws AmbariException {
- RepositoryVersionEntity repoVersion = sch.recalculateHostVersionState();
- if (null != repoVersion) {
- cluster.recalculateClusterVersionState(repoVersion);
+ if (previousVersion == null || previousVersionIsUnknown) {
+ // value may be "UNKNOWN" when upgrading from older Ambari versions
+ // or if host component reports it's version for the first time
+ sch.setUpgradeState(UpgradeState.NONE);
+ recalculateHostVersionAndClusterVersion(cluster, sch);
+ } else {
+ if (newVersionMatchesDesired) {
+ if (isUpgradeInProgressForThisComponent) {
+ sch.setStackVersion(cluster.getDesiredStackVersion());
+ setUpgradeStateAndRecalculateHostVersions(cluster, sch, UpgradeState.COMPLETE);
+ } else {
+ // no upgrade in progress for this component, then this should always
+ // be NONE
+ setUpgradeStateAndRecalculateHostVersions(cluster, sch, UpgradeState.NONE);
+ }
+ } else {
+ // if the versions don't match for any reason, regardless of upgrade
+ // state, then VERSION_MISMATCH it
+ setUpgradeStateAndRecalculateHostVersions(cluster, sch, UpgradeState.VERSION_MISMATCH);
+ }
}
}
@@ -197,40 +221,45 @@ public class StackVersionListener {
sc.setDesiredVersion(newVersion);
sch.setUpgradeState(UpgradeState.NONE);
sch.setVersion(newVersion);
- bootstrapVersion(cluster, sch);
+
+ recalculateHostVersionAndClusterVersion(cluster, sch);
}
/**
- * Focuses on cases when host component version really changed
- * @param cluster target cluster
- * @param sc target service component
- * @param sch target host component
- * @param newVersion advertised version
+ * @param sch
+ * @param upgradeState
+ * @throws AmbariException
*/
- private void processComponentVersionChange(Cluster cluster, ServiceComponent sc,
- ServiceComponentHost sch,
- String newVersion) {
- String desiredVersion = sc.getDesiredVersion();
- UpgradeState upgradeState = sch.getUpgradeState();
- if (upgradeState == UpgradeState.IN_PROGRESS) {
- // Component status update is received during upgrade process
- if (desiredVersion.equals(newVersion)) {
- sch.setUpgradeState(UpgradeState.COMPLETE); // Component upgrade confirmed
- sch.setStackVersion(cluster.getDesiredStackVersion());
- } else { // Unexpected (wrong) version received
- // Even during failed upgrade, we should not receive wrong version
- // That's why mark as VERSION_MISMATCH
- sch.setUpgradeState(UpgradeState.VERSION_MISMATCH);
- }
- } else if (upgradeState == UpgradeState.VERSION_MISMATCH && desiredVersion.equals(newVersion)) {
- if (cluster.getUpgradeInProgress() != null) {
- sch.setUpgradeState(UpgradeState.COMPLETE);
- } else {
- sch.setUpgradeState(UpgradeState.NONE);
- }
- } else { // No upgrade in progress, unexpected version change
- sch.setUpgradeState(UpgradeState.VERSION_MISMATCH);
+ private void setUpgradeStateAndRecalculateHostVersions(Cluster cluster, ServiceComponentHost sch,
+ UpgradeState upgradeState) throws AmbariException {
+
+ // don't need to recalculate anything here if the upgrade state is not changing
+ if (sch.getUpgradeState() == upgradeState) {
+ return;
+ }
+
+ // if the upgrade state changes, then also recalculate host versions
+ sch.setUpgradeState(upgradeState);
+
+ recalculateHostVersionAndClusterVersion(cluster, sch);
+ }
+
+ /**
+ * Recalculates the {@link HostVersionEntity} for the host specified by the
+ * host component, taking into account all component states on that host. This
+ * will also trigger a {@link ClusterVersionEntity} recalculatation for the
+ * cluster version as well.
+ *
+ * @param cluster
+ * @param sch
+ * @throws AmbariException
+ */
+ private void recalculateHostVersionAndClusterVersion(Cluster cluster, ServiceComponentHost sch)
+ throws AmbariException {
+ // trigger a re-calculation of the cluster state based on the SCH state
+ RepositoryVersionEntity repoVersion = sch.recalculateHostVersionState();
+ if (null != repoVersion) {
+ cluster.recalculateClusterVersionState(repoVersion);
}
- sch.setVersion(newVersion);
}
-}
+}
\ No newline at end of file