You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ds...@apache.org on 2015/11/11 13:44:49 UTC

ambari git commit: AMBARI-13813 After kerberization, Kafka brokers fail to start (dsen)

Repository: ambari
Updated Branches:
  refs/heads/trunk 7e1cb684d -> 9c5e8844a


AMBARI-13813 After kerberization, Kafka brokers fail to start (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9c5e8844
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9c5e8844
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9c5e8844

Branch: refs/heads/trunk
Commit: 9c5e8844ae68a6ce07a5260182fe41243e3bad38
Parents: 7e1cb68
Author: Dmytro Sen <ds...@apache.org>
Authored: Wed Nov 11 14:44:36 2015 +0200
Committer: Dmytro Sen <ds...@apache.org>
Committed: Wed Nov 11 14:44:36 2015 +0200

----------------------------------------------------------------------
 .../server/upgrade/UpgradeCatalog213.java       | 18 +++++++++++++-
 .../server/upgrade/UpgradeCatalog213Test.java   | 26 ++++++++++++++++++++
 2 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/9c5e8844/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
index 6a40be6..f78188c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
@@ -84,6 +84,8 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
   private static final String STORM_SITE = "storm-site";
   private static final String HDFS_SITE_CONFIG = "hdfs-site";
   private static final String KAFKA_BROKER = "kafka-broker";
+  private static final String KAFKA_ENV_CONFIG = "kafka-env";
+  private static final String KAFKA_ENV_CONTENT_KERBEROS_PARAMS = "export KAFKA_KERBEROS_PARAMS={{kafka_kerberos_params}}";
   private static final String AMS_ENV = "ams-env";
   private static final String AMS_HBASE_ENV = "ams-hbase-env";
   private static final String AMS_SITE = "ams-site";
@@ -921,7 +923,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
       Map<String, Cluster> clusterMap = clusters.getClusters();
       if (clusterMap != null && !clusterMap.isEmpty()) {
         for (final Cluster cluster : clusterMap.values()) {
-          Set<String> installedServices =cluster.getServices().keySet();
+          Set<String> installedServices = cluster.getServices().keySet();
           Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER);
           if (kafkaBroker != null) {
             Map<String, String> newProperties = new HashMap<>();
@@ -943,6 +945,20 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
               updateConfigurationPropertiesForCluster(cluster, KAFKA_BROKER, newProperties, true, true);
             }
           }
+
+          StackId stackId = cluster.getCurrentStackVersion();
+          if (stackId != null && stackId.getStackName().equals("HDP") &&
+              VersionUtils.compareVersions(stackId.getStackVersion(), "2.3") >= 0) {
+            Config kafkaEnv = cluster.getDesiredConfigByType(KAFKA_ENV_CONFIG);
+            if (kafkaEnv != null) {
+              String kafkaEnvContent = kafkaEnv.getProperties().get(CONTENT_PROPERTY);
+              if (kafkaEnvContent != null && !kafkaEnvContent.contains(KAFKA_ENV_CONTENT_KERBEROS_PARAMS)) {
+                kafkaEnvContent += "\n\nexport KAFKA_KERBEROS_PARAMS=\"$KAFKA_KERBEROS_PARAMS {{kafka_kerberos_params}}\"";
+                Map<String, String> updates = Collections.singletonMap(CONTENT_PROPERTY, kafkaEnvContent);
+                updateConfigurationPropertiesForCluster(cluster, KAFKA_ENV_CONFIG, updates, true, false);
+              }
+            }
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/9c5e8844/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
index aa55597..61cc5ce 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
@@ -761,10 +761,19 @@ public class UpgradeCatalog213Test {
   public void testUpdateKafkaConfigs() throws Exception {
     EasyMockSupport easyMockSupport = new EasyMockSupport();
     final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class);
+    final ConfigurationResponse mockConfigurationResponse = easyMockSupport.createMock(ConfigurationResponse.class);
     final ConfigHelper mockConfigHelper = easyMockSupport.createMock(ConfigHelper.class);
 
     final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class);
     final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class);
+
+    final Map<String, String> propertiesKafkaEnv = new HashMap<String, String>() {
+      {
+        put("content", "test");
+      }
+    };
+    Map<String, String> updates = Collections.singletonMap("content", "test\n\nexport KAFKA_KERBEROS_PARAMS=\"$KAFKA_KERBEROS_PARAMS {{kafka_kerberos_params}}\"");
+
     final Map<String, String> propertiesAmsEnv = new HashMap<String, String>() {
       {
         put("kafka.metrics.reporters", "{{kafka_metrics_reporters}}");
@@ -778,6 +787,7 @@ public class UpgradeCatalog213Test {
     };
 
     final Config mockAmsEnv = easyMockSupport.createNiceMock(Config.class);
+    final Config mockKafkaEnv = easyMockSupport.createNiceMock(Config.class);
 
     final Injector mockInjector = Guice.createInjector(new AbstractModule() {
       @Override
@@ -801,6 +811,22 @@ public class UpgradeCatalog213Test {
     expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(mockAmsEnv).atLeastOnce();
     expect(mockAmsEnv.getProperties()).andReturn(propertiesAmsEnv).atLeastOnce();
 
+    expect(mockClusterExpected.getCurrentStackVersion()).andReturn(new StackId("HDP", "2.3"));
+    expect(mockClusterExpected.getDesiredConfigByType("kafka-env")).andReturn(mockKafkaEnv).atLeastOnce();
+    expect(mockKafkaEnv.getProperties()).andReturn(propertiesKafkaEnv).atLeastOnce();
+
+    UpgradeCatalog213 upgradeCatalog213 = createMockBuilder(UpgradeCatalog213.class)
+        .withConstructor(Injector.class)
+        .withArgs(mockInjector)
+        .addMockedMethod("updateConfigurationPropertiesForCluster", Cluster.class, String.class,
+            Map.class, boolean.class, boolean.class)
+        .createMock();
+    upgradeCatalog213.updateConfigurationPropertiesForCluster(mockClusterExpected,
+        "kafka-env", updates, true, false);
+    expectLastCall().once();
+
+    expect(mockAmbariManagementController.createConfiguration(EasyMock.<ConfigurationRequest>anyObject())).andReturn(mockConfigurationResponse);
+
     easyMockSupport.replayAll();
     mockInjector.getInstance(UpgradeCatalog213.class).updateKafkaConfigs();
     easyMockSupport.verifyAll();