You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by rl...@apache.org on 2017/05/18 20:47:24 UTC

ambari git commit: Revert "AMBARI-21043. Backport Ambari-17694 - Kafka listeners property does not show SASL_PLAINTEXT protocol when Kerberos is enabled (rlevas)"

Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 eb75a5a1c -> 7766ad748


Revert "AMBARI-21043. Backport Ambari-17694 - Kafka listeners property does not show SASL_PLAINTEXT protocol when Kerberos is enabled (rlevas)"

This reverts commit 3fd229c7f4f18b27513bcebaf40257d28ce2e912.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7766ad74
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7766ad74
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7766ad74

Branch: refs/heads/branch-2.5
Commit: 7766ad748596ce2cf255e2dbaa6b417fd1189810
Parents: eb75a5a
Author: Robert Levas <rl...@hortonworks.com>
Authored: Thu May 18 16:47:10 2017 -0400
Committer: Robert Levas <rl...@hortonworks.com>
Committed: Thu May 18 16:47:10 2017 -0400

----------------------------------------------------------------------
 .../kerberos/VariableReplacementHelper.java     | 46 ++--------
 .../server/upgrade/UpgradeCatalog240.java       | 25 ------
 .../server/upgrade/UpgradeCatalog251.java       | 45 ----------
 .../KAFKA/0.8.1/package/scripts/kafka.py        | 15 ++--
 .../common-services/KAFKA/0.9.0/kerberos.json   |  3 +-
 .../stacks/HDP/2.3/services/stack_advisor.py    | 19 +---
 .../stacks/HDP/2.5/services/KAFKA/kerberos.json |  3 +-
 .../kerberos/VariableReplacementHelperTest.java |  6 --
 .../server/upgrade/UpgradeCatalog240Test.java   | 63 --------------
 .../server/upgrade/UpgradeCatalog251Test.java   | 92 --------------------
 10 files changed, 22 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/7766ad74/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java
index f463cee..77333b8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java
@@ -18,6 +18,12 @@
 
 package org.apache.ambari.server.state.kerberos;
 
+import com.google.inject.Singleton;
+import org.apache.ambari.server.AmbariException;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -25,13 +31,6 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.ambari.server.AmbariException;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Singleton;
-
 /**
  * Helper class to provide variable replacement services
  */
@@ -57,7 +56,6 @@ public class VariableReplacementHelper {
     {
       put("each", new EachFunction());
       put("toLower", new ToLowerFunction());
-      put("replace", new ReplaceValue());
       put("append", new AppendFunction());
     }
   };
@@ -240,38 +238,6 @@ public class VariableReplacementHelper {
   }
 
   /**
-   * ReplaceValue is a Function implementation that replaces the value in the string
-   * <p/>
-   * This function expects the following arguments (in order) within the args array:
-   * <ol>
-   * <li>regular expression that should be replaced</li>
-   * <li>replacement value for the string</li>
-   * </ol>
-   */
-  private static class ReplaceValue implements Function {
-
-    @Override
-    public String perform(String[] args, String data, Map<String, Map<String, String>> replacementsMap) {
-      if ((args == null) || (args.length != 2)) {
-        throw new IllegalArgumentException("Invalid number of arguments encountered");
-      }
-      if (data != null) {
-        StringBuffer builder = new StringBuffer();
-        String regex = args[0];
-        String replacement = args[1];
-        Pattern pattern = Pattern.compile(regex);
-        Matcher matcher = pattern.matcher(data);
-        while (matcher.find()) {
-          matcher.appendReplacement(builder, replacement);
-        }
-        matcher.appendTail(builder);
-        return builder.toString();
-      }
-      return "";
-    }
-  }
-
-  /**
    * ToLowerFunction is a Function implementation that converts a String to lowercase
    */
   private static class ToLowerFunction implements Function {

http://git-wip-us.apache.org/repos/asf/ambari/blob/7766ad74/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
index 87759d9..cf0c37d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java
@@ -182,7 +182,6 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
   protected static final String EXTENSION_ID_COLUMN = "extension_id";
   protected static final String EXTENSION_LINK_TABLE = "extensionlink";
   protected static final String EXTENSION_LINK_ID_COLUMN = "link_id";
-  protected static final String KAFKA_BROKER_CONFIG = "kafka-broker";
 
   private static final Map<String, Integer> ROLE_ORDER;
   private static final String AMS_HBASE_SITE = "ams-hbase-site";
@@ -382,7 +381,6 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
     addManageUserPersistedDataPermission();
     allowClusterOperatorToManageCredentials();
     updateHDFSConfigs();
-    updateKAFKAConfigs();
     updateHIVEConfigs();
     updateAMSConfigs();
     updateClusterEnv();
@@ -1927,29 +1925,6 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog {
     }
   }
 
-  protected void updateKAFKAConfigs() throws AmbariException {
-    AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
-    Clusters clusters = ambariManagementController.getClusters();
-    if (clusters != null) {
-      Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters);
-      if (clusterMap != null && !clusterMap.isEmpty()) {
-        for (final Cluster cluster : clusterMap.values()) {
-          Set<String> installedServices = cluster.getServices().keySet();
-
-          if (installedServices.contains("KAFKA") && cluster.getSecurityType() == SecurityType.KERBEROS) {
-            Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER_CONFIG);
-            if (kafkaBroker != null) {
-              String listenersPropertyValue = kafkaBroker.getProperties().get("listeners");
-              if (StringUtils.isNotEmpty(listenersPropertyValue)) {
-                String newListenersPropertyValue = listenersPropertyValue.replaceAll("\\bPLAINTEXT\\b", "PLAINTEXTSASL");
-                updateConfigurationProperties(KAFKA_BROKER_CONFIG, Collections.singletonMap("listeners", newListenersPropertyValue), true, false);
-              }
-            }
-          }
-        }
-      }
-    }
-  }
 
   protected void updateHIVEConfigs() throws AmbariException {
     AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);

http://git-wip-us.apache.org/repos/asf/ambari/blob/7766ad74/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
index dce60dc..146520f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
@@ -18,22 +18,12 @@
 package org.apache.ambari.server.upgrade;
 
 import java.sql.SQLException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
 
 import org.apache.ambari.server.AmbariException;
-import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.orm.DBAccessor.DBColumnInfo;
 
 import com.google.inject.Inject;
 import com.google.inject.Injector;
-
-import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.state.Config;
-import org.apache.ambari.server.state.SecurityType;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +40,6 @@ public class UpgradeCatalog251 extends AbstractUpgradeCatalog {
   private static final String CLUSTER_HOST_INFO_COLUMN = "cluster_host_info";
   private static final String REQUEST_ID_COLUMN = "request_id";
 
-  protected static final String KAFKA_BROKER_CONFIG = "kafka-broker";
 
   /**
    * Logger.
@@ -104,40 +93,6 @@ public class UpgradeCatalog251 extends AbstractUpgradeCatalog {
    */
   @Override
   protected void executeDMLUpdates() throws AmbariException, SQLException {
-    updateKAFKAConfigs();
-  }
-
-  /**
-   * Ensure that the updates from Ambari 2.4.0 are applied in the event the initial version is
-   * Ambari 2.5.0, since this Kafka change failed to make it into Ambari 2.5.0.
-   *
-   * If the base version was before Ambari 2.5.0, this method should wind up doing nothing.
-   * @throws AmbariException
-   */
-  protected void updateKAFKAConfigs() throws AmbariException {
-    AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
-    Clusters clusters = ambariManagementController.getClusters();
-    if (clusters != null) {
-      Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters);
-      if (clusterMap != null && !clusterMap.isEmpty()) {
-        for (final Cluster cluster : clusterMap.values()) {
-          Set<String> installedServices = cluster.getServices().keySet();
-
-          if (installedServices.contains("KAFKA") && cluster.getSecurityType() == SecurityType.KERBEROS) {
-            Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER_CONFIG);
-            if (kafkaBroker != null) {
-              String listenersPropertyValue = kafkaBroker.getProperties().get("listeners");
-              if (StringUtils.isNotEmpty(listenersPropertyValue)) {
-                String newListenersPropertyValue = listenersPropertyValue.replaceAll("\\bPLAINTEXT\\b", "PLAINTEXTSASL");
-                if(!newListenersPropertyValue.equals(listenersPropertyValue)) {
-                  updateConfigurationProperties(KAFKA_BROKER_CONFIG, Collections.singletonMap("listeners", newListenersPropertyValue), true, false);
-                }
-              }
-            }
-          }
-        }
-      }
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/7766ad74/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
index 680dd32..1327090 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py
@@ -80,16 +80,21 @@ def kafka(upgrade_type=None):
 
        listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
        Logger.info(format("Kafka listeners: {listeners}"))
-       kafka_server_config['listeners'] = listeners       
 
        if params.security_enabled and params.kafka_kerberos_enabled:
          Logger.info("Kafka kerberos security is enabled.")
+         if "SASL" not in listeners:
+           listeners = listeners.replace("PLAINTEXT", "PLAINTEXTSASL")
+
+         kafka_server_config['listeners'] = listeners
          kafka_server_config['advertised.listeners'] = listeners
          Logger.info(format("Kafka advertised listeners: {listeners}"))
-       elif 'advertised.listeners' in kafka_server_config:
-         advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
-         kafka_server_config['advertised.listeners'] = advertised_listeners
-         Logger.info(format("Kafka advertised listeners: {advertised_listeners}"))
+       else:
+         kafka_server_config['listeners'] = listeners
+         if 'advertised.listeners' in kafka_server_config:
+           advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
+           kafka_server_config['advertised.listeners'] = advertised_listeners
+           Logger.info(format("Kafka advertised listeners: {advertised_listeners}"))
     else:
       kafka_server_config['host.name'] = params.hostname
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/7766ad74/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
index 7500891..60fa959 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json
@@ -14,8 +14,7 @@
               "principal.to.local.class":"kafka.security.auth.KerberosPrincipalToLocal",
               "super.users": "user:${kafka-env/kafka_user}",
               "security.inter.broker.protocol": "PLAINTEXTSASL",
-              "zookeeper.set.acl": "true",
-              "listeners": "${kafka-broker/listeners|replace(\\bPLAINTEXT\\b, PLAINTEXTSASL)}"
+              "zookeeper.set.acl": "true"
           }
         }
       ],

http://git-wip-us.apache.org/repos/asf/ambari/blob/7766ad74/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
index 1365c64..d0f82af 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
@@ -944,7 +944,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
       "HIVE": {"hiveserver2-site": self.validateHiveServer2Configurations,
                "hive-site": self.validateHiveConfigurations},
       "HBASE": {"hbase-site": self.validateHBASEConfigurations},
-      "KAFKA": {"kafka-broker": self.validateKAFKAConfigurations},
+      "KAKFA": {"kafka-broker": self.validateKAFKAConfigurations},
       "RANGER": {"admin-properties": self.validateRangerAdminConfigurations,
                  "ranger-env": self.validateRangerConfigurationsEnv}
     }
@@ -1108,13 +1108,13 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
   def validateKAFKAConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
     kafka_broker = properties
     validationItems = []
-    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
- 
+
     #Adding Ranger Plugin logic here
     ranger_plugin_properties = getSiteProperties(configurations, "ranger-kafka-plugin-properties")
-    ranger_plugin_enabled = ranger_plugin_properties['ranger-kafka-plugin-enabled'] if ranger_plugin_properties else 'No'
+    ranger_plugin_enabled = ranger_plugin_properties['ranger-kafka-plugin-enabled']
     prop_name = 'authorizer.class.name'
     prop_val = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer"
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
     if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()):
       if kafka_broker[prop_name] != prop_val:
         validationItems.append({"config-name": prop_name,
@@ -1122,17 +1122,6 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
                                 "If Ranger Kafka Plugin is enabled."\
                                 "{0} needs to be set to {1}".format(prop_name,prop_val))})
 
-    if self.isSecurityEnabled(services) and 'security.inter.broker.protocol' in properties:
-      interBrokerValue = properties['security.inter.broker.protocol']
-      prop_name = 'listeners'
-      prop_value =  properties[prop_name]
-      if interBrokerValue and interBrokerValue not in prop_value:
-        validationItems.append({"config-name": "listeners",
-                                "item": self.getWarnItem("If kerberos is enabled "\
-                                "{0}  need to contain {1} as one of "\
-                                "the protocol".format(prop_name, interBrokerValue))})
-
-
     return self.toConfigurationValidationProblems(validationItems, "kafka-broker")
 
   def isComponentUsingCardinalityForLayout(self, componentName):

http://git-wip-us.apache.org/repos/asf/ambari/blob/7766ad74/ambari-server/src/main/resources/stacks/HDP/2.5/services/KAFKA/kerberos.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.5/services/KAFKA/kerberos.json b/ambari-server/src/main/resources/stacks/HDP/2.5/services/KAFKA/kerberos.json
index eb31ad6..501f969 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.5/services/KAFKA/kerberos.json
+++ b/ambari-server/src/main/resources/stacks/HDP/2.5/services/KAFKA/kerberos.json
@@ -14,8 +14,7 @@
               "principal.to.local.class":"kafka.security.auth.KerberosPrincipalToLocal",
               "super.users": "user:${kafka-env/kafka_user}",
               "security.inter.broker.protocol": "PLAINTEXTSASL",
-              "zookeeper.set.acl": "true",
-              "listeners": "${kafka-broker/listeners|replace(\\bPLAINTEXT\\b, PLAINTEXTSASL)}"
+              "zookeeper.set.acl": "true"
           }
         },
         {

http://git-wip-us.apache.org/repos/asf/ambari/blob/7766ad74/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java
index ece2258..857047b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java
@@ -165,10 +165,6 @@ public class VariableReplacementHelperTest {
           put("realm", "UNIT.TEST");
         }});
 
-        put("kafka-broker", new HashMap<String, String>() {{
-          put("listeners", "PLAINTEXT://localhost:6667");
-        }});
-        
         put("clusterHostInfo", new HashMap<String, String>() {{
           put("hive_metastore_host", "host1.unit.test, host2.unit.test , host3.unit.test"); // spaces are there on purpose.
         }});
@@ -225,8 +221,6 @@ public class VariableReplacementHelperTest {
     }
 
     Assert.assertEquals("test=unit.test", helper.replaceVariables("test=${realm|toLower()}", configurations));
-  
-    Assert.assertEquals("PLAINTEXTSASL://localhost:6667", helper.replaceVariables("${kafka-broker/listeners|replace(\\bPLAINTEXT\\b,PLAINTEXTSASL)}", configurations)); 
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/7766ad74/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
index 17411d7..8baf496 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java
@@ -591,7 +591,6 @@ public class UpgradeCatalog240Test {
     Method updateRecoveryConfigurationDML = UpgradeCatalog240.class.getDeclaredMethod("updateRecoveryConfigurationDML");
     Method removeAtlasMetaserverAlert = UpgradeCatalog240.class.getDeclaredMethod("removeAtlasMetaserverAlert");
     Method updateRangerHbasePluginProperties = UpgradeCatalog240.class.getDeclaredMethod("updateRangerHbasePluginProperties");
-    Method updateKAFKAConfigs = UpgradeCatalog240.class.getDeclaredMethod("updateKAFKAConfigs");
 
     Capture<String> capturedStatements = newCapture(CaptureType.ALL);
 
@@ -641,7 +640,6 @@ public class UpgradeCatalog240Test {
             .addMockedMethod(updateRecoveryConfigurationDML)
             .addMockedMethod(removeAtlasMetaserverAlert)
             .addMockedMethod(updateRangerHbasePluginProperties)
-            .addMockedMethod(updateKAFKAConfigs)
             .createMock();
 
     Field field = AbstractUpgradeCatalog.class.getDeclaredField("dbAccessor");
@@ -683,7 +681,6 @@ public class UpgradeCatalog240Test {
     upgradeCatalog240.removeAtlasMetaserverAlert();
     upgradeCatalog240.updateRangerHbasePluginProperties();
     upgradeCatalog240.adjustHiveJobTimestamps();
-    upgradeCatalog240.updateKAFKAConfigs();
 
     replay(upgradeCatalog240, dbAccessor);
 
@@ -1176,66 +1173,6 @@ public class UpgradeCatalog240Test {
   }
 
   @Test
-  public void testUpdateKAFKAConfigs() throws Exception{
-    EasyMockSupport easyMockSupport = new EasyMockSupport();
-    final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class);
-    final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class);
-    final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class);
-
-    final Config kafkaBroker = easyMockSupport.createNiceMock(Config.class);
-    expect(kafkaBroker.getProperties()).andReturn(new HashMap<String, String>(){{
-      put("listeners", "PLAINTEXT://localhost:6667,SSL://localhost:6666");
-    }}
-    ).anyTimes();
-
-    final Injector mockInjector = Guice.createInjector(new AbstractModule() {
-      @Override
-      protected void configure() {
-        bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
-        bind(Clusters.class).toInstance(mockClusters);
-        bind(EntityManager.class).toInstance(entityManager);
-        bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
-        bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
-        bind(PasswordEncoder.class).toInstance(createNiceMock(PasswordEncoder.class));
-        bind(HookContextFactory.class).toInstance(createMock(HookContextFactory.class));
-        bind(HookService.class).toInstance(createMock(HookService.class));
-      }
-    });
-
-    expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).once();
-    expect(mockClusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{
-      put("normal", mockClusterExpected);
-    }}).atLeastOnce();
-    expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(kafkaBroker).atLeastOnce();
-    expect(mockClusterExpected.getSecurityType()).andReturn(SecurityType.KERBEROS);
-    expect(mockClusterExpected.getServices()).andReturn(new HashMap<String, Service>() {
-      {
-        put("KAFKA", null);
-      }
-    }).atLeastOnce();
-
-    UpgradeCatalog240 upgradeCatalog240 = createMockBuilder(UpgradeCatalog240.class)
-            .withConstructor(Injector.class)
-            .withArgs(mockInjector)
-            .addMockedMethod("updateConfigurationProperties", String.class,
-                    Map.class, boolean.class, boolean.class)
-            .createMock();
-
-    Map<String, String> expectedUpdates = new HashMap<>();
-    expectedUpdates.put("listeners", "PLAINTEXTSASL://localhost:6667,SSL://localhost:6666");
-
-    upgradeCatalog240.updateConfigurationProperties("kafka-broker", expectedUpdates,
-            true, false);
-    expectLastCall().once();
-
-    easyMockSupport.replayAll();
-    replay(upgradeCatalog240);
-    upgradeCatalog240.updateKAFKAConfigs();
-    easyMockSupport.verifyAll();
-  }
-
-
-  @Test
   public void testSparkConfigUpdate() throws Exception{
 
     Map<String, String> oldPropertiesSparkDefaults = new HashMap<String, String>() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/7766ad74/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java
index d725ec4..4575998 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java
@@ -20,29 +20,21 @@ package org.apache.ambari.server.upgrade;
 
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.createMockBuilder;
-import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.newCapture;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.Statement;
-import java.util.Collections;
-import java.util.Map;
 
 import javax.persistence.EntityManager;
 
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.configuration.Configuration;
-import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.KerberosHelper;
 import org.apache.ambari.server.controller.MaintenanceStateHelper;
 import org.apache.ambari.server.orm.DBAccessor;
@@ -50,12 +42,10 @@ import org.apache.ambari.server.orm.DBAccessor.DBColumnInfo;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
-import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.stack.OsFamily;
 import org.easymock.Capture;
 import org.easymock.EasyMockRunner;
-import org.easymock.EasyMockSupport;
 import org.easymock.Mock;
 import org.easymock.MockType;
 import org.junit.After;
@@ -63,10 +53,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.springframework.security.crypto.password.PasswordEncoder;
 
 import com.google.gson.Gson;
-import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -175,84 +163,4 @@ public class UpgradeCatalog251Test {
     Assert.assertEquals(Integer.valueOf(0), captured.getDefaultValue());
     Assert.assertEquals(Short.class, captured.getType());
   }
-
-  @Test
-  public void testExecuteDMLUpdates() throws Exception {
-    Method updateKAFKAConfigs = UpgradeCatalog251.class.getDeclaredMethod("updateKAFKAConfigs");
-
-    UpgradeCatalog251 upgradeCatalog251 = createMockBuilder(UpgradeCatalog251.class)
-        .addMockedMethod(updateKAFKAConfigs)
-        .createMock();
-
-    Field field = AbstractUpgradeCatalog.class.getDeclaredField("dbAccessor");
-    field.set(upgradeCatalog251, dbAccessor);
-
-    upgradeCatalog251.updateKAFKAConfigs();
-    expectLastCall().once();
-
-    replay(upgradeCatalog251, dbAccessor);
-
-    upgradeCatalog251.executeDMLUpdates();
-
-    verify(upgradeCatalog251, dbAccessor);
-  }
-
-
-  @Test
-  public void testUpdateKAFKAConfigs() throws Exception{
-    EasyMockSupport easyMockSupport = new EasyMockSupport();
-    final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class);
-    final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class);
-    final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class);
-
-    Map<String, String> initialProperties = Collections.singletonMap("listeners", "PLAINTEXT://localhost:6667,SSL://localhost:6666");
-    Map<String, String> expectedUpdates = Collections.singletonMap("listeners", "PLAINTEXTSASL://localhost:6667,SSL://localhost:6666");
-
-    final Config kafkaBroker = easyMockSupport.createNiceMock(Config.class);
-    expect(kafkaBroker.getProperties()).andReturn(initialProperties).times(1);
-    // Re-entrant test
-    expect(kafkaBroker.getProperties()).andReturn(expectedUpdates).times(1);
-
-    final Injector mockInjector = Guice.createInjector(new AbstractModule() {
-      @Override
-      protected void configure() {
-        bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
-        bind(Clusters.class).toInstance(mockClusters);
-        bind(EntityManager.class).toInstance(entityManager);
-        bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
-        bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
-        bind(PasswordEncoder.class).toInstance(createNiceMock(PasswordEncoder.class));
-      }
-    });
-
-    expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).atLeastOnce();
-    expect(mockClusters.getClusters()).andReturn(Collections.singletonMap("normal", mockClusterExpected)).atLeastOnce();
-    expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(kafkaBroker).atLeastOnce();
-    expect(mockClusterExpected.getSecurityType()).andReturn(SecurityType.KERBEROS).atLeastOnce();
-    expect(mockClusterExpected.getServices()).andReturn(Collections.<String, Service>singletonMap("KAFKA", null)).atLeastOnce();
-
-    UpgradeCatalog251 upgradeCatalog251 = createMockBuilder(UpgradeCatalog251.class)
-        .withConstructor(Injector.class)
-        .withArgs(mockInjector)
-        .addMockedMethod("updateConfigurationProperties", String.class,
-            Map.class, boolean.class, boolean.class)
-        .createMock();
-
-
-    // upgradeCatalog251.updateConfigurationProperties is only expected to execute once since no changes are
-    // expected when the relevant data have been previously changed
-    upgradeCatalog251.updateConfigurationProperties("kafka-broker", expectedUpdates, true, false);
-    expectLastCall().once();
-
-    easyMockSupport.replayAll();
-    replay(upgradeCatalog251);
-
-    // Execute the first time... upgrading to Ambari 2.4.0
-    upgradeCatalog251.updateKAFKAConfigs();
-
-    // Test reentry... upgrading from Ambari 2.4.0
-    upgradeCatalog251.updateKAFKAConfigs();
-
-    easyMockSupport.verifyAll();
-  }
 }