You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2016/10/05 00:00:05 UTC
[4/8] ambari git commit: AMBARI-18517 : Changes in upgrade path for
Kafka metrics collector hosts config. (avijayan)
AMBARI-18517 : Changes in upgrade path for Kafka metrics collector hosts config. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/06f3b8e9
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/06f3b8e9
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/06f3b8e9
Branch: refs/heads/branch-feature-AMBARI-18456
Commit: 06f3b8e9006ef6ad1533f33baa0c9544547bd244
Parents: 1bf2069
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Tue Oct 4 11:10:03 2016 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Tue Oct 4 11:10:20 2016 -0700
----------------------------------------------------------------------
.../server/upgrade/UpgradeCatalog250.java | 30 +++++++++-
.../server/upgrade/UpgradeCatalog250Test.java | 61 ++++++++++++++++++++
2 files changed, 90 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/06f3b8e9/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java
index 185bd58..091c6d9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java
@@ -45,6 +45,9 @@ public class UpgradeCatalog250 extends AbstractUpgradeCatalog {
protected static final String HOST_VERSION_TABLE = "host_version";
private static final String AMS_ENV = "ams-env";
+ private static final String KAFKA_BROKER = "kafka-broker";
+ private static final String KAFKA_TIMELINE_METRICS_HOST = "kafka.timeline.metrics.host";
+
/**
* Logger.
*/
@@ -110,6 +113,7 @@ public class UpgradeCatalog250 extends AbstractUpgradeCatalog {
protected void executeDMLUpdates() throws AmbariException, SQLException {
updateAMSConfigs();
createRoleAuthorizations();
+ updateKafkaConfigs();
}
protected void updateHostVersionTable() throws SQLException {
@@ -181,7 +185,31 @@ public class UpgradeCatalog250 extends AbstractUpgradeCatalog {
Arrays.asList("AMBARI.ADMINISTRATOR:AMBARI", "CLUSTER.ADMINISTRATOR:CLUSTER"));
addRoleAuthorization("AMBARI.RUN_CUSTOM_COMMAND", "Perform custom administrative actions",
- Collections.singletonList("AMBARI.ADMINISTRATOR:AMBARI"));
+ Collections.singletonList("AMBARI.ADMINISTRATOR:AMBARI"));
+ }
+
+ /**
+  * Removes the {@code kafka.timeline.metrics.host} property from the
+  * {@code kafka-broker} config of every cluster whose desired config still
+  * contains it. Clusters without a {@code kafka-broker} config, or without
+  * that property, are left untouched.
+  *
+  * @throws AmbariException declared by this upgrade step; presumably raised by
+  *         the property removal helper - confirm against its definition
+  */
+ protected void updateKafkaConfigs() throws AmbariException {
+   AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
+   Clusters clusters = ambariManagementController.getClusters();
+
+   if (clusters != null) {
+     Map<String, Cluster> clusterMap = clusters.getClusters();
+
+     if (clusterMap != null && !clusterMap.isEmpty()) {
+       for (final Cluster cluster : clusterMap.values()) {
+
+         Config kafkaBrokerConfig = cluster.getDesiredConfigByType(KAFKA_BROKER);
+         if (kafkaBrokerConfig != null) {
+           Map<String, String> kafkaBrokerProperties = kafkaBrokerConfig.getProperties();
+
+           if (kafkaBrokerProperties != null && kafkaBrokerProperties.containsKey(KAFKA_TIMELINE_METRICS_HOST)) {
+             // Use the same constants for the check, the log line and the removal so
+             // the property that is tested is always the property that is removed.
+             LOG.info("Removing " + KAFKA_TIMELINE_METRICS_HOST + " from " + KAFKA_BROKER);
+             removeConfigurationPropertiesFromCluster(cluster, KAFKA_BROKER, Collections.singleton(KAFKA_TIMELINE_METRICS_HOST));
+           }
+         }
+       }
+     }
+   }
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/06f3b8e9/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java
index 7b6c3ad..9f34bcc 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java
@@ -124,10 +124,12 @@ public class UpgradeCatalog250Test {
public void testExecuteDMLUpdates() throws Exception {
Method updateAmsConfigs = UpgradeCatalog250.class.getDeclaredMethod("updateAMSConfigs");
Method createRoleAuthorizations = UpgradeCatalog250.class.getDeclaredMethod("createRoleAuthorizations");
+ Method updateKafkaConfigs = UpgradeCatalog250.class.getDeclaredMethod("updateKafkaConfigs");
UpgradeCatalog250 upgradeCatalog250 = createMockBuilder(UpgradeCatalog250.class)
.addMockedMethod(updateAmsConfigs)
.addMockedMethod(createRoleAuthorizations)
+ .addMockedMethod(updateKafkaConfigs)
.createMock();
upgradeCatalog250.updateAMSConfigs();
@@ -136,6 +138,9 @@ public class UpgradeCatalog250Test {
upgradeCatalog250.createRoleAuthorizations();
expectLastCall().once();
+ upgradeCatalog250.updateKafkaConfigs();
+ expectLastCall().once();
+
replay(upgradeCatalog250);
upgradeCatalog250.executeDMLUpdates();
@@ -291,4 +296,60 @@ public class UpgradeCatalog250Test {
Assert.assertEquals(1, clusterAdministratorAuthorizations.size());
Assert.assertTrue(clusterAdministratorAuthorizations.contains(clusterRunCustomCommandEntity));
}
+
+ @Test
+ public void testKafkaUpdateConfigs() throws Exception{
+ // Checks that updateKafkaConfigs() drops kafka.timeline.metrics.host from kafka-broker and keeps the other properties.
+ Map<String, String> oldProperties = new HashMap<String, String>() { // kafka-broker properties before the upgrade step
+ {
+ put("kafka.timeline.metrics.host", "{{metric_collector_host}}");
+ put("kafka.timeline.metrics.port", "{{metric_collector_port}}");
+ }
+ };
+ Map<String, String> newProperties = new HashMap<String, String>() { // properties expected in the captured createConfig call
+ {
+ put("kafka.timeline.metrics.port", "{{metric_collector_port}}");
+ }
+ };
+ EasyMockSupport easyMockSupport = new EasyMockSupport();
+ // Topology mocks: a single cluster ("normal") whose desired kafka-broker config returns oldProperties.
+ Clusters clusters = easyMockSupport.createNiceMock(Clusters.class);
+ final Cluster cluster = easyMockSupport.createNiceMock(Cluster.class);
+ Config mockKafkaBroker = easyMockSupport.createNiceMock(Config.class);
+
+ expect(clusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{
+ put("normal", cluster);
+ }}).once();
+ expect(cluster.getDesiredConfigByType("kafka-broker")).andReturn(mockKafkaBroker).atLeastOnce();
+ expect(mockKafkaBroker.getProperties()).andReturn(oldProperties).anyTimes();
+ // Injector passed to the controller's constructor; it only stubs Gson, MaintenanceStateHelper and KerberosHelper.
+ Injector injector = easyMockSupport.createNiceMock(Injector.class);
+ expect(injector.getInstance(Gson.class)).andReturn(null).anyTimes();
+ expect(injector.getInstance(MaintenanceStateHelper.class)).andReturn(null).anyTimes();
+ expect(injector.getInstance(KerberosHelper.class)).andReturn(createNiceMock(KerberosHelper.class)).anyTimes();
+
+ replay(injector, clusters, mockKafkaBroker, cluster);
+ // Partial mock of the controller: only createConfiguration, getClusters and createConfig are mocked.
+ AmbariManagementControllerImpl controller = createMockBuilder(AmbariManagementControllerImpl.class)
+ .addMockedMethod("createConfiguration")
+ .addMockedMethod("getClusters", new Class[] { })
+ .addMockedMethod("createConfig")
+ .withConstructor(createNiceMock(ActionManager.class), clusters, injector)
+ .createNiceMock();
+ // injector2 is the injector given to the catalog under test; it returns the partial-mock controller.
+ Injector injector2 = easyMockSupport.createNiceMock(Injector.class);
+ Capture<Map> propertiesCapture = EasyMock.newCapture();
+ // Record the third argument passed to createConfig so it can be compared with newProperties below.
+ expect(injector2.getInstance(AmbariManagementController.class)).andReturn(controller).anyTimes();
+ expect(controller.getClusters()).andReturn(clusters).anyTimes();
+ expect(controller.createConfig(anyObject(Cluster.class), anyString(), capture(propertiesCapture), anyString(),
+ anyObject(Map.class))).andReturn(createNiceMock(Config.class)).once();
+
+ replay(controller, injector2);
+ new UpgradeCatalog250(injector2).updateKafkaConfigs();
+ easyMockSupport.verifyAll();
+ // NOTE(review): controller is not created through easyMockSupport, so the capture read below is presumably the only proof createConfig ran - confirm.
+ Map<String, String> updatedProperties = propertiesCapture.getValue();
+ assertTrue(Maps.difference(newProperties, updatedProperties).areEqual());
+ }
}