You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2018/06/01 01:44:03 UTC

[ambari] branch trunk updated: AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU. (#1405)

This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 607fe42  AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU. (#1405)
607fe42 is described below

commit 607fe42f121e9453de3938fcff3f850c89866e0a
Author: avijayanhwx <av...@hortonworks.com>
AuthorDate: Thu May 31 18:43:44 2018 -0700

    AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU. (#1405)
    
    * AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU.
    
    * AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU. (Unit tests)
    
    * AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU.
    
    * AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU.
    
    * AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU.
    
    * AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU.
    
    * AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU.
    
    * AMBARI-23973 : Pre Upgrade check for AMS hadoop sink in HDP 2.6 to 3.0 EU.
---
 ...MetricsHadoopSinkVersionCompatibilityCheck.java | 252 ++++++++++++++++++
 .../ambari/server/checks/CheckDescription.java     |   8 +
 .../ambari/server/orm/dao/HostRoleCommandDAO.java  |  19 +-
 .../server/orm/entities/HostRoleCommandEntity.java |   5 +-
 .../AMBARI_METRICS/0.1.0/configuration/ams-env.xml |   8 +
 .../AMBARI_METRICS/0.1.0/metainfo.xml              |  10 +
 .../0.1.0/package/scripts/metrics_monitor.py       |   6 +
 .../AMBARI_METRICS/0.1.0/package/scripts/params.py |   1 +
 .../AmbariMetricsHadoopSinkVersionCheckTest.java   | 283 +++++++++++++++++++++
 9 files changed, 587 insertions(+), 5 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/AmbariMetricsHadoopSinkVersionCompatibilityCheck.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/AmbariMetricsHadoopSinkVersionCompatibilityCheck.java
new file mode 100644
index 0000000..8b0f631
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/AmbariMetricsHadoopSinkVersionCompatibilityCheck.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.checks;
+
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.AmbariServer;
+import org.apache.ambari.server.controller.PrereqCheckRequest;
+import org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider;
+import org.apache.ambari.server.controller.internal.RequestResourceProvider;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.state.stack.PrereqCheckStatus;
+import org.apache.ambari.server.state.stack.PrerequisiteCheck;
+import org.apache.ambari.server.state.stack.UpgradePack;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * This pre-upgrade check verifies that the Ambari Metrics Hadoop Sink package installed on every METRICS_MONITOR host
+ * meets the minimum version required for the stack version. For example, in HDP 3.0, the ambari-metrics-hadoop-sink
+ * version should be at least 2.7.0.0.
+ */
+@Singleton
+@UpgradeCheck(
+  group = UpgradeCheckGroup.REPOSITORY_VERSION)
+public class AmbariMetricsHadoopSinkVersionCompatibilityCheck extends AbstractCheckDescriptor  {
+
+  @Inject
+  private RequestDAO requestDAO;
+
+  @Inject
+  private HostRoleCommandDAO hostRoleCommandDAO;
+
+  private static final Logger LOG = LoggerFactory.getLogger(AmbariMetricsHadoopSinkVersionCompatibilityCheck.class);
+
+  private enum PreUpgradeCheckStatus {SUCCESS, FAILED, RUNNING}
+
+  static final String HADOOP_SINK_VERSION_NOT_SPECIFIED = "hadoop-sink-version-not-specified";
+
+  static final String MIN_HADOOP_SINK_VERSION_PROPERTY_NAME = "min-hadoop-sink-version";
+  static final String RETRY_INTERVAL_PROPERTY_NAME = "request-status-check-retry-interval";
+  static final String NUM_TRIES_PROPERTY_NAME = "request-status-check-num-retries";
+
+  /**
+   * Default total wait time for the Ambari Server request to finish => 2 mins (20 tries x 6 seconds).
+   */
+  private long retryInterval = 6000l; // 6 seconds sleep interval per retry.
+  private int numTries = 20; // 20 times the check will try to see if request finished.
+
+  /**
+   * Constructor.
+   */
+  public AmbariMetricsHadoopSinkVersionCompatibilityCheck() {
+    super(CheckDescription.AMS_HADOOP_SINK_VERSION_COMPATIBILITY);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Set<String> getApplicableServices() {
+    return Sets.newHashSet("AMBARI_METRICS", "HDFS");
+  }
+
+  @Override
+  public void perform(PrerequisiteCheck prerequisiteCheck, PrereqCheckRequest prereqCheckRequest) throws AmbariException {
+
+    String minHadoopSinkVersion = null;
+
+    UpgradePack.PrerequisiteCheckConfig prerequisiteCheckConfig = prereqCheckRequest.getPrerequisiteCheckConfig();
+    Map<String, String> checkProperties = null;
+    if(prerequisiteCheckConfig != null) {
+      checkProperties = prerequisiteCheckConfig.getCheckProperties(this.getClass().getName());
+    }
+
+    if(checkProperties != null) {
+      minHadoopSinkVersion = checkProperties.get(MIN_HADOOP_SINK_VERSION_PROPERTY_NAME);
+      retryInterval = Long.valueOf(checkProperties.getOrDefault(RETRY_INTERVAL_PROPERTY_NAME, "6000"));
+      numTries = Integer.valueOf(checkProperties.getOrDefault(NUM_TRIES_PROPERTY_NAME, "20"));
+    }
+
+    if (StringUtils.isEmpty(minHadoopSinkVersion)) {
+      LOG.debug("Hadoop Sink version for pre-check not specified.");
+      prerequisiteCheck.setStatus(PrereqCheckStatus.FAIL);
+      prerequisiteCheck.setFailReason(getFailReason(HADOOP_SINK_VERSION_NOT_SPECIFIED, prerequisiteCheck, prereqCheckRequest));
+      return;
+    }
+
+    LOG.debug("Properties : Hadoop Sink Version = {} , retryInterval = {}, numTries = {}", minHadoopSinkVersion, retryInterval, numTries);
+
+    AmbariManagementController ambariManagementController = AmbariServer.getController();
+
+    ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
+      Resource.Type.Request,
+      ambariManagementController
+      );
+
+    String clusterName = prereqCheckRequest.getClusterName();
+
+    Set<String> hosts = ambariManagementController.getClusters()
+      .getCluster(clusterName).getHosts("AMBARI_METRICS", "METRICS_MONITOR");
+
+    if (CollectionUtils.isEmpty(hosts)) {
+      LOG.warn("No hosts have the component METRICS_MONITOR.");
+      prerequisiteCheck.setStatus(PrereqCheckStatus.PASS);
+      return;
+    }
+
+    Set<Map<String, Object>> propertiesSet = new HashSet<>();
+    Map<String, Object> properties = new LinkedHashMap<>();
+    properties.put(RequestResourceProvider.REQUEST_CLUSTER_NAME_PROPERTY_ID, clusterName);
+
+    Set<Map<String, Object>> filterSet = new HashSet<>();
+    Map<String, Object> filterMap = new HashMap<>();
+    filterMap.put(RequestResourceProvider.SERVICE_ID, "AMBARI_METRICS");
+    filterMap.put(RequestResourceProvider.COMPONENT_ID, "METRICS_MONITOR");
+    filterMap.put(RequestResourceProvider.HOSTS_ID, StringUtils.join(hosts,","));
+    filterSet.add(filterMap);
+
+    properties.put(RequestResourceProvider.REQUEST_RESOURCE_FILTER_ID, filterSet);
+    propertiesSet.add(properties);
+
+    Map<String, String> requestInfoProperties = new HashMap<>();
+    requestInfoProperties.put(RequestResourceProvider.COMMAND_ID, "CHECK_HADOOP_SINK_VERSION");
+    requestInfoProperties.put(RequestResourceProvider.REQUEST_CONTEXT_ID, "Pre Upgrade check for compatible Hadoop Metric " +
+      "Sink version on all hosts.");
+
+    Request request = PropertyHelper.getCreateRequest(propertiesSet, requestInfoProperties);
+    try {
+      org.apache.ambari.server.controller.spi.RequestStatus response = provider.createResources(request);
+      Resource responseResource = response.getRequestResource();
+      String requestIdProp = PropertyHelper.getPropertyId("Requests", "id");
+      long requestId = (long) responseResource.getPropertyValue(requestIdProp);
+      LOG.debug("RequestId for AMS Hadoop Sink version compatibility pre check : " + requestId);
+
+      Thread.sleep(retryInterval);
+      PreUpgradeCheckStatus status;
+      int retry = 0;
+      LinkedHashSet<String> failedHosts = new LinkedHashSet<>();
+      while ((status = pollRequestStatus(requestId, failedHosts)).equals(PreUpgradeCheckStatus.RUNNING)
+        && retry++ < numTries) {
+        if (retry != numTries) {
+          Thread.sleep(retryInterval);
+        }
+      }
+
+      if (status.equals(PreUpgradeCheckStatus.SUCCESS)) {
+        prerequisiteCheck.setStatus(PrereqCheckStatus.PASS);
+      } else {
+        prerequisiteCheck.setStatus(PrereqCheckStatus.FAIL);
+        prerequisiteCheck.setFailReason(String.format(getFailReason(prerequisiteCheck, prereqCheckRequest), minHadoopSinkVersion));
+        prerequisiteCheck.setFailedOn(failedHosts);
+      }
+    } catch (Exception e) {
+      LOG.error("Error running Pre Upgrade check for AMS Hadoop Sink compatibility. " + e);
+      prerequisiteCheck.setStatus(PrereqCheckStatus.FAIL);
+    }
+  }
+
+  /**
+   * Get the status of the requestId and also the set of failed hosts if any.
+   * @param requestId RequestId to track.
+   * @param failedHosts populate this argument for failed hosts.
+   * @return Status of the request.
+   * @throws Exception
+   */
+  private PreUpgradeCheckStatus pollRequestStatus(long requestId, Set<String> failedHosts) throws Exception {
+
+    List<RequestEntity> requestEntities = requestDAO.findByPks(Collections.singleton(requestId), true);
+    if (requestEntities != null && requestEntities.size() > 0) {
+
+      RequestEntity requestEntity = requestEntities.iterator().next();
+      HostRoleStatus requestStatus = requestEntity.getStatus();
+
+      if (HostRoleStatus.COMPLETED.equals(requestStatus)) {
+        return PreUpgradeCheckStatus.SUCCESS;
+      }
+
+      else if (requestStatus.isFailedState()) {
+        failedHosts.addAll(getPreUpgradeCheckFailedHosts(requestEntity));
+        LOG.debug("Hadoop Sink version check failed on the following hosts : " + failedHosts.stream().collect(Collectors.joining(",")));
+        return PreUpgradeCheckStatus.FAILED;
+      } else {
+        return PreUpgradeCheckStatus.RUNNING;
+      }
+    } else {
+      LOG.error("Unable to find RequestEntity for created request.");
+    }
+    return PreUpgradeCheckStatus.FAILED;
+  }
+
+  /**
+   * Get the set of hosts (tasks) that failed the check.
+   * @param requestEntity request info to get the task level info.
+   * @return Set of hosts which failed the check along with type of failed state.
+   * @throws Exception
+   */
+  private Set<String> getPreUpgradeCheckFailedHosts(RequestEntity requestEntity) throws Exception {
+
+    List<HostRoleCommandEntity> hostRoleCommandEntities = hostRoleCommandDAO.findByRequest(requestEntity.getRequestId(), true);
+
+    Set<String> failedHosts = new LinkedHashSet<>();
+    for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandEntities) {
+      HostRoleStatus status = hostRoleCommandEntity.getStatus();
+      if (status.isFailedState()) {
+        failedHosts.add(hostRoleCommandEntity.getHostName() + "(" + status + ")");
+      }
+    }
+    return failedHosts;
+  }
+}
\ No newline at end of file
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java
index 76b8e23..f19b957 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java
@@ -219,6 +219,14 @@ public class CheckDescription {
       .put(HiveDynamicServiceDiscoveryCheck.HIVE_DYNAMIC_SERVICE_ZK_NAMESPACE_KEY,
           "The hive-site.xml property hive.server2.zookeeper.namespace should be set to the value for the root namespace on ZooKeeper.").build());
 
+  public static CheckDescription AMS_HADOOP_SINK_VERSION_COMPATIBILITY = new CheckDescription("AMS_HADOOP_SINK_VERSION_COMPATIBILITY",
+    PrereqCheckType.HOST,
+    "Ambari Metrics Hadoop Sinks need to be compatible with the stack version. This check ensures that compatibility.",
+    new ImmutableMap.Builder<String, String>().put(AbstractCheckDescriptor.DEFAULT,"Hadoop Sink version check failed. " +
+      "To fix this, please upgrade 'ambari-metrics-hadoop-sink' package to %s on all the failed hosts")
+      .put(AmbariMetricsHadoopSinkVersionCompatibilityCheck.HADOOP_SINK_VERSION_NOT_SPECIFIED, "Hadoop Sink version for pre-check not specified. " +
+        "Please use 'min-hadoop-sink-version' property in upgrade pack to specify min hadoop sink version").build());
+
   public static CheckDescription CONFIG_MERGE = new CheckDescription("CONFIG_MERGE",
     PrereqCheckType.CLUSTER,
     "Configuration Merge Check",
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
index 010ccec..b735257 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
@@ -62,6 +62,8 @@ import org.apache.ambari.server.orm.entities.HostEntity;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity_;
 import org.apache.ambari.server.orm.entities.StageEntity;
+import org.eclipse.persistence.config.HintValues;
+import org.eclipse.persistence.config.QueryHints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -492,10 +494,19 @@ public class HostRoleCommandDAO {
 
   @RequiresSession
   public List<HostRoleCommandEntity> findByRequest(long requestId) {
-    TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT command " +
-      "FROM HostRoleCommandEntity command " +
-      "WHERE command.requestId=?1 ORDER BY command.taskId", HostRoleCommandEntity.class);
-    return daoUtils.selectList(query, requestId);
+    return findByRequest(requestId, false);
+  }
+
+  @RequiresSession
+  public List<HostRoleCommandEntity> findByRequest(long requestId, boolean refreshHint) {
+    TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createNamedQuery(
+      "HostRoleCommandEntity.findByRequestId",
+      HostRoleCommandEntity.class);
+    if (refreshHint) {
+      query.setHint(QueryHints.REFRESH, HintValues.TRUE);
+    }
+    query.setParameter("requestId", requestId);
+    return daoUtils.selectList(query);
   }
 
   @RequiresSession
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
index 0eea7e6..0b99667 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
@@ -107,7 +107,10 @@ import org.apache.commons.lang.ArrayUtils;
         query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE command.requestId >= :lowerRequestIdInclusive AND command.requestId < :upperRequestIdExclusive AND command.status IN :statuses AND command.isBackgroundCommand=0 AND command.hostId = host.hostId AND host.hostName IS NOT NULL"),
     @NamedQuery(
         name = "HostRoleCommandEntity.findLatestServiceChecksByRole",
-        query = "SELECT NEW org.apache.ambari.server.orm.dao.HostRoleCommandDAO.LastServiceCheckDTO(command.role, MAX(command.endTime)) FROM HostRoleCommandEntity command WHERE command.roleCommand = :roleCommand AND command.endTime > 0 AND command.stage.clusterId = :clusterId GROUP BY command.role ORDER BY command.role ASC")
+        query = "SELECT NEW org.apache.ambari.server.orm.dao.HostRoleCommandDAO.LastServiceCheckDTO(command.role, MAX(command.endTime)) FROM HostRoleCommandEntity command WHERE command.roleCommand = :roleCommand AND command.endTime > 0 AND command.stage.clusterId = :clusterId GROUP BY command.role ORDER BY command.role ASC"),
+    @NamedQuery(
+      name = "HostRoleCommandEntity.findByRequestId",
+      query = "SELECT command FROM HostRoleCommandEntity command WHERE command.requestId = :requestId ORDER BY command.taskId")
 })
 public class HostRoleCommandEntity {
 
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
index ec30ea5..1411ba1 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
@@ -144,6 +144,14 @@
     <on-ambari-upgrade add="true"/>
   </property>
   <property>
+    <name>min_ambari_metrics_hadoop_sink_version</name>
+    <value>2.7.0.0</value>
+    <description>
+      Minimum version of ambari metrics hadoop sink that is compatible with this version of AMS.
+    </description>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
     <name>content</name>
     <display-name>ams-env template</display-name>
     <value>
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
index 20aaab3..5a6cd45 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
@@ -87,6 +87,16 @@
             <scriptType>PYTHON</scriptType>
             <timeout>1200</timeout>
           </commandScript>
+            <customCommands>
+              <customCommand>
+                <name>CHECK_HADOOP_SINK_VERSION</name>
+                <commandScript>
+                  <script>scripts/metrics_monitor.py</script>
+                  <scriptType>PYTHON</scriptType>
+                  <timeout>600</timeout>
+                </commandScript>
+              </customCommand>
+            </customCommands>
           <logs>
             <log>
               <logId>ams_monitor</logId>
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py
index 16c7997..03176ab 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py
@@ -22,6 +22,7 @@ from resource_management.libraries.script.script import Script
 from ams import ams
 from ams_service import ams_service
 from status import check_service_status
+from ambari_commons.repo_manager.repo_manager_helper import check_installed_metrics_hadoop_sink_version
 
 class AmsMonitor(Script):
   def install(self, env):
@@ -67,6 +68,11 @@ class AmsMonitor(Script):
     import params
     return params.ams_user
 
+  def check_hadoop_sink_version(self, env):
+    import params
+    check_installed_metrics_hadoop_sink_version(checked_version=params.min_hadoop_sink_version,
+                                                less_valid=False,
+                                                equal_valid=True)
 
 if __name__ == "__main__":
   AmsMonitor().execute()
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
index 08fa675..458f45a 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
@@ -394,6 +394,7 @@ hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_nam
 
 
 clusterHostInfoDict = config["clusterHostInfo"]
+min_hadoop_sink_version = default("/configurations/ams-env/min_ambari_metrics_hadoop_sink_version", "2.7.0.0")
 
 hdfs_site = config['configurations']['hdfs-site']
 default_fs = config['configurations']['core-site']['fs.defaultFS']
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/checks/AmbariMetricsHadoopSinkVersionCheckTest.java b/ambari-server/src/test/java/org/apache/ambari/server/checks/AmbariMetricsHadoopSinkVersionCheckTest.java
new file mode 100644
index 0000000..7209fc9
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/checks/AmbariMetricsHadoopSinkVersionCheckTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.checks;
+
+import static org.apache.ambari.server.checks.AmbariMetricsHadoopSinkVersionCompatibilityCheck.MIN_HADOOP_SINK_VERSION_PROPERTY_NAME;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.AmbariServer;
+import org.apache.ambari.server.controller.PrereqCheckRequest;
+import org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.RequestStatus;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.RepositoryType;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.repository.ClusterVersionSummary;
+import org.apache.ambari.server.state.repository.VersionDefinitionXml;
+import org.apache.ambari.server.state.stack.PrereqCheckStatus;
+import org.apache.ambari.server.state.stack.PrerequisiteCheck;
+import org.apache.ambari.server.state.stack.UpgradePack;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.google.inject.Provider;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest ({AmbariServer.class, AbstractControllerResourceProvider.class, PropertyHelper.class})
+public class AmbariMetricsHadoopSinkVersionCheckTest {
+  private final Clusters m_clusters = Mockito.mock(Clusters.class);
+  private final AmbariMetricsHadoopSinkVersionCompatibilityCheck m_check = new AmbariMetricsHadoopSinkVersionCompatibilityCheck();
+  private final RepositoryVersionDAO repositoryVersionDAO = Mockito.mock(
+    RepositoryVersionDAO.class);
+
+  private ClusterVersionSummary m_clusterVersionSummary;
+
+  private VersionDefinitionXml m_vdfXml;
+
+  private RepositoryVersionEntity m_repositoryVersion;
+
+  final Map<String, Service> m_services = new HashMap<>();
+
+  /**
+   * Creates the mocks and wires the check's providers and configuration before each test.
+   */
+  @Before
+  public void setup() throws Exception {
+
+    m_repositoryVersion = Mockito.mock(RepositoryVersionEntity.class);
+    m_clusterVersionSummary = Mockito.mock(ClusterVersionSummary.class);
+    m_vdfXml = Mockito.mock(VersionDefinitionXml.class);
+    MockitoAnnotations.initMocks(this);
+
+    m_check.clustersProvider = new Provider<Clusters>() {
+
+      @Override
+      public Clusters get() {
+        return m_clusters;
+      }
+    };
+    Configuration config = Mockito.mock(Configuration.class);
+    m_check.config = config;
+
+    when(m_repositoryVersion.getVersion()).thenReturn("3.0.0.0-1234");
+    when(m_repositoryVersion.getStackId()).thenReturn(new StackId("HDP", "3.0"));
+
+    m_services.clear();
+
+    when(m_repositoryVersion.getType()).thenReturn(RepositoryType.STANDARD);
+    when(m_repositoryVersion.getRepositoryXml()).thenReturn(m_vdfXml);
+    when(m_vdfXml.getClusterSummary(Mockito.any(Cluster.class))).thenReturn(m_clusterVersionSummary);
+    when(m_clusterVersionSummary.getAvailableServiceNames()).thenReturn(m_services.keySet());
+
+  }
+
+  /**
+   * Tests that the check is not applicable with only HIVE installed and becomes applicable once HDFS is added.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testIsApplicable() throws Exception {
+    final Cluster cluster = Mockito.mock(Cluster.class);
+
+    when(cluster.getClusterId()).thenReturn(1L);
+    when(m_clusters.getCluster("cluster")).thenReturn(cluster);
+    when(cluster.getServices()).thenReturn(m_services);
+
+    m_services.put("HIVE", Mockito.mock(Service.class));
+
+    PrereqCheckRequest request = new PrereqCheckRequest("cluster");
+    request.setTargetRepositoryVersion(m_repositoryVersion);
+
+    Assert.assertFalse(m_check.isApplicable(request));
+
+    m_services.put("HDFS", Mockito.mock(Service.class));
+
+    m_check.repositoryVersionDaoProvider = new Provider<RepositoryVersionDAO>() {
+      @Override
+      public RepositoryVersionDAO get() {
+        return repositoryVersionDAO;
+      }
+    };
+
+    when(repositoryVersionDAO.findByStackNameAndVersion(Mockito.anyString(),
+      Mockito.anyString())).thenReturn(m_repositoryVersion);
+
+    Assert.assertTrue(m_check.isApplicable(request));
+  }
+
+  /**
+   * Tests that the check passes when the sink version check request moves
+   * from IN_PROGRESS to COMPLETED.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 60000)
+  public void testPerform() throws Exception {
+
+    AmbariManagementController ambariManagementControllerMock = Mockito.mock(AmbariManagementController.class);
+    PowerMockito.mockStatic(AmbariServer.class);
+    when(AmbariServer.getController()).thenReturn(ambariManagementControllerMock);
+
+    ResourceProvider resourceProviderMock = mock(ResourceProvider.class);
+    PowerMockito.mockStatic(AbstractControllerResourceProvider.class);
+    when(AbstractControllerResourceProvider.getResourceProvider(eq(Resource.Type.Request), any(AmbariManagementController.class))).thenReturn(resourceProviderMock);
+
+    PowerMockito.mockStatic(PropertyHelper.class);
+    Request requestMock = mock(Request.class);
+    when(PropertyHelper.getCreateRequest(any(), any())).thenReturn(requestMock);
+    when(PropertyHelper.getPropertyId("Requests", "id")).thenReturn("requestIdProp");
+
+    RequestStatus requestStatusMock = mock(RequestStatus.class);
+    Resource responseResourceMock = mock(Resource.class);
+    when(resourceProviderMock.createResources(requestMock)).thenReturn(requestStatusMock);
+    when(requestStatusMock.getRequestResource()).thenReturn(responseResourceMock);
+    when(responseResourceMock.getPropertyValue(anyString())).thenReturn(100l);
+
+    Clusters clustersMock = mock(Clusters.class);
+    when(ambariManagementControllerMock.getClusters()).thenReturn(clustersMock);
+    Cluster clusterMock = mock(Cluster.class);
+    when(clustersMock.getCluster("c1")).thenReturn(clusterMock);
+    when(clusterMock.getHosts(eq("AMBARI_METRICS"), eq("METRICS_MONITOR"))).thenReturn(Collections.singleton("h1"));
+
+    RequestDAO requestDAOMock = mock(RequestDAO.class);
+    RequestEntity requestEntityMock  = mock(RequestEntity.class);
+    when(requestDAOMock.findByPks(Collections.singleton(100l), true)).thenReturn(Collections.singletonList(requestEntityMock));
+    when(requestEntityMock.getStatus()).thenReturn(HostRoleStatus.IN_PROGRESS).thenReturn(HostRoleStatus.COMPLETED);
+
+    Field requestDaoField = m_check.getClass().getDeclaredField("requestDAO");
+    requestDaoField.setAccessible(true);
+    requestDaoField.set(m_check, requestDAOMock);
+
+    PrerequisiteCheck check = new PrerequisiteCheck(null, "c1");
+    PrereqCheckRequest request = new PrereqCheckRequest("c1");
+    UpgradePack.PrerequisiteCheckConfig prerequisiteCheckConfig = new UpgradePack.PrerequisiteCheckConfig();
+    UpgradePack.PrerequisiteProperty prerequisiteProperty = new UpgradePack.PrerequisiteProperty();
+    prerequisiteProperty.name = MIN_HADOOP_SINK_VERSION_PROPERTY_NAME;
+    prerequisiteProperty.value = "2.7.0.0";
+    UpgradePack.PrerequisiteCheckProperties prerequisiteCheckProperties = new UpgradePack.PrerequisiteCheckProperties();
+    prerequisiteCheckProperties.name = "org.apache.ambari.server.checks.AmbariMetricsHadoopSinkVersionCompatibilityCheck";
+    prerequisiteCheckProperties.properties = Collections.singletonList(prerequisiteProperty);
+    prerequisiteCheckConfig.prerequisiteCheckProperties = Collections.singletonList(prerequisiteCheckProperties);
+    request.setPrerequisiteCheckConfig(prerequisiteCheckConfig);
+    request.setTargetRepositoryVersion(m_repositoryVersion);
+    m_check.perform(check, request);
+
+    Assert.assertEquals(PrereqCheckStatus.PASS, check.getStatus());
+  }
+
+  /** Expects {@code perform} to set FAIL and list host "h1_fail" when the mocked request and its host role command report FAILED. */
+  @Test(timeout = 60000)
+  public void testPerformFail() throws Exception {
+    AmbariManagementController ambariManagementControllerMock = mock(AmbariManagementController.class);
+    PowerMockito.mockStatic(AmbariServer.class);
+    when(AmbariServer.getController()).thenReturn(ambariManagementControllerMock);
+
+    ResourceProvider resourceProviderMock = mock(ResourceProvider.class);
+    PowerMockito.mockStatic(AbstractControllerResourceProvider.class);
+    when(AbstractControllerResourceProvider.getResourceProvider(eq(Resource.Type.Request), any(AmbariManagementController.class))).thenReturn(resourceProviderMock);
+
+    PowerMockito.mockStatic(PropertyHelper.class);
+    Request requestMock = mock(Request.class);
+    when(PropertyHelper.getCreateRequest(any(), any())).thenReturn(requestMock);
+    when(PropertyHelper.getPropertyId("Requests", "id")).thenReturn("requestIdProp");
+    // Any property read from the resource returned by createResources() yields 101L.
+    RequestStatus requestStatusMock = mock(RequestStatus.class);
+    Resource responseResourceMock = mock(Resource.class);
+    when(resourceProviderMock.createResources(requestMock)).thenReturn(requestStatusMock);
+    when(requestStatusMock.getRequestResource()).thenReturn(responseResourceMock);
+    when(responseResourceMock.getPropertyValue(anyString())).thenReturn(101L);
+
+    Clusters clustersMock = mock(Clusters.class);
+    when(ambariManagementControllerMock.getClusters()).thenReturn(clustersMock);
+    Cluster clusterMock = mock(Cluster.class);
+    when(clustersMock.getCluster("c1")).thenReturn(clusterMock);
+    when(clusterMock.getHosts(eq("AMBARI_METRICS"), eq("METRICS_MONITOR"))).thenReturn(Collections.singleton("h1_fail"));
+    // The request entity's getStatus() answers IN_PROGRESS on its first call and FAILED on every later call.
+    RequestDAO requestDAOMock = mock(RequestDAO.class);
+    RequestEntity requestEntityMock  = mock(RequestEntity.class);
+    when(requestDAOMock.findByPks(Collections.singleton(101L), true)).thenReturn(Collections.singletonList(requestEntityMock));
+    when(requestEntityMock.getStatus()).thenReturn(HostRoleStatus.IN_PROGRESS).thenReturn(HostRoleStatus.FAILED);
+    // Swap the mocked DAO into the check's declared "requestDAO" field via reflection.
+    Field requestDaoField = m_check.getClass().getDeclaredField("requestDAO");
+    requestDaoField.setAccessible(true);
+    requestDaoField.set(m_check, requestDAOMock);
+    // The single host role command looked up for request 101 is stubbed as FAILED on host "h1_fail".
+    when(requestEntityMock.getRequestId()).thenReturn(101L);
+    HostRoleCommandDAO hostRoleCommandDAOMock = mock(HostRoleCommandDAO.class);
+    HostRoleCommandEntity hrcEntityMock  = mock(HostRoleCommandEntity.class);
+    when(hostRoleCommandDAOMock.findByRequest(101L, true)).thenReturn(Collections.singletonList(hrcEntityMock));
+    when(hrcEntityMock.getStatus()).thenReturn(HostRoleStatus.FAILED);
+    when(hrcEntityMock.getHostName()).thenReturn("h1_fail");
+
+    Field hrcDaoField = m_check.getClass().getDeclaredField("hostRoleCommandDAO");
+    hrcDaoField.setAccessible(true);
+    hrcDaoField.set(m_check, hostRoleCommandDAOMock);
+    // Build a prerequisite-check request for cluster "c1" carrying the minimum sink version property, then run the check.
+    PrerequisiteCheck check = new PrerequisiteCheck(null, "c1");
+    PrereqCheckRequest request = new PrereqCheckRequest("c1");
+    UpgradePack.PrerequisiteCheckConfig prerequisiteCheckConfig = new UpgradePack.PrerequisiteCheckConfig();
+    UpgradePack.PrerequisiteProperty prerequisiteProperty = new UpgradePack.PrerequisiteProperty();
+    prerequisiteProperty.name = MIN_HADOOP_SINK_VERSION_PROPERTY_NAME;
+    prerequisiteProperty.value = "2.7.0.0";
+    UpgradePack.PrerequisiteCheckProperties prerequisiteCheckProperties = new UpgradePack.PrerequisiteCheckProperties();
+    prerequisiteCheckProperties.name = "org.apache.ambari.server.checks.AmbariMetricsHadoopSinkVersionCompatibilityCheck";
+    prerequisiteCheckProperties.properties = Collections.singletonList(prerequisiteProperty);
+    prerequisiteCheckConfig.prerequisiteCheckProperties = Collections.singletonList(prerequisiteCheckProperties);
+    request.setPrerequisiteCheckConfig(prerequisiteCheckConfig);
+    request.setTargetRepositoryVersion(m_repositoryVersion);
+    m_check.perform(check, request);
+
+    Assert.assertEquals(PrereqCheckStatus.FAIL, check.getStatus());
+    Assert.assertTrue(check.getFailReason().contains("upgrade 'ambari-metrics-hadoop-sink'"));
+    Assert.assertEquals(1, check.getFailedOn().size());
+    Assert.assertTrue(check.getFailedOn().iterator().next().contains("h1_fail"));
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
avijayan@apache.org.