You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ha...@apache.org on 2018/04/02 21:26:31 UTC

[ambari] branch branch-2.6 updated: [AMBARI-23417] Upgrade Requires a Pre-Upgrade Configuration Validation Check for Kafka (dgrinenko) (#851)

This is an automated email from the ASF dual-hosted git repository.

hapylestat pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 2d0ae58  [AMBARI-23417] Upgrade Requires a Pre-Upgrade Configuration Validation Check for Kafka (dgrinenko) (#851)
2d0ae58 is described below

commit 2d0ae58d4806a193f6c3179be244283cf016781b
Author: Dmytro Grinenko <ha...@gmail.com>
AuthorDate: Tue Apr 3 00:26:29 2018 +0300

    [AMBARI-23417] Upgrade Requires a Pre-Upgrade Configuration Validation Check for Kafka (dgrinenko) (#851)
    
    [AMBARI-23417] Upgrade Requires a Pre-Upgrade Configuration Validation Check for Kafka (dgrinenko)
---
 .../server/checks/AbstractCheckDescriptor.java     |  23 ++-
 .../ambari/server/checks/CheckDescription.java     |   5 +
 .../server/checks/HostsRepositoryVersionCheck.java |   2 +-
 .../ambari/server/checks/KafkaPropertiesCheck.java | 142 +++++++++++++++
 .../HDP/2.3/upgrades/nonrolling-upgrade-2.6.xml    |   1 +
 .../stacks/HDP/2.3/upgrades/upgrade-2.6.xml        |   1 +
 .../HDP/2.4/upgrades/nonrolling-upgrade-2.6.xml    |   1 +
 .../stacks/HDP/2.4/upgrades/upgrade-2.6.xml        |   1 +
 .../HDP/2.5/upgrades/nonrolling-upgrade-2.6.xml    |   1 +
 .../stacks/HDP/2.5/upgrades/upgrade-2.6.xml        |   1 +
 .../HDP/2.6/upgrades/nonrolling-upgrade-2.6.xml    |   1 +
 .../stacks/HDP/2.6/upgrades/upgrade-2.6.xml        |   1 +
 .../server/checks/KafkaPropertiesCheckTest.java    | 202 +++++++++++++++++++++
 13 files changed, 378 insertions(+), 4 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/AbstractCheckDescriptor.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/AbstractCheckDescriptor.java
index aabb1c7..fb9d467 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/checks/AbstractCheckDescriptor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/AbstractCheckDescriptor.java
@@ -76,7 +76,7 @@ public abstract class AbstractCheckDescriptor {
   Provider<RepositoryVersionHelper> repositoryVersionHelper;
 
   @Inject
-  Provider<AmbariMetaInfo> ambariMetaInfo;
+  protected Provider<AmbariMetaInfo> ambariMetaInfo;
 
   @Inject
   Configuration config;
@@ -185,8 +185,7 @@ public abstract class AbstractCheckDescriptor {
   }
 
   /**
-   * Gets a cluster configuration property if it exists, or {@code null}
-   * otherwise.
+   * Gets a cluster configuration property if it exists, or {@code null} otherwise.
    *
    * @param request
    *          the request (not {@code null}).
@@ -202,6 +201,23 @@ public abstract class AbstractCheckDescriptor {
       throws AmbariException {
     final String clusterName = request.getClusterName();
     final Cluster cluster = clustersProvider.get().getCluster(clusterName);
+
+    return getProperty(cluster, configType, propertyName);
+  }
+
+  /**
+   * Gets a cluster configuration property if it exists, or {@code null} otherwise.
+   *
+   * @param cluster
+   *          the cluster (not {@code null}).
+   * @param configType
+   *          the configuration type, such as {@code hdfs-site} (not
+   *          {@code null}).
+   * @param propertyName
+   *          the name of the property (not {@code null}).
+   * @return the property value or {@code null} if not found.
+   */
+  protected String getProperty(Cluster cluster, String configType, String propertyName) {
     final Map<String, DesiredConfig> desiredConfigs = cluster.getDesiredConfigs();
     final DesiredConfig desiredConfig = desiredConfigs.get(configType);
 
@@ -215,6 +231,7 @@ public abstract class AbstractCheckDescriptor {
     return properties.get(propertyName);
   }
 
+
   /**
    * Gets the fail reason
    * @param key               the failure text key
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java
index ce48640..80ca42d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java
@@ -88,6 +88,11 @@ public class CheckDescription {
       .put(AbstractCheckDescriptor.DEFAULT,
           "The following hosts must have version {{version}} installed: {{fails}}.").build());
 
+   public static CheckDescription KAFKA_PROPERTIES_VALIDATION = new CheckDescription("KAFKA_PROPERTIES_VALIDATION",
+    PrereqCheckType.SERVICE,"Kafka properties should be set correctly",
+    new ImmutableMap.Builder<String, String>().put( AbstractCheckDescriptor.DEFAULT,
+      "The following Kafka properties should be set properly: {{fails}}").build());
+
   public static CheckDescription SECONDARY_NAMENODE_MUST_BE_DELETED = new CheckDescription("SECONDARY_NAMENODE_MUST_BE_DELETED",
     PrereqCheckType.HOST,
     "The SNameNode component must be deleted from all hosts",
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/HostsRepositoryVersionCheck.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/HostsRepositoryVersionCheck.java
index 17c9dc3..fe28b98 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/checks/HostsRepositoryVersionCheck.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/HostsRepositoryVersionCheck.java
@@ -40,7 +40,7 @@ import com.google.inject.Singleton;
  * Checks that all hosts have particular repository version. Hosts that are in
  * maintenance mode will be skipped and will not report a warning. Even if they
  * do not have the repo version, they will not be included in the upgrade
- * orchstration, so no warning is required.
+ * orchestration, so no warning is required.
  */
 @Singleton
 @UpgradeCheck(
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/KafkaPropertiesCheck.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/KafkaPropertiesCheck.java
new file mode 100644
index 0000000..3eb4679
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/KafkaPropertiesCheck.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.checks;
+
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.controller.PrereqCheckRequest;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.ServiceInfo;
+import org.apache.ambari.server.state.stack.PrereqCheckStatus;
+import org.apache.ambari.server.state.stack.PrerequisiteCheck;
+import org.apache.ambari.server.state.stack.upgrade.UpgradeType;
+import org.apache.ambari.server.utils.VersionUtils;
+
+import com.google.inject.Singleton;
+import com.google.common.collect.Lists;
+
+/**
+ * Checks the Kafka broker configuration properties before an upgrade.
+ *
+ * Properties to check:
+ *  inter.broker.protocol.version
+ *  log.message.format.version
+ *
+ * Both properties must exist in kafka-broker and have a non-blank value.
+ *
+ * For inter.broker.protocol.version:
+ * the value must equal the first three components of the Kafka version of the current stack
+ * (e.g. for HDP 2.6.x value should be 0.10.1, HDP 2.5.x should be 0.10.0, HDP 2.3.x - 2.4.x should be 0.9.0)
+ *
+ * For log.message.format.version:
+ * the value must not be blank (the version can vary from the current stack version)
+ */
+@Singleton
+@UpgradeCheck(
+    group = UpgradeCheckGroup.REPOSITORY_VERSION,
+    required = { UpgradeType.ROLLING, UpgradeType.NON_ROLLING, UpgradeType.HOST_ORDERED })
+public class KafkaPropertiesCheck extends AbstractCheckDescriptor {
+  private static final String KAFKA_BROKER_CONFIG = "kafka-broker";
+  private static final String KAFKA_SERVICE_NAME = "KAFKA";
+  private static final String MIN_APPLICABLE_STACK_VERSION = "2.6.5";
+
+  /** Names of the kafka-broker properties validated by this check. */
+  private interface KafkaProperties {
+    String INTER_BROKER_PROTOCOL_VERSION = "inter.broker.protocol.version";
+    String LOG_MESSAGE_FORMAT_VERSION = "log.message.format.version";
+    List<String> ALL_PROPERTIES = Arrays.asList(INTER_BROKER_PROTOCOL_VERSION, LOG_MESSAGE_FORMAT_VERSION);
+  }
+
+  /** Constructor. */
+  public KafkaPropertiesCheck() {
+    super(CheckDescription.KAFKA_PROPERTIES_VALIDATION);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public List<CheckQualification> getQualifications() {
+    return Lists.<CheckQualification> newArrayList(new KafkaPropertiesMinVersionQualification());
+  }
+
+  /**
+   * Gets the first three components of the Kafka version declared by the cluster's current stack.
+   * @param cluster the cluster whose current stack is inspected (not {@code null}).
+   * @return the {@code x.y.z} version, or {@code null} if Kafka or a usable version is not declared.
+   * @throws AmbariException if the stack metadata lookup fails.
+   */
+  private String getKafkaServiceVersion(Cluster cluster) throws AmbariException {
+    ServiceInfo serviceInfo = ambariMetaInfo.get().getStack(cluster.getCurrentStackVersion()).getService(KAFKA_SERVICE_NAME);
+    // no Kafka in the stack, or no declared version: nothing to compare against
+    if (serviceInfo == null || serviceInfo.getVersion() == null) {
+      return null;
+    }
+    String[] version = serviceInfo.getVersion().split(Pattern.quote("."));
+    if (version.length < 3) {
+      return null;
+    }
+    return String.format("%s.%s.%s", version[0], version[1], version[2]);
+  }
+
+  /**
+   * Fails the check when a validated property is missing or blank, or when
+   * inter.broker.protocol.version differs from the Kafka version of the current stack.
+   * @param prerequisiteCheck the check result to populate on failure.
+   * @param request the request providing the cluster name.
+   * @throws AmbariException if the cluster or the stack metadata cannot be resolved.
+   */
+  @Override
+  public void perform(PrerequisiteCheck prerequisiteCheck, PrereqCheckRequest request) throws AmbariException {
+    final Cluster cluster = clustersProvider.get().getCluster(request.getClusterName());
+    LinkedHashSet<String> failedProperties = new LinkedHashSet<>();
+
+    for (String propertyName : KafkaProperties.ALL_PROPERTIES) {
+      String propertyValue = getProperty(cluster, KAFKA_BROKER_CONFIG, propertyName);
+      // the property must exist AND carry a value; a blank value is as unusable as a missing one
+      if (propertyValue == null || propertyValue.trim().isEmpty()) {
+        failedProperties.add(propertyName);
+      } else if (propertyName.equals(KafkaProperties.INTER_BROKER_PROTOCOL_VERSION)) {
+        String stackKafkaVersion = getKafkaServiceVersion(cluster);
+        // an undeterminable stack version skips the comparison instead of failing the check
+        if (stackKafkaVersion != null && !stackKafkaVersion.equals(propertyValue)) {
+          failedProperties.add(String.format("%s (expected value \"%s\", actual \"%s\")",
+            propertyName, stackKafkaVersion, propertyValue));
+        }
+      }
+    }
+
+    if (!failedProperties.isEmpty()) {
+      prerequisiteCheck.setFailedOn(failedProperties);
+      prerequisiteCheck.setFailReason(getFailReason(prerequisiteCheck, request));
+      prerequisiteCheck.setStatus(PrereqCheckStatus.FAIL);
+    }
+  }
+
+  /** Makes the check applicable only when the target repository version is at least 2.6.5. */
+  private static class KafkaPropertiesMinVersionQualification implements CheckQualification {
+    /** {@inheritDoc} */
+    @Override
+    public boolean isApplicable(PrereqCheckRequest request) {
+      String targetStackVersion = request.getTargetRepositoryVersion().getVersion();
+      return VersionUtils.compareVersions(targetStackVersion, MIN_APPLICABLE_STACK_VERSION) >= 0;
+    }
+  }
+}
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.6.xml
index f72d607..85b32b6 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.6.xml
@@ -21,6 +21,7 @@
   <target-stack>HDP-2.6</target-stack>
   <type>NON_ROLLING</type>
   <prerequisite-checks>
+    <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
     <check>org.apache.ambari.server.checks.RangerAuditDbCheck</check>
     <check>org.apache.ambari.server.checks.ServicePresenceCheck</check>
     <check>org.apache.ambari.server.checks.RangerSSLConfigCheck</check>
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.6.xml
index 61e4794..04dc183 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.6.xml
@@ -36,6 +36,7 @@
     <check>org.apache.ambari.server.checks.ServicePresenceCheck</check>
     <check>org.apache.ambari.server.checks.RangerAuditDbCheck</check>
     <check>org.apache.ambari.server.checks.RangerSSLConfigCheck</check>
+    <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
     <check>org.apache.ambari.server.checks.LZOCheck</check>
 
     <!-- Specific to HDP 2.5, Storm is not rolling -->
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/nonrolling-upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/nonrolling-upgrade-2.6.xml
index 2f76f93..b779e1f 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/nonrolling-upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/nonrolling-upgrade-2.6.xml
@@ -21,6 +21,7 @@
   <target-stack>HDP-2.6</target-stack>
   <type>NON_ROLLING</type>
   <prerequisite-checks>
+    <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
     <check>org.apache.ambari.server.checks.RangerAuditDbCheck</check>
     <check>org.apache.ambari.server.checks.ServicePresenceCheck</check>
     <check>org.apache.ambari.server.checks.RangerSSLConfigCheck</check>
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.6.xml
index 1e16194..f814ca8 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.6.xml
@@ -37,6 +37,7 @@
     <check>org.apache.ambari.server.checks.ServicePresenceCheck</check>
     <check>org.apache.ambari.server.checks.RangerAuditDbCheck</check>
     <check>org.apache.ambari.server.checks.RangerSSLConfigCheck</check>
+    <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
     <check>org.apache.ambari.server.checks.LZOCheck</check>
 
     <!-- Specific to HDP 2.5, Storm is not rolling -->
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/nonrolling-upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/nonrolling-upgrade-2.6.xml
index 611de7f..a780bdc 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/nonrolling-upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/nonrolling-upgrade-2.6.xml
@@ -21,6 +21,7 @@
   <target-stack>HDP-2.6</target-stack>
   <type>NON_ROLLING</type>
   <prerequisite-checks>
+    <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
     <check>org.apache.ambari.server.checks.RangerSSLConfigCheck</check>
     <check>org.apache.ambari.server.checks.LZOCheck</check>
     <configuration>
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.6.xml
index c2c6d55..6f18642 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.6.xml
@@ -35,6 +35,7 @@
     <check>org.apache.ambari.server.checks.YarnTimelineServerStatePreservingCheck</check>
     <check>org.apache.ambari.server.checks.RangerSSLConfigCheck</check>
     <check>org.apache.ambari.server.checks.DruidHighAvailabilityCheck</check>
+    <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
     <check>org.apache.ambari.server.checks.LZOCheck</check>
 
     <configuration>
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/nonrolling-upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/nonrolling-upgrade-2.6.xml
index db37624..1f221ab 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/nonrolling-upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/nonrolling-upgrade-2.6.xml
@@ -21,6 +21,7 @@
   <target-stack>HDP-2.6</target-stack>
   <type>NON_ROLLING</type>
   <prerequisite-checks>
+    <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
     <check>org.apache.ambari.server.checks.LZOCheck</check>
     <configuration>
       <!-- Configuration properties for all pre-reqs including required pre-reqs -->
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/upgrade-2.6.xml
index 5017212..a3c0c9b 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/upgrade-2.6.xml
@@ -34,6 +34,7 @@
     <check>org.apache.ambari.server.checks.YarnRMHighAvailabilityCheck</check>
     <check>org.apache.ambari.server.checks.YarnTimelineServerStatePreservingCheck</check>
     <check>org.apache.ambari.server.checks.DruidHighAvailabilityCheck</check>
+    <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
     <check>org.apache.ambari.server.checks.LZOCheck</check>
 
     <configuration>
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/checks/KafkaPropertiesCheckTest.java b/ambari-server/src/test/java/org/apache/ambari/server/checks/KafkaPropertiesCheckTest.java
new file mode 100644
index 0000000..8c698b7
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/checks/KafkaPropertiesCheckTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.checks;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.controller.PrereqCheckRequest;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.DesiredConfig;
+import org.apache.ambari.server.state.RepositoryType;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceInfo;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.StackInfo;
+import org.apache.ambari.server.state.repository.ClusterVersionSummary;
+import org.apache.ambari.server.state.repository.VersionDefinitionXml;
+import org.apache.ambari.server.state.stack.PrereqCheckStatus;
+import org.apache.ambari.server.state.stack.PrerequisiteCheck;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import com.google.inject.Provider;
+
+
+/**
+ * Unit tests for {@link KafkaPropertiesCheck}. The cluster, stack and configuration
+ * collaborators are EasyMock mocks; the target repository version is a Mockito mock.
+ */
+public class KafkaPropertiesCheckTest {
+
+  static String serviceName = "KAFKA";
+
+  // kafka-broker properties served by the mocked Config; individual tests mutate this map
+  private Map<String, String> m_configMap = new HashMap<>();
+  private Clusters m_clusters = EasyMock.createMock(Clusters.class);
+  final Map<String, Service> m_services = new HashMap<>();
+
+  @Mock
+  private RepositoryVersionEntity m_repositoryVersion;
+
+  @Mock
+  private VersionDefinitionXml m_vdfXml;
+
+  @Mock
+  private ClusterVersionSummary m_clusterVersionSummary;
+
+  private KafkaPropertiesCheck m_kafkaPropertiesCheck = null;
+
+  @Before
+  public void setup() throws Exception {
+    MockitoAnnotations.initMocks(this);
+
+    // by default the configured protocol version equals the mocked stack's Kafka version
+    m_configMap.put("log.message.format.version", "1.0.0");
+    m_configMap.put("inter.broker.protocol.version", "0.0.1");
+
+    final StackId stackId = new StackId("HDP", "2.3");
+    final StackInfo stackInfo = EasyMock.createMock(StackInfo.class);
+    final ServiceInfo serviceInfo = EasyMock.createMock(ServiceInfo.class);
+    final AmbariMetaInfo metaInfo = EasyMock.createMock(AmbariMetaInfo.class);
+    final Cluster cluster = EasyMock.createMock(Cluster.class);
+    final Config config = EasyMock.createMock(Config.class);
+    final DesiredConfig desiredConfig = EasyMock.createMock(DesiredConfig.class);
+    final Service service = EasyMock.createMock(Service.class);
+
+    final Map<String, Service> services = new HashMap<>();
+    services.put(serviceName, service);
+
+    final Map<String, DesiredConfig> desiredConfigs = new HashMap<>();
+    desiredConfigs.put("kafka-broker", desiredConfig);
+
+    // stack metadata: Kafka is reported with version 0.0.1.2.3
+    expect(metaInfo.getStack((StackId) anyObject())).andReturn(stackInfo).anyTimes();
+    expect(metaInfo.getServices((String) anyObject(), (String) anyObject())).andReturn(new HashMap<String, ServiceInfo>()).anyTimes();
+    expect(stackInfo.getService((String) anyObject())).andReturn(serviceInfo).anyTimes();
+    expect(serviceInfo.getVersion()).andReturn("0.0.1.2.3").anyTimes();
+
+    // cluster state: every config lookup resolves to the shared property map
+    expect(desiredConfig.getTag()).andReturn("").anyTimes();
+    expect(config.getProperties()).andReturn(m_configMap).anyTimes();
+    expect(cluster.getServices()).andReturn(services).anyTimes();
+    expect(cluster.getService(serviceName)).andReturn(service).anyTimes();
+    expect(cluster.getDesiredConfigs()).andReturn(desiredConfigs).anyTimes();
+    expect(cluster.getDesiredConfigByType((String) anyObject())).andReturn(config).anyTimes();
+    expect(cluster.getConfig((String) anyObject(), (String) anyObject())).andReturn(config).anyTimes();
+    expect(cluster.getDesiredStackVersion()).andReturn(stackId).anyTimes();
+    expect(cluster.getCurrentStackVersion()).andReturn(stackId).anyTimes();
+    expect(m_clusters.getCluster((String) anyObject())).andReturn(cluster).anyTimes();
+
+    replay(metaInfo, m_clusters, cluster, desiredConfig, config, stackInfo, serviceInfo);
+
+    m_kafkaPropertiesCheck = new KafkaPropertiesCheck();
+    m_kafkaPropertiesCheck.ambariMetaInfo = new Provider<AmbariMetaInfo>() {
+      @Override
+      public AmbariMetaInfo get() {
+        return metaInfo;
+      }
+    };
+    m_kafkaPropertiesCheck.clustersProvider = new Provider<Clusters>() {
+      @Override
+      public Clusters get() {
+        return m_clusters;
+      }
+    };
+
+    m_services.clear();
+
+    Mockito.when(m_repositoryVersion.getVersion()).thenReturn("2.6.5.1");
+    Mockito.when(m_repositoryVersion.getType()).thenReturn(RepositoryType.STANDARD);
+    Mockito.when(m_repositoryVersion.getRepositoryXml()).thenReturn(m_vdfXml);
+    Mockito.when(m_vdfXml.getClusterSummary(Mockito.any(Cluster.class))).thenReturn(m_clusterVersionSummary);
+    Mockito.when(m_clusterVersionSummary.getAvailableServiceNames()).thenReturn(m_services.keySet());
+  }
+
+  @Test
+  public void testApplicable() throws Exception {
+    m_services.put("KAFKA", EasyMock.createMock(Service.class));
+
+    // swap the default expectations for the minimal set this test records
+    Cluster cluster = m_clusters.getCluster("cluster");
+    EasyMock.reset(cluster);
+    expect(cluster.getServices()).andReturn(m_services).anyTimes();
+    expect(cluster.getCurrentStackVersion()).andReturn(new StackId("HDP-2.3")).anyTimes();
+    replay(cluster);
+
+    // the mocked target repository version is 2.6.5.1, above the 2.6.5 minimum
+    PrereqCheckRequest request = new PrereqCheckRequest("cluster");
+    request.setTargetRepositoryVersion(m_repositoryVersion);
+
+    assertTrue(m_kafkaPropertiesCheck.isApplicable(request));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testMissingProps() throws Exception {
+    // scenario 1: both properties are absent
+    m_configMap.clear();
+
+    PrerequisiteCheck check = new PrerequisiteCheck(null, null);
+    m_kafkaPropertiesCheck.perform(check, new PrereqCheckRequest("cluster"));
+    assertEquals(PrereqCheckStatus.FAIL, check.getStatus());
+    assertEquals("The following Kafka properties should be set properly: inter.broker.protocol.version and log.message.format.version", check.getFailReason());
+
+    // scenario 2: the protocol version differs from the stack's Kafka version
+    m_configMap.put("inter.broker.protocol.version", "0.0.2");
+    m_configMap.put("log.message.format.version", "1.0.0");
+    check = new PrerequisiteCheck(null, null);
+    m_kafkaPropertiesCheck.perform(check, new PrereqCheckRequest("cluster"));
+    assertEquals(PrereqCheckStatus.FAIL, check.getStatus());
+
+    // scenario 3: the protocol version matches the stack's Kafka version again
+    m_configMap.clear();
+
+    m_configMap.put("inter.broker.protocol.version", "0.0.1");
+    m_configMap.put("log.message.format.version", "1.0.0");
+    check = new PrerequisiteCheck(null, null);
+    m_kafkaPropertiesCheck.perform(check, new PrereqCheckRequest("cluster"));
+    assertEquals(PrereqCheckStatus.PASS, check.getStatus());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNormal() throws Exception {
+    // the default configuration from setup() satisfies the check
+    PrerequisiteCheck check = new PrerequisiteCheck(null, null);
+    PrereqCheckRequest request = new PrereqCheckRequest("cluster");
+
+    m_kafkaPropertiesCheck.perform(check, request);
+    assertEquals(PrereqCheckStatus.PASS, check.getStatus());
+  }
+
+}

-- 
To stop receiving notification emails like this one, please contact
hapylestat@apache.org.