You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by rl...@apache.org on 2017/05/17 17:56:44 UTC
ambari git commit: AMBARI-21043. Backport Ambari-17694 - Kafka
listeners property does not show SASL_PLAINTEXT protocol when Kerberos is
enabled (rlevas)
Repository: ambari
Updated Branches:
refs/heads/branch-2.5 8347d5f03 -> 3fd229c7f
AMBARI-21043. Backport Ambari-17694 - Kafka listeners property does not show SASL_PLAINTEXT protocol when Kerberos is enabled (rlevas)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3fd229c7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3fd229c7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3fd229c7
Branch: refs/heads/branch-2.5
Commit: 3fd229c7f4f18b27513bcebaf40257d28ce2e912
Parents: 8347d5f
Author: Robert Levas <rl...@hortonworks.com>
Authored: Wed May 17 13:56:24 2017 -0400
Committer: Robert Levas <rl...@hortonworks.com>
Committed: Wed May 17 13:56:24 2017 -0400
----------------------------------------------------------------------
.../kerberos/VariableReplacementHelper.java | 46 ++++++++--
.../server/upgrade/UpgradeCatalog240.java | 25 ++++++
.../server/upgrade/UpgradeCatalog251.java | 45 ++++++++++
.../KAFKA/0.8.1/package/scripts/kafka.py | 15 ++--
.../common-services/KAFKA/0.9.0/kerberos.json | 3 +-
.../stacks/HDP/2.3/services/stack_advisor.py | 19 +++-
.../stacks/HDP/2.5/services/KAFKA/kerberos.json | 3 +-
.../kerberos/VariableReplacementHelperTest.java | 6 ++
.../server/upgrade/UpgradeCatalog240Test.java | 63 ++++++++++++++
.../server/upgrade/UpgradeCatalog251Test.java | 92 ++++++++++++++++++++
10 files changed, 295 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/3fd229c7/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java
index 77333b8..f463cee 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java
@@ -18,12 +18,6 @@
package org.apache.ambari.server.state.kerberos;
-import com.google.inject.Singleton;
-import org.apache.ambari.server.AmbariException;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -31,6 +25,13 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.ambari.server.AmbariException;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Singleton;
+
/**
* Helper class to provide variable replacement services
*/
@@ -56,6 +57,7 @@ public class VariableReplacementHelper {
{
put("each", new EachFunction());
put("toLower", new ToLowerFunction());
+ put("replace", new ReplaceValue());
put("append", new AppendFunction());
}
};
@@ -238,6 +240,38 @@ public class VariableReplacementHelper {
}
/**
+ * ReplaceValue is a Function implementation that replaces the value in the string
+ * <p/>
+ * This function expects the following arguments (in order) within the args array:
+ * <ol>
+ * <li>regular expression that should be replaced</li>
+ * <li>replacement value for the string</li>
+ * </ol>
+ */
+ private static class ReplaceValue implements Function {
+
+ @Override
+ public String perform(String[] args, String data, Map<String, Map<String, String>> replacementsMap) {
+ if ((args == null) || (args.length != 2)) {
+ throw new IllegalArgumentException("Invalid number of arguments encountered");
+ }
+ if (data != null) {
+ StringBuffer builder = new StringBuffer();
+ String regex = args[0];
+ String replacement = args[1];
+ Pattern pattern = Pattern.compile(regex);
+ Matcher matcher = pattern.matcher(data);
+ while (matcher.find()) {
+ matcher.appendReplacement(builder, replacement);
+ }
+ matcher.appendTail(builder);
+ return builder.toString();
+ }
+ return "";
+ }
+ }
+
+ /**
* ToLowerFunction is a Function implementation that converts a String to lowercase
*/
private static class ToLowerFunction implements Function {
http://git-wip-us.apache.org/repos/asf/ambari/blob/3fd229c7/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
index cf0c37d..87759d9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
@@ -182,6 +182,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
protected static final String EXTENSION_ID_COLUMN = "extension_id";
protected static final String EXTENSION_LINK_TABLE = "extensionlink";
protected static final String EXTENSION_LINK_ID_COLUMN = "link_id";
+ protected static final String KAFKA_BROKER_CONFIG = "kafka-broker";
private static final Map<String, Integer> ROLE_ORDER;
private static final String AMS_HBASE_SITE = "ams-hbase-site";
@@ -381,6 +382,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
addManageUserPersistedDataPermission();
allowClusterOperatorToManageCredentials();
updateHDFSConfigs();
+ updateKAFKAConfigs();
updateHIVEConfigs();
updateAMSConfigs();
updateClusterEnv();
@@ -1925,6 +1927,29 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
}
}
+ protected void updateKAFKAConfigs() throws AmbariException {
+ AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
+ Clusters clusters = ambariManagementController.getClusters();
+ if (clusters != null) {
+ Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters);
+ if (clusterMap != null && !clusterMap.isEmpty()) {
+ for (final Cluster cluster : clusterMap.values()) {
+ Set<String> installedServices = cluster.getServices().keySet();
+
+ if (installedServices.contains("KAFKA") && cluster.getSecurityType() == SecurityType.KERBEROS) {
+ Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER_CONFIG);
+ if (kafkaBroker != null) {
+ String listenersPropertyValue = kafkaBroker.getProperties().get("listeners");
+ if (StringUtils.isNotEmpty(listenersPropertyValue)) {
+ String newListenersPropertyValue = listenersPropertyValue.replaceAll("\\bPLAINTEXT\\b", "PLAINTEXTSASL");
+ updateConfigurationProperties(KAFKA_BROKER_CONFIG, Collections.singletonMap("listeners", newListenersPropertyValue), true, false);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
protected void updateHIVEConfigs() throws AmbariException {
AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
http://git-wip-us.apache.org/repos/asf/ambari/blob/3fd229c7/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
index 146520f..dce60dc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
@@ -18,12 +18,22 @@
package org.apache.ambari.server.upgrade;
import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.controller.AmbariManagementController;
import org.apache.ambari.server.orm.DBAccessor.DBColumnInfo;
import com.google.inject.Inject;
import com.google.inject.Injector;
+
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.SecurityType;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +50,7 @@ public class UpgradeCatalog251 extends AbstractUpgradeCatalog {
private static final String CLUSTER_HOST_INFO_COLUMN = "cluster_host_info";
private static final String REQUEST_ID_COLUMN = "request_id";
+ protected static final String KAFKA_BROKER_CONFIG = "kafka-broker";
/**
* Logger.
@@ -93,6 +104,40 @@ public class UpgradeCatalog251 extends AbstractUpgradeCatalog {
*/
@Override
protected void executeDMLUpdates() throws AmbariException, SQLException {
+ updateKAFKAConfigs();
+ }
+
+ /**
+ * Ensure that the updates from Ambari 2.4.0 are applied in the event the initial version is
+ * Ambari 2.5.0, since this Kafka change failed to make it into Ambari 2.5.0.
+ *
+ * If the base version was before Ambari 2.5.0, this method should wind up doing nothing.
+ * @throws AmbariException
+ */
+ protected void updateKAFKAConfigs() throws AmbariException {
+ AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
+ Clusters clusters = ambariManagementController.getClusters();
+ if (clusters != null) {
+ Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters);
+ if (clusterMap != null && !clusterMap.isEmpty()) {
+ for (final Cluster cluster : clusterMap.values()) {
+ Set<String> installedServices = cluster.getServices().keySet();
+
+ if (installedServices.contains("KAFKA") && cluster.getSecurityType() == SecurityType.KERBEROS) {
+ Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER_CONFIG);
+ if (kafkaBroker != null) {
+ String listenersPropertyValue = kafkaBroker.getProperties().get("listeners");
+ if (StringUtils.isNotEmpty(listenersPropertyValue)) {
+ String newListenersPropertyValue = listenersPropertyValue.replaceAll("\\bPLAINTEXT\\b", "PLAINTEXTSASL");
+ if(!newListenersPropertyValue.equals(listenersPropertyValue)) {
+ updateConfigurationProperties(KAFKA_BROKER_CONFIG, Collections.singletonMap("listeners", newListenersPropertyValue), true, false);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/3fd229c7/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
index 1327090..680dd32 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
@@ -80,21 +80,16 @@ def kafka(upgrade_type=None):
listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
Logger.info(format("Kafka listeners: {listeners}"))
+ kafka_server_config['listeners'] = listeners
if params.security_enabled and params.kafka_kerberos_enabled:
Logger.info("Kafka kerberos security is enabled.")
- if "SASL" not in listeners:
- listeners = listeners.replace("PLAINTEXT", "PLAINTEXTSASL")
-
- kafka_server_config['listeners'] = listeners
kafka_server_config['advertised.listeners'] = listeners
Logger.info(format("Kafka advertised listeners: {listeners}"))
- else:
- kafka_server_config['listeners'] = listeners
- if 'advertised.listeners' in kafka_server_config:
- advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
- kafka_server_config['advertised.listeners'] = advertised_listeners
- Logger.info(format("Kafka advertised listeners: {advertised_listeners}"))
+ elif 'advertised.listeners' in kafka_server_config:
+ advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
+ kafka_server_config['advertised.listeners'] = advertised_listeners
+ Logger.info(format("Kafka advertised listeners: {advertised_listeners}"))
else:
kafka_server_config['host.name'] = params.hostname
http://git-wip-us.apache.org/repos/asf/ambari/blob/3fd229c7/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
index 60fa959..7500891 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
@@ -14,7 +14,8 @@
"principal.to.local.class":"kafka.security.auth.KerberosPrincipalToLocal",
"super.users": "user:${kafka-env/kafka_user}",
"security.inter.broker.protocol": "PLAINTEXTSASL",
- "zookeeper.set.acl": "true"
+ "zookeeper.set.acl": "true",
+ "listeners": "${kafka-broker/listeners|replace(\\bPLAINTEXT\\b, PLAINTEXTSASL)}"
}
}
],
http://git-wip-us.apache.org/repos/asf/ambari/blob/3fd229c7/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
index d0f82af..1365c64 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
@@ -944,7 +944,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
"HIVE": {"hiveserver2-site": self.validateHiveServer2Configurations,
"hive-site": self.validateHiveConfigurations},
"HBASE": {"hbase-site": self.validateHBASEConfigurations},
- "KAKFA": {"kafka-broker": self.validateKAFKAConfigurations},
+ "KAFKA": {"kafka-broker": self.validateKAFKAConfigurations},
"RANGER": {"admin-properties": self.validateRangerAdminConfigurations,
"ranger-env": self.validateRangerConfigurationsEnv}
}
@@ -1108,13 +1108,13 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
def validateKAFKAConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
kafka_broker = properties
validationItems = []
-
+ servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+
#Adding Ranger Plugin logic here
ranger_plugin_properties = getSiteProperties(configurations, "ranger-kafka-plugin-properties")
- ranger_plugin_enabled = ranger_plugin_properties['ranger-kafka-plugin-enabled']
+ ranger_plugin_enabled = ranger_plugin_properties['ranger-kafka-plugin-enabled'] if ranger_plugin_properties else 'No'
prop_name = 'authorizer.class.name'
prop_val = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer"
- servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()):
if kafka_broker[prop_name] != prop_val:
validationItems.append({"config-name": prop_name,
@@ -1122,6 +1122,17 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
"If Ranger Kafka Plugin is enabled."\
"{0} needs to be set to {1}".format(prop_name,prop_val))})
+ if self.isSecurityEnabled(services) and 'security.inter.broker.protocol' in properties:
+ interBrokerValue = properties['security.inter.broker.protocol']
+ prop_name = 'listeners'
+ prop_value = properties[prop_name]
+ if interBrokerValue and interBrokerValue not in prop_value:
+ validationItems.append({"config-name": "listeners",
+ "item": self.getWarnItem("If kerberos is enabled "\
+ "{0} need to contain {1} as one of "\
+ "the protocol".format(prop_name, interBrokerValue))})
+
+
return self.toConfigurationValidationProblems(validationItems, "kafka-broker")
def isComponentUsingCardinalityForLayout(self, componentName):
http://git-wip-us.apache.org/repos/asf/ambari/blob/3fd229c7/ambari-server/src/main/resources/stacks/HDP/2.5/services/KAFKA/kerberos.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.5/services/KAFKA/kerberos.json b/ambari-server/src/main/resources/stacks/HDP/2.5/services/KAFKA/kerberos.json
index 501f969..eb31ad6 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.5/services/KAFKA/kerberos.json
+++ b/ambari-server/src/main/resources/stacks/HDP/2.5/services/KAFKA/kerberos.json
@@ -14,7 +14,8 @@
"principal.to.local.class":"kafka.security.auth.KerberosPrincipalToLocal",
"super.users": "user:${kafka-env/kafka_user}",
"security.inter.broker.protocol": "PLAINTEXTSASL",
- "zookeeper.set.acl": "true"
+ "zookeeper.set.acl": "true",
+ "listeners": "${kafka-broker/listeners|replace(\\bPLAINTEXT\\b, PLAINTEXTSASL)}"
}
},
{
http://git-wip-us.apache.org/repos/asf/ambari/blob/3fd229c7/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java
index 857047b..ece2258 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java
@@ -165,6 +165,10 @@ public class VariableReplacementHelperTest {
put("realm", "UNIT.TEST");
}});
+ put("kafka-broker", new HashMap<String, String>() {{
+ put("listeners", "PLAINTEXT://localhost:6667");
+ }});
+
put("clusterHostInfo", new HashMap<String, String>() {{
put("hive_metastore_host", "host1.unit.test, host2.unit.test , host3.unit.test"); // spaces are there on purpose.
}});
@@ -221,6 +225,8 @@ public class VariableReplacementHelperTest {
}
Assert.assertEquals("test=unit.test", helper.replaceVariables("test=${realm|toLower()}", configurations));
+
+ Assert.assertEquals("PLAINTEXTSASL://localhost:6667", helper.replaceVariables("${kafka-broker/listeners|replace(\\bPLAINTEXT\\b,PLAINTEXTSASL)}", configurations));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/3fd229c7/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
index 8baf496..17411d7 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
@@ -591,6 +591,7 @@ public class UpgradeCatalog240Test {
Method updateRecoveryConfigurationDML = UpgradeCatalog240.class.getDeclaredMethod("updateRecoveryConfigurationDML");
Method removeAtlasMetaserverAlert = UpgradeCatalog240.class.getDeclaredMethod("removeAtlasMetaserverAlert");
Method updateRangerHbasePluginProperties = UpgradeCatalog240.class.getDeclaredMethod("updateRangerHbasePluginProperties");
+ Method updateKAFKAConfigs = UpgradeCatalog240.class.getDeclaredMethod("updateKAFKAConfigs");
Capture<String> capturedStatements = newCapture(CaptureType.ALL);
@@ -640,6 +641,7 @@ public class UpgradeCatalog240Test {
.addMockedMethod(updateRecoveryConfigurationDML)
.addMockedMethod(removeAtlasMetaserverAlert)
.addMockedMethod(updateRangerHbasePluginProperties)
+ .addMockedMethod(updateKAFKAConfigs)
.createMock();
Field field = AbstractUpgradeCatalog.class.getDeclaredField("dbAccessor");
@@ -681,6 +683,7 @@ public class UpgradeCatalog240Test {
upgradeCatalog240.removeAtlasMetaserverAlert();
upgradeCatalog240.updateRangerHbasePluginProperties();
upgradeCatalog240.adjustHiveJobTimestamps();
+ upgradeCatalog240.updateKAFKAConfigs();
replay(upgradeCatalog240, dbAccessor);
@@ -1173,6 +1176,66 @@ public class UpgradeCatalog240Test {
}
@Test
+ public void testUpdateKAFKAConfigs() throws Exception{
+ EasyMockSupport easyMockSupport = new EasyMockSupport();
+ final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class);
+ final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class);
+ final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class);
+
+ final Config kafkaBroker = easyMockSupport.createNiceMock(Config.class);
+ expect(kafkaBroker.getProperties()).andReturn(new HashMap<String, String>(){{
+ put("listeners", "PLAINTEXT://localhost:6667,SSL://localhost:6666");
+ }}
+ ).anyTimes();
+
+ final Injector mockInjector = Guice.createInjector(new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
+ bind(Clusters.class).toInstance(mockClusters);
+ bind(EntityManager.class).toInstance(entityManager);
+ bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
+ bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+ bind(PasswordEncoder.class).toInstance(createNiceMock(PasswordEncoder.class));
+ bind(HookContextFactory.class).toInstance(createMock(HookContextFactory.class));
+ bind(HookService.class).toInstance(createMock(HookService.class));
+ }
+ });
+
+ expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).once();
+ expect(mockClusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{
+ put("normal", mockClusterExpected);
+ }}).atLeastOnce();
+ expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(kafkaBroker).atLeastOnce();
+ expect(mockClusterExpected.getSecurityType()).andReturn(SecurityType.KERBEROS);
+ expect(mockClusterExpected.getServices()).andReturn(new HashMap<String, Service>() {
+ {
+ put("KAFKA", null);
+ }
+ }).atLeastOnce();
+
+ UpgradeCatalog240 upgradeCatalog240 = createMockBuilder(UpgradeCatalog240.class)
+ .withConstructor(Injector.class)
+ .withArgs(mockInjector)
+ .addMockedMethod("updateConfigurationProperties", String.class,
+ Map.class, boolean.class, boolean.class)
+ .createMock();
+
+ Map<String, String> expectedUpdates = new HashMap<>();
+ expectedUpdates.put("listeners", "PLAINTEXTSASL://localhost:6667,SSL://localhost:6666");
+
+ upgradeCatalog240.updateConfigurationProperties("kafka-broker", expectedUpdates,
+ true, false);
+ expectLastCall().once();
+
+ easyMockSupport.replayAll();
+ replay(upgradeCatalog240);
+ upgradeCatalog240.updateKAFKAConfigs();
+ easyMockSupport.verifyAll();
+ }
+
+
+ @Test
public void testSparkConfigUpdate() throws Exception{
Map<String, String> oldPropertiesSparkDefaults = new HashMap<String, String>() {
http://git-wip-us.apache.org/repos/asf/ambari/blob/3fd229c7/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java
index 4575998..d725ec4 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java
@@ -20,21 +20,29 @@ package org.apache.ambari.server.upgrade;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.newCapture;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
+import java.util.Collections;
+import java.util.Map;
import javax.persistence.EntityManager;
import org.apache.ambari.server.actionmanager.ActionManager;
import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.AmbariManagementController;
import org.apache.ambari.server.controller.KerberosHelper;
import org.apache.ambari.server.controller.MaintenanceStateHelper;
import org.apache.ambari.server.orm.DBAccessor;
@@ -42,10 +50,12 @@ import org.apache.ambari.server.orm.DBAccessor.DBColumnInfo;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.SecurityType;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.stack.OsFamily;
import org.easymock.Capture;
import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.easymock.MockType;
import org.junit.After;
@@ -53,8 +63,10 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.springframework.security.crypto.password.PasswordEncoder;
import com.google.gson.Gson;
+import com.google.inject.AbstractModule;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -163,4 +175,84 @@ public class UpgradeCatalog251Test {
Assert.assertEquals(Integer.valueOf(0), captured.getDefaultValue());
Assert.assertEquals(Short.class, captured.getType());
}
+
+ @Test
+ public void testExecuteDMLUpdates() throws Exception {
+ Method updateKAFKAConfigs = UpgradeCatalog251.class.getDeclaredMethod("updateKAFKAConfigs");
+
+ UpgradeCatalog251 upgradeCatalog251 = createMockBuilder(UpgradeCatalog251.class)
+ .addMockedMethod(updateKAFKAConfigs)
+ .createMock();
+
+ Field field = AbstractUpgradeCatalog.class.getDeclaredField("dbAccessor");
+ field.set(upgradeCatalog251, dbAccessor);
+
+ upgradeCatalog251.updateKAFKAConfigs();
+ expectLastCall().once();
+
+ replay(upgradeCatalog251, dbAccessor);
+
+ upgradeCatalog251.executeDMLUpdates();
+
+ verify(upgradeCatalog251, dbAccessor);
+ }
+
+
+ @Test
+ public void testUpdateKAFKAConfigs() throws Exception{
+ EasyMockSupport easyMockSupport = new EasyMockSupport();
+ final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class);
+ final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class);
+ final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class);
+
+ Map<String, String> initialProperties = Collections.singletonMap("listeners", "PLAINTEXT://localhost:6667,SSL://localhost:6666");
+ Map<String, String> expectedUpdates = Collections.singletonMap("listeners", "PLAINTEXTSASL://localhost:6667,SSL://localhost:6666");
+
+ final Config kafkaBroker = easyMockSupport.createNiceMock(Config.class);
+ expect(kafkaBroker.getProperties()).andReturn(initialProperties).times(1);
+ // Re-entrant test
+ expect(kafkaBroker.getProperties()).andReturn(expectedUpdates).times(1);
+
+ final Injector mockInjector = Guice.createInjector(new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
+ bind(Clusters.class).toInstance(mockClusters);
+ bind(EntityManager.class).toInstance(entityManager);
+ bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
+ bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+ bind(PasswordEncoder.class).toInstance(createNiceMock(PasswordEncoder.class));
+ }
+ });
+
+ expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).atLeastOnce();
+ expect(mockClusters.getClusters()).andReturn(Collections.singletonMap("normal", mockClusterExpected)).atLeastOnce();
+ expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(kafkaBroker).atLeastOnce();
+ expect(mockClusterExpected.getSecurityType()).andReturn(SecurityType.KERBEROS).atLeastOnce();
+ expect(mockClusterExpected.getServices()).andReturn(Collections.<String, Service>singletonMap("KAFKA", null)).atLeastOnce();
+
+ UpgradeCatalog251 upgradeCatalog251 = createMockBuilder(UpgradeCatalog251.class)
+ .withConstructor(Injector.class)
+ .withArgs(mockInjector)
+ .addMockedMethod("updateConfigurationProperties", String.class,
+ Map.class, boolean.class, boolean.class)
+ .createMock();
+
+
+ // upgradeCatalog251.updateConfigurationProperties is only expected to execute once since no changes are
+ // expected when the relevant data have been previously changed
+ upgradeCatalog251.updateConfigurationProperties("kafka-broker", expectedUpdates, true, false);
+ expectLastCall().once();
+
+ easyMockSupport.replayAll();
+ replay(upgradeCatalog251);
+
+ // Execute the first time... upgrading to Ambari 2.4.0
+ upgradeCatalog251.updateKAFKAConfigs();
+
+ // Test reentry... upgrading from Ambari 2.4.0
+ upgradeCatalog251.updateKAFKAConfigs();
+
+ easyMockSupport.verifyAll();
+ }
}