You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ds...@apache.org on 2015/11/11 13:44:49 UTC
ambari git commit: AMBARI-13813 After kerberization, Kafka brokers fail to start (dsen)
Repository: ambari
Updated Branches:
refs/heads/trunk 7e1cb684d -> 9c5e8844a
AMBARI-13813 After kerberization, Kafka brokers fail to start (dsen)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9c5e8844
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9c5e8844
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9c5e8844
Branch: refs/heads/trunk
Commit: 9c5e8844ae68a6ce07a5260182fe41243e3bad38
Parents: 7e1cb68
Author: Dmytro Sen <ds...@apache.org>
Authored: Wed Nov 11 14:44:36 2015 +0200
Committer: Dmytro Sen <ds...@apache.org>
Committed: Wed Nov 11 14:44:36 2015 +0200
----------------------------------------------------------------------
.../server/upgrade/UpgradeCatalog213.java | 18 +++++++++++++-
.../server/upgrade/UpgradeCatalog213Test.java | 26 ++++++++++++++++++++
2 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/9c5e8844/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
index 6a40be6..f78188c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
@@ -84,6 +84,8 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
private static final String STORM_SITE = "storm-site";
private static final String HDFS_SITE_CONFIG = "hdfs-site";
private static final String KAFKA_BROKER = "kafka-broker";
+ private static final String KAFKA_ENV_CONFIG = "kafka-env";
+ private static final String KAFKA_ENV_CONTENT_KERBEROS_PARAMS = "export KAFKA_KERBEROS_PARAMS={{kafka_kerberos_params}}";
private static final String AMS_ENV = "ams-env";
private static final String AMS_HBASE_ENV = "ams-hbase-env";
private static final String AMS_SITE = "ams-site";
@@ -921,7 +923,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
Map<String, Cluster> clusterMap = clusters.getClusters();
if (clusterMap != null && !clusterMap.isEmpty()) {
for (final Cluster cluster : clusterMap.values()) {
- Set<String> installedServices =cluster.getServices().keySet();
+ Set<String> installedServices = cluster.getServices().keySet();
Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER);
if (kafkaBroker != null) {
Map<String, String> newProperties = new HashMap<>();
@@ -943,6 +945,20 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
updateConfigurationPropertiesForCluster(cluster, KAFKA_BROKER, newProperties, true, true);
}
}
+
+ StackId stackId = cluster.getCurrentStackVersion();
+ if (stackId != null && stackId.getStackName().equals("HDP") &&
+ VersionUtils.compareVersions(stackId.getStackVersion(), "2.3") >= 0) {
+ Config kafkaEnv = cluster.getDesiredConfigByType(KAFKA_ENV_CONFIG);
+ if (kafkaEnv != null) {
+ String kafkaEnvContent = kafkaEnv.getProperties().get(CONTENT_PROPERTY);
+ if (kafkaEnvContent != null && !kafkaEnvContent.contains(KAFKA_ENV_CONTENT_KERBEROS_PARAMS)) {
+ kafkaEnvContent += "\n\nexport KAFKA_KERBEROS_PARAMS=\"$KAFKA_KERBEROS_PARAMS {{kafka_kerberos_params}}\"";
+ Map<String, String> updates = Collections.singletonMap(CONTENT_PROPERTY, kafkaEnvContent);
+ updateConfigurationPropertiesForCluster(cluster, KAFKA_ENV_CONFIG, updates, true, false);
+ }
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/9c5e8844/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
index aa55597..61cc5ce 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
@@ -761,10 +761,19 @@ public class UpgradeCatalog213Test {
public void testUpdateKafkaConfigs() throws Exception {
EasyMockSupport easyMockSupport = new EasyMockSupport();
final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class);
+ final ConfigurationResponse mockConfigurationResponse = easyMockSupport.createMock(ConfigurationResponse.class);
final ConfigHelper mockConfigHelper = easyMockSupport.createMock(ConfigHelper.class);
final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class);
final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class);
+
+ final Map<String, String> propertiesKafkaEnv = new HashMap<String, String>() {
+ {
+ put("content", "test");
+ }
+ };
+ Map<String, String> updates = Collections.singletonMap("content", "test\n\nexport KAFKA_KERBEROS_PARAMS=\"$KAFKA_KERBEROS_PARAMS {{kafka_kerberos_params}}");
+
final Map<String, String> propertiesAmsEnv = new HashMap<String, String>() {
{
put("kafka.metrics.reporters", "{{kafka_metrics_reporters}}");
@@ -778,6 +787,7 @@ public class UpgradeCatalog213Test {
};
final Config mockAmsEnv = easyMockSupport.createNiceMock(Config.class);
+ final Config mockKafkaEnv = easyMockSupport.createNiceMock(Config.class);
final Injector mockInjector = Guice.createInjector(new AbstractModule() {
@Override
@@ -801,6 +811,22 @@ public class UpgradeCatalog213Test {
expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(mockAmsEnv).atLeastOnce();
expect(mockAmsEnv.getProperties()).andReturn(propertiesAmsEnv).atLeastOnce();
+ expect(mockClusterExpected.getCurrentStackVersion()).andReturn(new StackId("HDP", "2.3"));
+ expect(mockClusterExpected.getDesiredConfigByType("kafka-env")).andReturn(mockKafkaEnv).atLeastOnce();
+ expect(mockKafkaEnv.getProperties()).andReturn(propertiesKafkaEnv).atLeastOnce();
+
+ UpgradeCatalog213 upgradeCatalog213 = createMockBuilder(UpgradeCatalog213.class)
+ .withConstructor(Injector.class)
+ .withArgs(mockInjector)
+ .addMockedMethod("updateConfigurationPropertiesForCluster", Cluster.class, String.class,
+ Map.class, boolean.class, boolean.class)
+ .createMock();
+ upgradeCatalog213.updateConfigurationPropertiesForCluster(mockClusterExpected,
+ "kafka-env", updates, true, false);
+ expectLastCall().once();
+
+ expect(mockAmbariManagementController.createConfiguration(EasyMock.<ConfigurationRequest>anyObject())).andReturn(mockConfigurationResponse);
+
easyMockSupport.replayAll();
mockInjector.getInstance(UpgradeCatalog213.class).updateKafkaConfigs();
easyMockSupport.verifyAll();