You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2016/10/05 00:00:05 UTC

[4/8] ambari git commit: AMBARI-18517 : Changes in upgrade path for Kafka metrics collector hosts config. (avijayan)

AMBARI-18517 : Changes in upgrade path for Kafka metrics collector hosts config. (avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/06f3b8e9
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/06f3b8e9
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/06f3b8e9

Branch: refs/heads/branch-feature-AMBARI-18456
Commit: 06f3b8e9006ef6ad1533f33baa0c9544547bd244
Parents: 1bf2069
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Tue Oct 4 11:10:03 2016 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Tue Oct 4 11:10:20 2016 -0700

----------------------------------------------------------------------
 .../server/upgrade/UpgradeCatalog250.java       | 30 +++++++++-
 .../server/upgrade/UpgradeCatalog250Test.java   | 61 ++++++++++++++++++++
 2 files changed, 90 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/06f3b8e9/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java
index 185bd58..091c6d9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog250.java
@@ -45,6 +45,9 @@ public class UpgradeCatalog250 extends AbstractUpgradeCatalog {
 
   protected static final String HOST_VERSION_TABLE = "host_version";
   private static final String AMS_ENV = "ams-env";
+  private static final String KAFKA_BROKER = "kafka-broker";
+  private static final String KAFKA_TIMELINE_METRICS_HOST = "kafka.timeline.metrics.host";
+
   /**
    * Logger.
    */
@@ -110,6 +113,7 @@ public class UpgradeCatalog250 extends AbstractUpgradeCatalog {
   protected void executeDMLUpdates() throws AmbariException, SQLException {
     updateAMSConfigs();
     createRoleAuthorizations();
+    updateKafkaConfigs();
   }
 
   protected void updateHostVersionTable() throws SQLException {
@@ -181,7 +185,31 @@ public class UpgradeCatalog250 extends AbstractUpgradeCatalog {
         Arrays.asList("AMBARI.ADMINISTRATOR:AMBARI", "CLUSTER.ADMINISTRATOR:CLUSTER"));
 
     addRoleAuthorization("AMBARI.RUN_CUSTOM_COMMAND", "Perform custom administrative actions",
-        Collections.singletonList("AMBARI.ADMINISTRATOR:AMBARI"));
+      Collections.singletonList("AMBARI.ADMINISTRATOR:AMBARI"));
+  }
+
+  /**
+   * Removes the {@code kafka.timeline.metrics.host} property from the
+   * {@code kafka-broker} desired configuration of every cluster that contains it.
+   *
+   * @throws AmbariException propagated from the cluster lookup or property removal calls
+   */
+  protected void updateKafkaConfigs() throws AmbariException {
+    Clusters clusters = injector.getInstance(AmbariManagementController.class).getClusters();
+    Map<String, Cluster> clusterMap = (clusters == null) ? null : clusters.getClusters();
+    if (clusterMap == null) {
+      return;
+    }
+
+    for (final Cluster cluster : clusterMap.values()) {
+      Config kafkaBrokerConfig = cluster.getDesiredConfigByType(KAFKA_BROKER);
+      Map<String, String> kafkaBrokerProperties =
+          (kafkaBrokerConfig == null) ? null : kafkaBrokerConfig.getProperties();
+      if (kafkaBrokerProperties != null && kafkaBrokerProperties.containsKey(KAFKA_TIMELINE_METRICS_HOST)) {
+        LOG.info("Removing " + KAFKA_TIMELINE_METRICS_HOST + " from " + KAFKA_BROKER);
+        removeConfigurationPropertiesFromCluster(cluster, KAFKA_BROKER, Collections.singleton(KAFKA_TIMELINE_METRICS_HOST));
+      }
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/06f3b8e9/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java
index 7b6c3ad..9f34bcc 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog250Test.java
@@ -124,10 +124,12 @@ public class UpgradeCatalog250Test {
   public void testExecuteDMLUpdates() throws Exception {
     Method updateAmsConfigs = UpgradeCatalog250.class.getDeclaredMethod("updateAMSConfigs");
     Method createRoleAuthorizations = UpgradeCatalog250.class.getDeclaredMethod("createRoleAuthorizations");
+    Method updateKafkaConfigs = UpgradeCatalog250.class.getDeclaredMethod("updateKafkaConfigs");
 
     UpgradeCatalog250 upgradeCatalog250 = createMockBuilder(UpgradeCatalog250.class)
         .addMockedMethod(updateAmsConfigs)
         .addMockedMethod(createRoleAuthorizations)
+        .addMockedMethod(updateKafkaConfigs)
         .createMock();
 
     upgradeCatalog250.updateAMSConfigs();
@@ -136,6 +138,9 @@ public class UpgradeCatalog250Test {
     upgradeCatalog250.createRoleAuthorizations();
     expectLastCall().once();
 
+    upgradeCatalog250.updateKafkaConfigs();
+    expectLastCall().once();
+
     replay(upgradeCatalog250);
 
     upgradeCatalog250.executeDMLUpdates();
@@ -291,4 +296,60 @@ public class UpgradeCatalog250Test {
     Assert.assertEquals(1, clusterAdministratorAuthorizations.size());
     Assert.assertTrue(clusterAdministratorAuthorizations.contains(clusterRunCustomCommandEntity));
   }
+
+  /** Checks that updateKafkaConfigs() drops kafka.timeline.metrics.host from kafka-broker. */
+  @Test
+  public void testKafkaUpdateConfigs() throws Exception {
+    // Properties returned by the mocked kafka-broker config.
+    Map<String, String> propertiesBeforeUpgrade = new HashMap<String, String>();
+    propertiesBeforeUpgrade.put("kafka.timeline.metrics.host", "{{metric_collector_host}}");
+    propertiesBeforeUpgrade.put("kafka.timeline.metrics.port", "{{metric_collector_port}}");
+
+    // Properties expected in the captured createConfig() argument.
+    Map<String, String> propertiesAfterUpgrade = new HashMap<String, String>();
+    propertiesAfterUpgrade.put("kafka.timeline.metrics.port", "{{metric_collector_port}}");
+
+    EasyMockSupport mockSupport = new EasyMockSupport();
+    Clusters clustersMock = mockSupport.createNiceMock(Clusters.class);
+    Cluster clusterMock = mockSupport.createNiceMock(Cluster.class);
+    Config kafkaBrokerConfig = mockSupport.createNiceMock(Config.class);
+    Injector controllerInjector = mockSupport.createNiceMock(Injector.class);
+    Injector catalogInjector = mockSupport.createNiceMock(Injector.class);
+
+    Map<String, Cluster> clustersByName = new HashMap<String, Cluster>();
+    clustersByName.put("normal", clusterMock);
+    expect(clustersMock.getClusters()).andReturn(clustersByName).once();
+    expect(clusterMock.getDesiredConfigByType("kafka-broker")).andReturn(kafkaBrokerConfig).atLeastOnce();
+    expect(kafkaBrokerConfig.getProperties()).andReturn(propertiesBeforeUpgrade).anyTimes();
+    expect(controllerInjector.getInstance(Gson.class)).andReturn(null).anyTimes();
+    expect(controllerInjector.getInstance(MaintenanceStateHelper.class)).andReturn(null).anyTimes();
+    expect(controllerInjector.getInstance(KerberosHelper.class))
+      .andReturn(createNiceMock(KerberosHelper.class)).anyTimes();
+
+    // Switch these mocks to replay state before the controller is constructed with them.
+    replay(controllerInjector, clustersMock, kafkaBrokerConfig, clusterMock);
+
+    ActionManager actionManager = createNiceMock(ActionManager.class);
+    AmbariManagementControllerImpl controllerMock = createMockBuilder(AmbariManagementControllerImpl.class)
+      .addMockedMethod("createConfiguration")
+      .addMockedMethod("getClusters", new Class[] { })
+      .addMockedMethod("createConfig")
+      .withConstructor(actionManager, clustersMock, controllerInjector)
+      .createNiceMock();
+
+    Capture<Map> capturedProperties = EasyMock.newCapture();
+    Config createdConfig = createNiceMock(Config.class);
+
+    expect(catalogInjector.getInstance(AmbariManagementController.class)).andReturn(controllerMock).anyTimes();
+    expect(controllerMock.getClusters()).andReturn(clustersMock).anyTimes();
+    expect(controllerMock.createConfig(anyObject(Cluster.class), anyString(), capture(capturedProperties),
+      anyString(), anyObject(Map.class))).andReturn(createdConfig).once();
+
+    replay(controllerMock, catalogInjector);
+    new UpgradeCatalog250(catalogInjector).updateKafkaConfigs();
+    mockSupport.verifyAll();
+
+    Map<String, String> propertiesPassedToCreateConfig = capturedProperties.getValue();
+    assertTrue(Maps.difference(propertiesAfterUpgrade, propertiesPassedToCreateConfig).areEqual());
+  }
 }