You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2015/01/13 21:56:16 UTC

ambari git commit: AMBARI-9073. Rolling Upgrade - Finalize needs to complete upgrade states in DB (Yurii Shylov via ncole)

Repository: ambari
Updated Branches:
  refs/heads/trunk 95b466923 -> fb947b0cc


AMBARI-9073. Rolling Upgrade - Finalize needs to complete upgrade states in DB (Yurii Shylov via ncole)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/fb947b0c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/fb947b0c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/fb947b0c

Branch: refs/heads/trunk
Commit: fb947b0cce36c53d10ad94e9d1056f6f6a01f5f1
Parents: 95b4669
Author: Nate Cole <nc...@hortonworks.com>
Authored: Tue Jan 13 15:16:09 2015 -0500
Committer: Nate Cole <nc...@hortonworks.com>
Committed: Tue Jan 13 15:56:01 2015 -0500

----------------------------------------------------------------------
 .../ambari/server/agent/HeartBeatHandler.java   | 54 ++++++++++++
 .../orm/entities/HostComponentStateEntity.java  | 13 +++
 .../upgrades/FinalizeUpgradeAction.java         | 16 +++-
 .../org/apache/ambari/server/state/Cluster.java | 11 ++-
 .../server/state/ServiceComponentHost.java      | 34 ++++++--
 .../server/state/cluster/ClusterImpl.java       |  8 +-
 .../svccomphost/ServiceComponentHostImpl.java   | 91 ++++++++++++++++++++
 .../server/upgrade/UpgradeCatalog200.java       |  3 +
 .../main/resources/Ambari-DDL-MySQL-CREATE.sql  |  1 +
 .../main/resources/Ambari-DDL-Oracle-CREATE.sql |  3 +-
 .../resources/Ambari-DDL-Postgres-CREATE.sql    |  1 +
 .../Ambari-DDL-Postgres-EMBEDDED-CREATE.sql     |  1 +
 .../resources/Ambari-DDL-SQLServer-CREATE.sql   |  2 +-
 .../server/upgrade/UpgradeCatalog200Test.java   | 13 +++
 14 files changed, 232 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/fb947b0c/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index da4d38b..60be733 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.bootstrap.DistributeRepositoriesStructuredOutput;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.MaintenanceStateHelper;
 import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
@@ -49,6 +51,14 @@ import org.apache.ambari.server.events.AlertReceivedEvent;
 import org.apache.ambari.server.events.publishers.AlertEventPublisher;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.metadata.ActionMetadata;
+import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.HostVersionDAO;
+import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
+import org.apache.ambari.server.orm.entities.ClusterEntity;
+import org.apache.ambari.server.orm.entities.HostComponentStateEntity;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.HostVersionEntity;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
 import org.apache.ambari.server.serveraction.kerberos.KerberosActionDataFile;
 import org.apache.ambari.server.serveraction.kerberos.KerberosActionDataFileReader;
 import org.apache.ambari.server.serveraction.kerberos.KerberosServerAction;
@@ -70,6 +80,8 @@ import org.apache.ambari.server.state.ServiceInfo;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.StackInfo;
 import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.state.UpgradeHelper;
+import org.apache.ambari.server.state.UpgradeState;
 import org.apache.ambari.server.state.alert.AlertDefinition;
 import org.apache.ambari.server.state.alert.AlertDefinitionHash;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
@@ -88,10 +100,13 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.annotations.SerializedName;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.Singleton;
@@ -394,6 +409,7 @@ public class HeartBeatHandler {
     }
     Collection<HostRoleCommand> commands = actionManager.getTasks(taskIds);
 
+    Set<ServiceComponentHost> scHostsRequireRecalculation = new HashSet<ServiceComponentHost>();
     Iterator<HostRoleCommand> hostRoleCommandIterator = commands.iterator();
     for (CommandReport report : reports) {
 
@@ -450,6 +466,25 @@ public class HeartBeatHandler {
           String schName = scHost.getServiceComponentName();
 
           if (report.getStatus().equals(HostRoleStatus.COMPLETED.toString())) {
+
+            // Reading component version if it is present
+            if (StringUtils.isNotBlank(report.getStructuredOut())) {
+              try {
+                final ComponentVersionStructuredOut structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
+                final String previousVersion = scHost.getVersion();
+                if (StringUtils.isNotBlank(structuredOutput.getVersion()) && !previousVersion.equals(structuredOutput.getVersion())) {
+                  scHost.setVersion(structuredOutput.getVersion());
+                  if (!previousVersion.equals("UNKNOWN")) {
+                    scHost.setUpgradeState(UpgradeState.COMPLETE);
+                  }
+                  scHostsRequireRecalculation.add(scHost);
+                }
+              } catch (JsonSyntaxException ex) {
+                //Json structure for component version was incorrect
+                //do nothing, pass this data further for processing
+              }
+            }
+
             // Updating stack version, if needed
             if (scHost.getState().equals(State.UPGRADING)) {
               scHost.setStackVersion(scHost.getDesiredStackVersion());
@@ -518,6 +553,10 @@ public class HeartBeatHandler {
         }
       }
     }
+    //Recalculate host versions
+    for (ServiceComponentHost serviceComponentHost : scHostsRequireRecalculation) {
+      serviceComponentHost.recalculateHostVersionState();
+    }
     //Update state machines from reports
     actionManager.processTaskResponse(hostname, reports, commands);
   }
@@ -944,5 +983,20 @@ public class HeartBeatHandler {
     ec.setKerberosCommandParams(kcp);
   }
 
+  /**
+   * Gson binding for the JSON structured output of a component START action; only the "version" property is mapped.
+   */
+  private static class ComponentVersionStructuredOut {
+    @SerializedName("version")
+    private String version;
+
+    public String getVersion() {
+      return version;
+    }
+
+    public void setVersion(String version) {
+      this.version = version;
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/fb947b0c/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentStateEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentStateEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentStateEntity.java
index 11f2c14..1e48950 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentStateEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentStateEntity.java
@@ -47,6 +47,9 @@ public class HostComponentStateEntity {
   @Column(name = "component_name", nullable = false, insertable = false, updatable = false)
   private String componentName;
 
+  @Column(name = "version", nullable = false, insertable = true, updatable = true)
+  private String version = "UNKNOWN";
+
   @Enumerated(value = EnumType.STRING)
   @Column(name = "current_state", nullable = false, insertable = true, updatable = true)
   private State currentState = State.INIT;
@@ -138,6 +141,14 @@ public class HostComponentStateEntity {
     this.currentStackVersion = currentStackVersion;
   }
 
+  public String getVersion() {
+    return version;
+  }
+
+  public void setVersion(String version) {
+    this.version = version;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
@@ -153,6 +164,7 @@ public class HostComponentStateEntity {
     if (upgradeState != null ? !upgradeState.equals(that.upgradeState) : that.upgradeState != null) return false;
     if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != null) return false;
     if (serviceName != null ? !serviceName.equals(that.serviceName) : that.serviceName != null) return false;
+    if (version != null ? !version.equals(that.version) : that.version != null) return false;
 
     return true;
   }
@@ -166,6 +178,7 @@ public class HostComponentStateEntity {
     result = 31 * result + (upgradeState != null ? upgradeState.hashCode() : 0);
     result = 31 * result + (currentStackVersion != null ? currentStackVersion.hashCode() : 0);
     result = 31 * result + (serviceName != null ? serviceName.hashCode() : 0);
+    result = 31 * result + (version != null ? version.hashCode() : 0);
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/fb947b0c/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/FinalizeUpgradeAction.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/FinalizeUpgradeAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/FinalizeUpgradeAction.java
index 7894de8..56281a2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/FinalizeUpgradeAction.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/FinalizeUpgradeAction.java
@@ -18,18 +18,22 @@
 package org.apache.ambari.server.serveraction.upgrades;
 
 import com.google.inject.Inject;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.orm.dao.ClusterVersionDAO;
+import org.apache.ambari.server.orm.dao.HostComponentStateDAO;
 import org.apache.ambari.server.orm.dao.HostVersionDAO;
 import org.apache.ambari.server.orm.entities.ClusterVersionEntity;
+import org.apache.ambari.server.orm.entities.HostComponentStateEntity;
 import org.apache.ambari.server.orm.entities.HostVersionEntity;
 import org.apache.ambari.server.serveraction.AbstractServerAction;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.RepositoryVersionState;
+import org.apache.ambari.server.state.UpgradeState;
 import org.apache.commons.lang.StringUtils;
 
 import java.text.MessageFormat;
@@ -57,15 +61,15 @@ public class FinalizeUpgradeAction extends AbstractServerAction {
   @Inject
   private HostVersionDAO hostVersionDAO;
 
+  @Inject
+  private HostComponentStateDAO hostComponentStateDAO;
+
   @Override
   public CommandReport execute(
       ConcurrentMap<String, Object> requestSharedDataContext)
       throws AmbariException, InterruptedException {
 
     Set<RepositoryVersionState> allowedStates = new HashSet<RepositoryVersionState>();
-    // TODO Rolling Upgrade, hack, should only allow UPGRADED.
-    allowedStates.add(RepositoryVersionState.INSTALLED);
-    allowedStates.add(RepositoryVersionState.UPGRADING);
     allowedStates.add(RepositoryVersionState.UPGRADED);
 
     StringBuffer outSB = new StringBuffer();
@@ -116,6 +120,12 @@ public class FinalizeUpgradeAction extends AbstractServerAction {
             StringUtils.join(hostsWithoutCorrectVersionState, ", ")));
       }
 
+      outSB.append(String.format("Will finalize the upgraded state of all host components.\n"));
+      for (HostComponentStateEntity hostComponentStateEntity: hostComponentStateDAO.findAll()) {
+        hostComponentStateEntity.setUpgradeState(UpgradeState.NONE);
+        hostComponentStateDAO.merge(hostComponentStateEntity);
+      }
+
       outSB.append(String.format("Will finalize the version for %d host(s).\n", hosts.keySet().size()));
       cluster.mapHostVersions(hosts.keySet(), upgradingClusterVersion, RepositoryVersionState.CURRENT);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/fb947b0c/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
index 15411fc..fd0188c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
@@ -90,7 +90,7 @@ public interface Cluster {
   /**
    * Remove ServiceComponentHost from cluster
    * @param svcCompHost
-   */  
+   */
   public void removeServiceComponentHost(ServiceComponentHost svcCompHost) throws AmbariException;
 
 
@@ -155,6 +155,9 @@ public interface Cluster {
    * May be called multiple times.
    * As of now, only transition from INSTALLING to INSTALLING/INSTALLED/INSTALL_FAILED/OUT_OF_SYNC
    * is supported
+   *
+   * @param repositoryVersion repository version (e.g. 2.2.1.0-100)
+   *
    * @throws AmbariException
    */
   void recalculateClusterVersionState(String repositoryVersion) throws AmbariException;
@@ -193,15 +196,15 @@ public interface Cluster {
   /**
    * Gets whether the cluster is still initializing or has finished with its
    * deployment requests.
-   * 
+   *
    * @return either {@link State#INIT} or {@link State#INSTALLED}, never
    *         {@code null}.
    */
   public State getProvisioningState();
-  
+
   /**
    * Sets the provisioning state of the cluster.
-   * 
+   *
    * @param provisioningState
    *          the provisioning state, not {@code null}.
    */

http://git-wip-us.apache.org/repos/asf/ambari/blob/fb947b0c/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHost.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHost.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHost.java
index 74e2371..e337153 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHost.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHost.java
@@ -87,7 +87,6 @@ public interface ServiceComponentHost {
    */
   public SecurityState getSecurityState();
 
-
   /**
    * Sets the current security state for this ServiceComponent
    * <p/>
@@ -98,6 +97,20 @@ public interface ServiceComponentHost {
   public void setSecurityState(SecurityState state);
 
   /**
+   * Gets the version of the component.
+   *
+   * @return component version
+   */
+  public String getVersion();
+
+  /**
+   * Sets the version of the component from the stack.
+   *
+   * @param version component version (e.g. 2.2.0.0-2041)
+   */
+  public void setVersion(String version);
+
+  /**
    * Gets the desired security state for this ServiceComponent
    * <p/>
    * The returned SecurityState is a valid endpoint state where
@@ -119,7 +132,7 @@ public interface ServiceComponentHost {
   public void setDesiredSecurityState(SecurityState securityState) throws AmbariException;
 
   public void setUpgradeState(UpgradeState upgradeState);
-  
+
   public StackId getStackVersion();
 
   public void setStackVersion(StackId stackVersion);
@@ -148,7 +161,7 @@ public interface ServiceComponentHost {
    * @param configTags
    */
   public void updateActualConfigs(Map<String, Map<String, String>> configTags);
-  
+
   /**
    * Gets the actual config tags, if known.
    * @return the actual config map
@@ -161,7 +174,7 @@ public interface ServiceComponentHost {
    * @param state the maintenance state
    */
   public void setMaintenanceState(MaintenanceState state);
-  
+
   /**
    * @return the maintenance state
    */
@@ -171,9 +184,9 @@ public interface ServiceComponentHost {
    * @param procs a list containing a map describing each process
    */
   public void setProcesses(List<Map<String, String>> procs);
-  
-  
-  /**  
+
+
+  /**
    * @return the list of maps describing each process
    */
   public List<Map<String, String>> getProcesses();
@@ -188,4 +201,11 @@ public interface ServiceComponentHost {
    */
   public void setRestartRequired(boolean restartRequired);
 
+  /**
+   * Changes host version state according to state of the components installed on the host.
+   *
+   * @throws AmbariException if host is detached from the cluster
+   */
+  public void recalculateHostVersionState() throws AmbariException;
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/fb947b0c/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 0b9525a..19a5f9f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -1271,12 +1271,13 @@ public class ClusterImpl implements Cluster {
         if (clusterVersion.getState() != RepositoryVersionState.INSTALL_FAILED &&
                 clusterVersion.getState() != RepositoryVersionState.OUT_OF_SYNC &&
                 clusterVersion.getState() != RepositoryVersionState.INSTALLING &&
-                clusterVersion.getState() != RepositoryVersionState.INSTALLED) {
+                clusterVersion.getState() != RepositoryVersionState.INSTALLED &&
+                clusterVersion.getState() != RepositoryVersionState.UPGRADING &&
+                clusterVersion.getState() != RepositoryVersionState.UPGRADED) {
           // anything else is not supported as of now
           return;
         }
-        // Process transition from INSTALLING state
-        worstState = RepositoryVersionState.INSTALLED;
+        worstState = RepositoryVersionState.UPGRADED;
         for (Host host : hosts.values()) {
           String hostName = host.getHostName();
           if (host.getState() != HostState.HEALTHY) {
@@ -1418,6 +1419,7 @@ public class ClusterImpl implements Cluster {
             case INSTALLED:
               allowedStates.add(RepositoryVersionState.INSTALLING);
               allowedStates.add(RepositoryVersionState.UPGRADING);
+              allowedStates.add(RepositoryVersionState.UPGRADED);
               allowedStates.add(RepositoryVersionState.OUT_OF_SYNC);
               break;
             case OUT_OF_SYNC:

http://git-wip-us.apache.org/repos/asf/ambari/blob/fb947b0c/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
index 53c8cff..3540477 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
@@ -19,11 +19,14 @@
 package org.apache.ambari.server.state.svccomphost;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -39,12 +42,16 @@ import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.dao.HostComponentDesiredStateDAO;
 import org.apache.ambari.server.orm.dao.HostComponentStateDAO;
 import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.HostVersionDAO;
+import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
 import org.apache.ambari.server.orm.dao.ServiceComponentDesiredStateDAO;
 import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity;
 import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntityPK;
 import org.apache.ambari.server.orm.entities.HostComponentStateEntity;
 import org.apache.ambari.server.orm.entities.HostComponentStateEntityPK;
 import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.HostVersionEntity;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
 import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity;
 import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntityPK;
 import org.apache.ambari.server.state.Cluster;
@@ -55,7 +62,9 @@ import org.apache.ambari.server.state.HostComponentAdminState;
 import org.apache.ambari.server.state.HostConfig;
 import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.SecurityState;
+import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.ambari.server.state.ServiceComponentHostEvent;
@@ -69,6 +78,7 @@ import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.fsm.SingleArcTransition;
 import org.apache.ambari.server.state.fsm.StateMachine;
 import org.apache.ambari.server.state.fsm.StateMachineFactory;
+import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -104,6 +114,10 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
   @Inject
   HostDAO hostDAO;
   @Inject
+  HostVersionDAO hostVersionDAO;
+  @Inject
+  RepositoryVersionDAO repositoryVersionDAO;
+  @Inject
   ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO;
   @Inject
   Clusters clusters;
@@ -685,6 +699,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
     stateEntity.setClusterId(serviceComponent.getClusterId());
     stateEntity.setComponentName(serviceComponent.getName());
     stateEntity.setServiceName(serviceComponent.getServiceName());
+    stateEntity.setVersion("UNKNOWN");
     stateEntity.setHostName(hostName);
     stateEntity.setCurrentState(stateMachine.getCurrentState());
     stateEntity.setUpgradeState(UpgradeState.NONE);
@@ -780,6 +795,37 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
   }
 
   @Override
+  public String getVersion() {
+    clusterGlobalLock.readLock().lock();
+    try {
+      readLock.lock();
+      try {
+        return stateEntity.getVersion();
+      } finally {
+        readLock.unlock();
+      }
+    } finally {
+      clusterGlobalLock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void setVersion(String version) {
+    clusterGlobalLock.readLock().lock();
+    try {
+      writeLock.lock();
+      try {
+        stateEntity.setVersion(version);
+        saveIfPersisted();
+      } finally {
+        writeLock.unlock();
+      }
+    } finally {
+      clusterGlobalLock.readLock().unlock();
+    }
+  }
+
+  @Override
   public SecurityState getSecurityState() {
     clusterGlobalLock.readLock().lock();
     try {
@@ -1658,4 +1704,73 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
       clusterGlobalLock.readLock().unlock();
     }
   }
+
+  /**
+   * Recomputes the state of this host's version record from the upgrade state of
+   * every component on the host, then triggers the cluster-level recalculation.
+   * <p/>
+   * A component counts as upgraded when its upgrade state is COMPLETE and its
+   * version is not "UNKNOWN"; client components are always counted as upgraded.
+   * If all components are upgraded, an INSTALLED or UPGRADING host version becomes
+   * UPGRADED; if only some are, the host version is set to UPGRADING.
+   *
+   * @throws AmbariException if the host is not assigned to exactly one cluster, or
+   *         if the host, the repository version or the host version record cannot
+   *         be found
+   */
+  @Override
+  public void recalculateHostVersionState() throws AmbariException {
+    final String version = getVersion();
+    final String hostName = getHostName();
+    final HostEntity host = hostDAO.findByName(hostName);
+    final Set<Cluster> clustersForHost = clusters.getClustersForHost(hostName);
+    if (clustersForHost.size() != 1) {
+      throw new AmbariException("Host " + hostName + " should be assigned only to one cluster");
+    }
+    // NOTE(review): the DAO lookups are assumed to return null when nothing matches;
+    // fail with a descriptive AmbariException here instead of a NullPointerException later.
+    if (host == null) {
+      throw new AmbariException("Host " + hostName + " was not found");
+    }
+    final Cluster cluster = clustersForHost.iterator().next();
+    final StackId stack = cluster.getDesiredStackVersion();
+    final RepositoryVersionEntity repositoryVersion = repositoryVersionDAO.findByStackAndVersion(stack.getStackId(), version);
+    if (repositoryVersion == null) {
+      throw new AmbariException("Repository version " + version + " is not registered for stack " + stack.getStackId());
+    }
+    final HostVersionEntity hostVersionEntity = hostVersionDAO.findByClusterStackVersionAndHost(cluster.getClusterName(), repositoryVersion.getStack(), repositoryVersion.getVersion(), hostName);
+    if (hostVersionEntity == null) {
+      throw new AmbariException("Host " + hostName + " has no host version record for repository version " + version);
+    }
+
+    final Collection<HostComponentStateEntity> allHostComponents = host.getHostComponentStateEntities();
+    final Collection<HostComponentStateEntity> upgradedHostComponents = new HashSet<HostComponentStateEntity>();
+    for (HostComponentStateEntity hostComponentStateEntity: allHostComponents) {
+      if (hostComponentStateEntity.getUpgradeState().equals(UpgradeState.COMPLETE) && !hostComponentStateEntity.getVersion().equals("UNKNOWN")) {
+        upgradedHostComponents.add(hostComponentStateEntity);
+      }
+    }
+
+    //TODO hack: clients' states are not updated, probably we should check the state of master components
+    final Collection<HostComponentStateEntity> nonUpgradedHostComponents = CollectionUtils.subtract(allHostComponents, upgradedHostComponents);
+    for (HostComponentStateEntity hostComponentStateEntity: nonUpgradedHostComponents) {
+      final Service service = cluster.getService(hostComponentStateEntity.getServiceName());
+      if (service.getServiceComponent(hostComponentStateEntity.getComponentName()).isClientComponent()) {
+        upgradedHostComponents.add(hostComponentStateEntity);
+      }
+    }
+
+    if (allHostComponents.size() == upgradedHostComponents.size() &&
+        (hostVersionEntity.getState().equals(RepositoryVersionState.INSTALLED) || hostVersionEntity.getState().equals(RepositoryVersionState.UPGRADING))) {
+      hostVersionEntity.setState(RepositoryVersionState.UPGRADED);
+      hostVersionDAO.merge(hostVersionEntity);
+    }
+
+    if (!upgradedHostComponents.isEmpty() && upgradedHostComponents.size() < allHostComponents.size()) {
+      hostVersionEntity.setState(RepositoryVersionState.UPGRADING);
+      hostVersionDAO.merge(hostVersionEntity);
+    }
+
+    cluster.recalculateClusterVersionState(version);
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/fb947b0c/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog200.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog200.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog200.java
index e86ca1a..dae920e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog200.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog200.java
@@ -182,6 +182,9 @@ public class UpgradeCatalog200 extends AbstractUpgradeCatalog {
     dbAccessor.addColumn("hostcomponentstate", new DBAccessor.DBColumnInfo("upgrade_state",
         String.class, 32, "NONE", false));
 
+    dbAccessor.addColumn("hostcomponentstate", new DBAccessor.DBColumnInfo("version",
+        String.class, 32, "UNKNOWN", false));
+
     dbAccessor.addColumn("host_role_command", new DBAccessor.DBColumnInfo("retry_allowed",
         Integer.class, 1, 0, false));
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/fb947b0c/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
index 9047b15..4e61f36 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
@@ -106,6 +106,7 @@ CREATE TABLE hostcomponentdesiredstate (
 CREATE TABLE hostcomponentstate (
   cluster_id BIGINT NOT NULL,
   component_name VARCHAR(255) NOT NULL,
+  version VARCHAR(32) NOT NULL DEFAULT 'UNKNOWN',
   current_stack_version VARCHAR(255) NOT NULL,
   current_state VARCHAR(255) NOT NULL,
   host_name VARCHAR(255) NOT NULL,

http://git-wip-us.apache.org/repos/asf/ambari/blob/fb947b0c/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
index 3714060..59031f7 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
@@ -97,11 +97,12 @@ CREATE TABLE hostcomponentdesiredstate (
 CREATE TABLE hostcomponentstate (
   cluster_id NUMBER(19) NOT NULL,
   component_name VARCHAR2(255) NOT NULL,
+  version VARCHAR2(32) DEFAULT 'UNKNOWN' NOT NULL,
   current_stack_version VARCHAR2(255) NOT NULL,
   current_state VARCHAR2(255) NOT NULL,
   host_name VARCHAR2(255) NOT NULL,
   service_name VARCHAR2(255) NOT NULL,
-  upgrade_state VARCHAR2(255) DEFAULT 'NONE' NOT NULL,
+  upgrade_state VARCHAR2(32) DEFAULT 'NONE' NOT NULL,
   security_state VARCHAR2(32) DEFAULT 'UNSECURED' NOT NULL,
   PRIMARY KEY (cluster_id, component_name, host_name, service_name));
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/fb947b0c/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
index 0e7b9c6..7b64212 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
@@ -105,6 +105,7 @@ CREATE TABLE hostcomponentdesiredstate (
 CREATE TABLE hostcomponentstate (
   cluster_id BIGINT NOT NULL,
   component_name VARCHAR(255) NOT NULL,
+  version VARCHAR(32) NOT NULL DEFAULT 'UNKNOWN',
   current_stack_version VARCHAR(255) NOT NULL,
   current_state VARCHAR(255) NOT NULL,
   host_name VARCHAR(255) NOT NULL,

http://git-wip-us.apache.org/repos/asf/ambari/blob/fb947b0c/ambari-server/src/main/resources/Ambari-DDL-Postgres-EMBEDDED-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-EMBEDDED-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-EMBEDDED-CREATE.sql
index 0159a11..bfb49e5 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-EMBEDDED-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-EMBEDDED-CREATE.sql
@@ -127,6 +127,7 @@ GRANT ALL PRIVILEGES ON TABLE ambari.hostcomponentdesiredstate TO :username;
 CREATE TABLE ambari.hostcomponentstate (
   cluster_id BIGINT NOT NULL,
   component_name VARCHAR(255) NOT NULL,
+  version VARCHAR(32) NOT NULL DEFAULT 'UNKNOWN',
   current_stack_version VARCHAR(255) NOT NULL,
   current_state VARCHAR(255) NOT NULL,
   host_name VARCHAR(255) NOT NULL,

http://git-wip-us.apache.org/repos/asf/ambari/blob/fb947b0c/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
index 50085f2..e6f2edb 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
@@ -41,7 +41,7 @@ CREATE TABLE clusterservices (service_name VARCHAR(255) NOT NULL, cluster_id BIG
 CREATE TABLE clusterstate (cluster_id BIGINT NOT NULL, current_cluster_state VARCHAR(255) NOT NULL, current_stack_version VARCHAR(255) NOT NULL, PRIMARY KEY CLUSTERED (cluster_id));
 CREATE TABLE cluster_version (id BIGINT NOT NULL, cluster_id BIGINT NOT NULL, repo_version_id BIGINT NOT NULL, state VARCHAR(255) NOT NULL, start_time BIGINT NOT NULL, end_time BIGINT, user_name VARCHAR(255), PRIMARY KEY (id));
 CREATE TABLE hostcomponentdesiredstate (cluster_id BIGINT NOT NULL, component_name VARCHAR(255) NOT NULL, desired_stack_version VARCHAR(255) NOT NULL, desired_state VARCHAR(255) NOT NULL, host_name VARCHAR(255) NOT NULL, service_name VARCHAR(255) NOT NULL, admin_state VARCHAR(32), maintenance_state VARCHAR(32) NOT NULL, security_state VARCHAR(32) NOT NULL DEFAULT 'UNSECURED', restart_required BIT NOT NULL DEFAULT 0, PRIMARY KEY CLUSTERED (cluster_id, component_name, host_name, service_name));
-CREATE TABLE hostcomponentstate (cluster_id BIGINT NOT NULL, component_name VARCHAR(255) NOT NULL, current_stack_version VARCHAR(255) NOT NULL, current_state VARCHAR(255) NOT NULL, host_name VARCHAR(255) NOT NULL, service_name VARCHAR(255) NOT NULL, upgrade_state VARCHAR(32) NOT NULL DEFAULT 'NONE', security_state VARCHAR(32) NOT NULL DEFAULT 'UNSECURED', PRIMARY KEY CLUSTERED (cluster_id, component_name, host_name, service_name));
+CREATE TABLE hostcomponentstate (cluster_id BIGINT NOT NULL, component_name VARCHAR(255) NOT NULL, version VARCHAR(32) NOT NULL DEFAULT 'UNKNOWN', current_stack_version VARCHAR(255) NOT NULL, current_state VARCHAR(255) NOT NULL, host_name VARCHAR(255) NOT NULL, service_name VARCHAR(255) NOT NULL, upgrade_state VARCHAR(32) NOT NULL DEFAULT 'NONE', security_state VARCHAR(32) NOT NULL DEFAULT 'UNSECURED', PRIMARY KEY CLUSTERED (cluster_id, component_name, host_name, service_name));
 CREATE TABLE hosts (host_name VARCHAR(255) NOT NULL, cpu_count INTEGER NOT NULL, ph_cpu_count INTEGER, cpu_info VARCHAR(255) NOT NULL, discovery_status VARCHAR(2000) NOT NULL, host_attributes VARCHAR(MAX) NOT NULL, ipv4 VARCHAR(255), ipv6 VARCHAR(255), public_host_name VARCHAR(255), last_registration_time BIGINT NOT NULL, os_arch VARCHAR(255) NOT NULL, os_info VARCHAR(1000) NOT NULL, os_type VARCHAR(255) NOT NULL, rack_info VARCHAR(255) NOT NULL, total_mem BIGINT NOT NULL, PRIMARY KEY CLUSTERED (host_name));
 CREATE TABLE hoststate (agent_version VARCHAR(255) NOT NULL, available_mem BIGINT NOT NULL, current_state VARCHAR(255) NOT NULL, health_status VARCHAR(255), host_name VARCHAR(255) NOT NULL, time_in_state BIGINT NOT NULL, maintenance_state VARCHAR(512), PRIMARY KEY CLUSTERED (host_name));
 CREATE TABLE servicecomponentdesiredstate (component_name VARCHAR(255) NOT NULL, cluster_id BIGINT NOT NULL, desired_stack_version VARCHAR(255) NOT NULL, desired_state VARCHAR(255) NOT NULL, service_name VARCHAR(255) NOT NULL, PRIMARY KEY CLUSTERED (component_name, cluster_id, service_name));

http://git-wip-us.apache.org/repos/asf/ambari/blob/fb947b0c/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog200Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog200Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog200Test.java
index b89b31c..73531ec 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog200Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog200Test.java
@@ -113,6 +113,7 @@ public class UpgradeCatalog200Test {
     Capture<DBAccessor.DBColumnInfo> alertDefinitionDescriptionColumnCapture = new Capture<DBAccessor.DBColumnInfo>();
     Capture<DBAccessor.DBColumnInfo> alertTargetGlobalColumnCapture = new Capture<DBAccessor.DBColumnInfo>();
     Capture<DBAccessor.DBColumnInfo> hostComponentStateColumnCapture = new Capture<DBAccessor.DBColumnInfo>();
+    Capture<DBAccessor.DBColumnInfo> hostComponentVersionColumnCapture = new Capture<DBAccessor.DBColumnInfo>();
     Capture<DBAccessor.DBColumnInfo> hostComponentStateSecurityStateColumnCapture = new Capture<DBAccessor.DBColumnInfo>();
     Capture<DBAccessor.DBColumnInfo> hostComponentDesiredStateSecurityStateColumnCapture = new Capture<DBAccessor.DBColumnInfo>();
     Capture<DBAccessor.DBColumnInfo> hostRoleCommandRetryColumnCapture = new Capture<DBAccessor.DBColumnInfo>();
@@ -151,6 +152,10 @@ public class UpgradeCatalog200Test {
     dbAccessor.addColumn(eq("hostcomponentstate"),
         capture(hostComponentStateColumnCapture));
 
+    // Host Component Version
+    dbAccessor.addColumn(eq("hostcomponentstate"),
+        capture(hostComponentVersionColumnCapture));
+
     // Host Role Command retry allowed
     dbAccessor.addColumn(eq("host_role_command"),
         capture(hostRoleCommandRetryColumnCapture));
@@ -225,6 +230,14 @@ public class UpgradeCatalog200Test {
     assertEquals("NONE", upgradeStateColumn.getDefaultValue());
     assertFalse(upgradeStateColumn.isNullable());
 
+    // Verify added version column in hostcomponentstate table
+    DBAccessor.DBColumnInfo upgradeVersionColumn = hostComponentVersionColumnCapture.getValue();
+    assertEquals("version", upgradeVersionColumn.getName());
+    assertEquals(32, (int) upgradeVersionColumn.getLength());
+    assertEquals(String.class, upgradeVersionColumn.getType());
+    assertEquals("UNKNOWN", upgradeVersionColumn.getDefaultValue());
+    assertFalse(upgradeVersionColumn.isNullable());
+
     // Verify added column in host_role_command table
     DBAccessor.DBColumnInfo upgradeRetryColumn = hostRoleCommandRetryColumnCapture.getValue();
     assertEquals("retry_allowed", upgradeRetryColumn.getName());