You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ha...@apache.org on 2018/04/02 21:26:31 UTC
[ambari] branch branch-2.6 updated: [AMBARI-23417] Upgrade Requires
a Pre-Upgrade Configuration Validation Check for Kafka (dgrinenko) (#851)
This is an automated email from the ASF dual-hosted git repository.
hapylestat pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 2d0ae58 [AMBARI-23417] Upgrade Requires a Pre-Upgrade Configuration Validation Check for Kafka (dgrinenko) (#851)
2d0ae58 is described below
commit 2d0ae58d4806a193f6c3179be244283cf016781b
Author: Dmytro Grinenko <ha...@gmail.com>
AuthorDate: Tue Apr 3 00:26:29 2018 +0300
[AMBARI-23417] Upgrade Requires a Pre-Upgrade Configuration Validation Check for Kafka (dgrinenko) (#851)
[AMBARI-23417] Upgrade Requires a Pre-Upgrade Configuration Validation Check for Kafka (dgrinenko)
---
.../server/checks/AbstractCheckDescriptor.java | 23 ++-
.../ambari/server/checks/CheckDescription.java | 5 +
.../server/checks/HostsRepositoryVersionCheck.java | 2 +-
.../ambari/server/checks/KafkaPropertiesCheck.java | 142 +++++++++++++++
.../HDP/2.3/upgrades/nonrolling-upgrade-2.6.xml | 1 +
.../stacks/HDP/2.3/upgrades/upgrade-2.6.xml | 1 +
.../HDP/2.4/upgrades/nonrolling-upgrade-2.6.xml | 1 +
.../stacks/HDP/2.4/upgrades/upgrade-2.6.xml | 1 +
.../HDP/2.5/upgrades/nonrolling-upgrade-2.6.xml | 1 +
.../stacks/HDP/2.5/upgrades/upgrade-2.6.xml | 1 +
.../HDP/2.6/upgrades/nonrolling-upgrade-2.6.xml | 1 +
.../stacks/HDP/2.6/upgrades/upgrade-2.6.xml | 1 +
.../server/checks/KafkaPropertiesCheckTest.java | 202 +++++++++++++++++++++
13 files changed, 378 insertions(+), 4 deletions(-)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/AbstractCheckDescriptor.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/AbstractCheckDescriptor.java
index aabb1c7..fb9d467 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/checks/AbstractCheckDescriptor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/AbstractCheckDescriptor.java
@@ -76,7 +76,7 @@ public abstract class AbstractCheckDescriptor {
Provider<RepositoryVersionHelper> repositoryVersionHelper;
@Inject
- Provider<AmbariMetaInfo> ambariMetaInfo;
+ protected Provider<AmbariMetaInfo> ambariMetaInfo;
@Inject
Configuration config;
@@ -185,8 +185,7 @@ public abstract class AbstractCheckDescriptor {
}
/**
- * Gets a cluster configuration property if it exists, or {@code null}
- * otherwise.
+ * Gets a cluster configuration property if it exists, or {@code null} otherwise.
*
* @param request
* the request (not {@code null}).
@@ -202,6 +201,23 @@ public abstract class AbstractCheckDescriptor {
throws AmbariException {
final String clusterName = request.getClusterName();
final Cluster cluster = clustersProvider.get().getCluster(clusterName);
+
+ return getProperty(cluster, configType, propertyName);
+ }
+
+ /**
+ * Gets a cluster configuration property if it exists, or {@code null} otherwise.
+ *
+ * @param cluster
+ * the cluster (not {@code null}).
+ * @param configType
+ * the configuration type, such as {@code hdfs-site} (not
+ * {@code null}).
+ * @param propertyName
+ * the name of the property (not {@code null}).
+ * @return the property value or {@code null} if not found.
+ */
+ protected String getProperty(Cluster cluster, String configType, String propertyName) {
final Map<String, DesiredConfig> desiredConfigs = cluster.getDesiredConfigs();
final DesiredConfig desiredConfig = desiredConfigs.get(configType);
@@ -215,6 +231,7 @@ public abstract class AbstractCheckDescriptor {
return properties.get(propertyName);
}
+
/**
* Gets the fail reason
* @param key the failure text key
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java
index ce48640..80ca42d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/CheckDescription.java
@@ -88,6 +88,11 @@ public class CheckDescription {
.put(AbstractCheckDescriptor.DEFAULT,
"The following hosts must have version {{version}} installed: {{fails}}.").build());
+  /** Describes the pre-upgrade validation of the Kafka broker properties. */
+  public static CheckDescription KAFKA_PROPERTIES_VALIDATION = new CheckDescription("KAFKA_PROPERTIES_VALIDATION",
+    PrereqCheckType.SERVICE,
+    "Kafka properties should be set correctly",
+    new ImmutableMap.Builder<String, String>()
+      .put(AbstractCheckDescriptor.DEFAULT,
+        "The following Kafka properties should be set properly: {{fails}}").build());
+
public static CheckDescription SECONDARY_NAMENODE_MUST_BE_DELETED = new CheckDescription("SECONDARY_NAMENODE_MUST_BE_DELETED",
PrereqCheckType.HOST,
"The SNameNode component must be deleted from all hosts",
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/HostsRepositoryVersionCheck.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/HostsRepositoryVersionCheck.java
index 17c9dc3..fe28b98 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/checks/HostsRepositoryVersionCheck.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/HostsRepositoryVersionCheck.java
@@ -40,7 +40,7 @@ import com.google.inject.Singleton;
* Checks that all hosts have particular repository version. Hosts that are in
* maintenance mode will be skipped and will not report a warning. Even if they
* do not have the repo version, they will not be included in the upgrade
- * orchstration, so no warning is required.
+ * orchestration, so no warning is required.
*/
@Singleton
@UpgradeCheck(
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/KafkaPropertiesCheck.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/KafkaPropertiesCheck.java
new file mode 100644
index 0000000..3eb4679
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/KafkaPropertiesCheck.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.checks;
+
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.controller.PrereqCheckRequest;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.ServiceInfo;
+import org.apache.ambari.server.state.stack.PrereqCheckStatus;
+import org.apache.ambari.server.state.stack.PrerequisiteCheck;
+import org.apache.ambari.server.state.stack.upgrade.UpgradeType;
+import org.apache.ambari.server.utils.VersionUtils;
+
+import com.google.inject.Singleton;
+import com.google.common.collect.Lists;
+
+/**
+ * Pre-upgrade check which validates the Kafka broker configuration.
+ *
+ * <p>Properties checked (both are read from the {@code kafka-broker} configuration type):
+ * <ul>
+ *   <li>{@code inter.broker.protocol.version} - must be set to a non-blank value which equals the
+ *       Kafka version of the current stack (e.g. for HDP 2.6.x the value should be 0.10.1, for
+ *       HDP 2.5.x it should be 0.10.0 and for HDP 2.3.x - 2.4.x it should be 0.9.0). The
+ *       comparison is skipped when the stack does not expose a usable Kafka service version.</li>
+ *   <li>{@code log.message.format.version} - must be set to a non-blank value; the value itself
+ *       may differ from the Kafka version of the current stack.</li>
+ * </ul>
+ *
+ * <p>The check is applicable only when the target repository version is at least
+ * {@code 2.6.5}.
+ */
+@Singleton
+@UpgradeCheck(
+    group = UpgradeCheckGroup.REPOSITORY_VERSION,
+    required = { UpgradeType.ROLLING, UpgradeType.NON_ROLLING, UpgradeType.HOST_ORDERED })
+public class KafkaPropertiesCheck extends AbstractCheckDescriptor {
+  private static final String KAFKA_BROKER_CONFIG = "kafka-broker";
+  private static final String KAFKA_SERVICE_NAME = "KAFKA";
+  private static final String MIN_APPLICABLE_STACK_VERSION = "2.6.5";
+
+  /** Matches the literal dot between version components; compiled once and reused. */
+  private static final Pattern VERSION_SEPARATOR = Pattern.compile(Pattern.quote("."));
+
+  /**
+   * Names of the {@code kafka-broker} properties validated by this check.
+   */
+  private interface KafkaProperties {
+    String INTER_BROKER_PROTOCOL_VERSION = "inter.broker.protocol.version";
+    String LOG_MESSAGE_FORMAT_VERSION = "log.message.format.version";
+    List<String> ALL_PROPERTIES = Arrays.asList(INTER_BROKER_PROTOCOL_VERSION, LOG_MESSAGE_FORMAT_VERSION);
+  }
+
+  /**
+   * Constructor.
+   */
+  public KafkaPropertiesCheck() {
+    super(CheckDescription.KAFKA_PROPERTIES_VALIDATION);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<CheckQualification> getQualifications() {
+    return Lists.<CheckQualification> newArrayList(new KafkaPropertiesMinVersionQualification());
+  }
+
+  /**
+   * Gets the Kafka version defined by the current stack of the cluster, reduced to its first
+   * three dot-separated components (a service version of {@code 0.9.0.2.3} yields
+   * {@code 0.9.0}).
+   *
+   * @param cluster
+   *          the cluster (not {@code null}).
+   * @return the three-component Kafka version, or {@code null} if the stack has no Kafka service,
+   *         the service declares no version, or the version has fewer than three components.
+   * @throws AmbariException
+   *           if the stack information can not be retrieved.
+   */
+  private String getKafkaServiceVersion(Cluster cluster) throws AmbariException {
+    ServiceInfo serviceInfo =
+        ambariMetaInfo.get().getStack(cluster.getCurrentStackVersion()).getService(KAFKA_SERVICE_NAME);
+
+    if (serviceInfo == null) {
+      return null;
+    }
+
+    // A service definition without a version gives nothing to compare against.
+    String serviceVersion = serviceInfo.getVersion();
+    if (serviceVersion == null) {
+      return null;
+    }
+
+    String[] version = VERSION_SEPARATOR.split(serviceVersion);
+    if (version.length < 3) {
+      return null;
+    }
+    return String.format("%s.%s.%s", version[0], version[1], version[2]);
+  }
+
+  /**
+   * Validates the Kafka broker properties and marks the check as failed when any of them is
+   * missing, blank, or (for {@code inter.broker.protocol.version}) different from the Kafka
+   * version of the current stack.
+   *
+   * @param prerequisiteCheck
+   *          the check result to update on failure (not {@code null}).
+   * @param request
+   *          the request providing the cluster name (not {@code null}).
+   * @throws AmbariException
+   *           if the cluster or the stack information can not be retrieved.
+   */
+  @Override
+  public void perform(PrerequisiteCheck prerequisiteCheck, PrereqCheckRequest request) throws AmbariException {
+    final Cluster cluster = clustersProvider.get().getCluster(request.getClusterName());
+    // Insertion-ordered so that failures are reported in the order the properties are checked.
+    final LinkedHashSet<String> failedProperties = new LinkedHashSet<>();
+
+    for (String propertyName : KafkaProperties.ALL_PROPERTIES) {
+      String propertyValue = getProperty(cluster, KAFKA_BROKER_CONFIG, propertyName);
+
+      // Both properties are required to carry a value; a blank value is as unusable as a missing one.
+      if (propertyValue == null || propertyValue.trim().isEmpty()) {
+        failedProperties.add(propertyName);
+      } else if (propertyName.equals(KafkaProperties.INTER_BROKER_PROTOCOL_VERSION)) {
+        String stackKafkaVersion = getKafkaServiceVersion(cluster);
+
+        // Only compare when the stack actually provides a Kafka version.
+        if (stackKafkaVersion != null && !stackKafkaVersion.equals(propertyValue)) {
+          failedProperties.add(String.format("%s (expected value \"%s\", actual \"%s\")",
+              propertyName, stackKafkaVersion, propertyValue));
+        }
+      }
+    }
+
+    if (!failedProperties.isEmpty()) {
+      prerequisiteCheck.setFailedOn(failedProperties);
+      prerequisiteCheck.setFailReason(getFailReason(prerequisiteCheck, request));
+      prerequisiteCheck.setStatus(PrereqCheckStatus.FAIL);
+    }
+  }
+
+  /**
+   * Qualification which limits the check to target repository versions that are at least
+   * {@code MIN_APPLICABLE_STACK_VERSION}. Declared static because it needs no state of the
+   * enclosing check instance.
+   */
+  private static class KafkaPropertiesMinVersionQualification implements CheckQualification {
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isApplicable(PrereqCheckRequest request) {
+      String targetStackVersion = request.getTargetRepositoryVersion().getVersion();
+
+      return VersionUtils.compareVersions(targetStackVersion, MIN_APPLICABLE_STACK_VERSION) >= 0;
+    }
+  }
+}
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.6.xml
index f72d607..85b32b6 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/nonrolling-upgrade-2.6.xml
@@ -21,6 +21,7 @@
<target-stack>HDP-2.6</target-stack>
<type>NON_ROLLING</type>
<prerequisite-checks>
+ <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
<check>org.apache.ambari.server.checks.RangerAuditDbCheck</check>
<check>org.apache.ambari.server.checks.ServicePresenceCheck</check>
<check>org.apache.ambari.server.checks.RangerSSLConfigCheck</check>
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.6.xml
index 61e4794..04dc183 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.6.xml
@@ -36,6 +36,7 @@
<check>org.apache.ambari.server.checks.ServicePresenceCheck</check>
<check>org.apache.ambari.server.checks.RangerAuditDbCheck</check>
<check>org.apache.ambari.server.checks.RangerSSLConfigCheck</check>
+ <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
<check>org.apache.ambari.server.checks.LZOCheck</check>
<!-- Specific to HDP 2.5, Storm is not rolling -->
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/nonrolling-upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/nonrolling-upgrade-2.6.xml
index 2f76f93..b779e1f 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/nonrolling-upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/nonrolling-upgrade-2.6.xml
@@ -21,6 +21,7 @@
<target-stack>HDP-2.6</target-stack>
<type>NON_ROLLING</type>
<prerequisite-checks>
+ <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
<check>org.apache.ambari.server.checks.RangerAuditDbCheck</check>
<check>org.apache.ambari.server.checks.ServicePresenceCheck</check>
<check>org.apache.ambari.server.checks.RangerSSLConfigCheck</check>
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.6.xml
index 1e16194..f814ca8 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.6.xml
@@ -37,6 +37,7 @@
<check>org.apache.ambari.server.checks.ServicePresenceCheck</check>
<check>org.apache.ambari.server.checks.RangerAuditDbCheck</check>
<check>org.apache.ambari.server.checks.RangerSSLConfigCheck</check>
+ <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
<check>org.apache.ambari.server.checks.LZOCheck</check>
<!-- Specific to HDP 2.5, Storm is not rolling -->
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/nonrolling-upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/nonrolling-upgrade-2.6.xml
index 611de7f..a780bdc 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/nonrolling-upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/nonrolling-upgrade-2.6.xml
@@ -21,6 +21,7 @@
<target-stack>HDP-2.6</target-stack>
<type>NON_ROLLING</type>
<prerequisite-checks>
+ <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
<check>org.apache.ambari.server.checks.RangerSSLConfigCheck</check>
<check>org.apache.ambari.server.checks.LZOCheck</check>
<configuration>
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.6.xml
index c2c6d55..6f18642 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.6.xml
@@ -35,6 +35,7 @@
<check>org.apache.ambari.server.checks.YarnTimelineServerStatePreservingCheck</check>
<check>org.apache.ambari.server.checks.RangerSSLConfigCheck</check>
<check>org.apache.ambari.server.checks.DruidHighAvailabilityCheck</check>
+ <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
<check>org.apache.ambari.server.checks.LZOCheck</check>
<configuration>
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/nonrolling-upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/nonrolling-upgrade-2.6.xml
index db37624..1f221ab 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/nonrolling-upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/nonrolling-upgrade-2.6.xml
@@ -21,6 +21,7 @@
<target-stack>HDP-2.6</target-stack>
<type>NON_ROLLING</type>
<prerequisite-checks>
+ <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
<check>org.apache.ambari.server.checks.LZOCheck</check>
<configuration>
<!-- Configuration properties for all pre-reqs including required pre-reqs -->
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/upgrade-2.6.xml b/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/upgrade-2.6.xml
index 5017212..a3c0c9b 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/upgrade-2.6.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/upgrade-2.6.xml
@@ -34,6 +34,7 @@
<check>org.apache.ambari.server.checks.YarnRMHighAvailabilityCheck</check>
<check>org.apache.ambari.server.checks.YarnTimelineServerStatePreservingCheck</check>
<check>org.apache.ambari.server.checks.DruidHighAvailabilityCheck</check>
+ <check>org.apache.ambari.server.checks.KafkaPropertiesCheck</check>
<check>org.apache.ambari.server.checks.LZOCheck</check>
<configuration>
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/checks/KafkaPropertiesCheckTest.java b/ambari-server/src/test/java/org/apache/ambari/server/checks/KafkaPropertiesCheckTest.java
new file mode 100644
index 0000000..8c698b7
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/checks/KafkaPropertiesCheckTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.checks;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.controller.PrereqCheckRequest;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.DesiredConfig;
+import org.apache.ambari.server.state.RepositoryType;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceInfo;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.StackInfo;
+import org.apache.ambari.server.state.repository.ClusterVersionSummary;
+import org.apache.ambari.server.state.repository.VersionDefinitionXml;
+import org.apache.ambari.server.state.stack.PrereqCheckStatus;
+import org.apache.ambari.server.state.stack.PrerequisiteCheck;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import com.google.inject.Provider;
+
+
+/**
+ * Unit tests for {@link KafkaPropertiesCheck}.
+ *
+ * <p>The fixture mocks a cluster whose current stack reports the Kafka service version
+ * {@code 0.0.1.2.3}, so the value expected for {@code inter.broker.protocol.version} is
+ * {@code 0.0.1}. By default the mocked {@code kafka-broker} configuration is valid; individual
+ * tests modify {@code m_configMap} to simulate broken configurations.
+ */
+public class KafkaPropertiesCheckTest {
+
+  private static final String CLUSTER_NAME = "cluster";
+  private static final String INTER_BROKER_PROTOCOL_VERSION = "inter.broker.protocol.version";
+  private static final String LOG_MESSAGE_FORMAT_VERSION = "log.message.format.version";
+
+  static final String serviceName = "KAFKA";
+
+  private Clusters m_clusters = EasyMock.createMock(Clusters.class);
+
+  /** Backing map of the mocked kafka-broker config; mutated by the tests. */
+  private Map<String, String> m_configMap = new HashMap<>();
+
+  @Mock
+  private ClusterVersionSummary m_clusterVersionSummary;
+
+  @Mock
+  private VersionDefinitionXml m_vdfXml;
+
+  @Mock
+  private RepositoryVersionEntity m_repositoryVersion;
+
+  private KafkaPropertiesCheck m_kafkaPropertiesCheck = null;
+  final Map<String, Service> m_services = new HashMap<>();
+
+  @Before
+  public void setup() throws Exception {
+    MockitoAnnotations.initMocks(this);
+
+    // Valid defaults: the protocol version matches the first three components of the mocked
+    // Kafka service version below.
+    m_configMap.put(INTER_BROKER_PROTOCOL_VERSION, "0.0.1");
+    m_configMap.put(LOG_MESSAGE_FORMAT_VERSION, "1.0.0");
+
+    Cluster cluster = EasyMock.createMock(Cluster.class);
+    final AmbariMetaInfo ami = EasyMock.createMock(AmbariMetaInfo.class);
+    final StackInfo stackInfo = EasyMock.createMock(StackInfo.class);
+    final StackId stackId = new StackId("HDP", "2.3");
+    final ServiceInfo serviceInfo = EasyMock.createMock(ServiceInfo.class);
+
+    Config config = EasyMock.createMock(Config.class);
+    final Map<String, Service> services = new HashMap<>();
+    final Service service = EasyMock.createMock(Service.class);
+
+    services.put(serviceName, service);
+
+    Map<String, DesiredConfig> desiredMap = new HashMap<>();
+    DesiredConfig dc = EasyMock.createMock(DesiredConfig.class);
+    desiredMap.put("kafka-broker", dc);
+
+    expect(dc.getTag()).andReturn("").anyTimes();
+    expect(config.getProperties()).andReturn(m_configMap).anyTimes();
+    expect(cluster.getServices()).andReturn(services).anyTimes();
+    expect(cluster.getService(serviceName)).andReturn(service).anyTimes();
+    expect(cluster.getDesiredConfigs()).andReturn(desiredMap).anyTimes();
+    expect(cluster.getDesiredConfigByType((String) anyObject())).andReturn(config).anyTimes();
+    expect(cluster.getConfig((String) anyObject(), (String) anyObject())).andReturn(config).anyTimes();
+    expect(cluster.getDesiredStackVersion()).andReturn(stackId).anyTimes();
+    expect(cluster.getCurrentStackVersion()).andReturn(stackId).anyTimes();
+    expect(m_clusters.getCluster((String) anyObject())).andReturn(cluster).anyTimes();
+
+    expect(ami.getStack((StackId) anyObject())).andReturn(stackInfo).anyTimes();
+    expect(ami.getServices((String) anyObject(), (String) anyObject())).andReturn(new HashMap<String, ServiceInfo>()).anyTimes();
+    expect(stackInfo.getService((String) anyObject())).andReturn(serviceInfo).anyTimes();
+    expect(serviceInfo.getVersion()).andReturn("0.0.1.2.3").anyTimes();
+
+    replay(ami, m_clusters, cluster, dc, config, stackInfo, serviceInfo);
+
+    m_kafkaPropertiesCheck = new KafkaPropertiesCheck();
+    m_kafkaPropertiesCheck.clustersProvider = new Provider<Clusters>() {
+      @Override
+      public Clusters get() {
+        return m_clusters;
+      }
+    };
+
+    m_kafkaPropertiesCheck.ambariMetaInfo = new Provider<AmbariMetaInfo>() {
+      @Override
+      public AmbariMetaInfo get() {
+        return ami;
+      }
+    };
+
+    m_services.clear();
+
+    Mockito.when(m_repositoryVersion.getType()).thenReturn(RepositoryType.STANDARD);
+    Mockito.when(m_repositoryVersion.getRepositoryXml()).thenReturn(m_vdfXml);
+    Mockito.when(m_repositoryVersion.getVersion()).thenReturn("2.6.5.1");
+    Mockito.when(m_vdfXml.getClusterSummary(Mockito.any(Cluster.class))).thenReturn(m_clusterVersionSummary);
+    Mockito.when(m_clusterVersionSummary.getAvailableServiceNames()).thenReturn(m_services.keySet());
+  }
+
+  /**
+   * Runs the check against the mocked cluster and returns the populated result.
+   */
+  private PrerequisiteCheck performCheck() throws Exception {
+    PrerequisiteCheck check = new PrerequisiteCheck(null, null);
+    m_kafkaPropertiesCheck.perform(check, new PrereqCheckRequest(CLUSTER_NAME));
+    return check;
+  }
+
+  /**
+   * The check applies when the target repository version (2.6.5.1) is not lower than 2.6.5.
+   */
+  @Test
+  public void testApplicable() throws Exception {
+    final Service service = EasyMock.createMock(Service.class);
+    m_services.put(serviceName, service);
+
+    Cluster cluster = m_clusters.getCluster(CLUSTER_NAME);
+    EasyMock.reset(cluster);
+    expect(cluster.getServices()).andReturn(m_services).anyTimes();
+    expect(cluster.getCurrentStackVersion()).andReturn(new StackId("HDP-2.3")).anyTimes();
+    replay(cluster);
+
+    PrereqCheckRequest request = new PrereqCheckRequest(CLUSTER_NAME);
+    request.setTargetRepositoryVersion(m_repositoryVersion);
+
+    assertTrue(m_kafkaPropertiesCheck.isApplicable(request));
+  }
+
+  /**
+   * Both properties absent: the check fails and names both of them.
+   */
+  @Test
+  public void testMissingProps() throws Exception {
+    m_configMap.clear();
+
+    PrerequisiteCheck check = performCheck();
+
+    assertEquals(PrereqCheckStatus.FAIL, check.getStatus());
+    assertEquals("The following Kafka properties should be set properly: inter.broker.protocol.version and log.message.format.version", check.getFailReason());
+  }
+
+  /**
+   * Protocol version present but different from the stack's Kafka version: the check fails.
+   */
+  @Test
+  public void testMismatchedInterBrokerProtocolVersion() throws Exception {
+    m_configMap.put(INTER_BROKER_PROTOCOL_VERSION, "0.0.2");
+    m_configMap.put(LOG_MESSAGE_FORMAT_VERSION, "1.0.0");
+
+    PrerequisiteCheck check = performCheck();
+
+    assertEquals(PrereqCheckStatus.FAIL, check.getStatus());
+  }
+
+  /**
+   * Both properties present and the protocol version matches the stack: the check passes.
+   */
+  @Test
+  public void testNormal() throws Exception {
+    PrerequisiteCheck check = performCheck();
+
+    assertEquals(PrereqCheckStatus.PASS, check.getStatus());
+  }
+
+}
--
To stop receiving notification emails like this one, please contact
hapylestat@apache.org.