You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2017/05/17 20:51:58 UTC

[24/25] ambari git commit: AMBARI-21043. Backport Ambari-17694 - Kafka listeners property does not show SASL_PLAINTEXT protocol when Kerberos is enabled (rlevas)

AMBARI-21043. Backport Ambari-17694 - Kafka listeners property does not show SASL_PLAINTEXT protocol when Kerberos is enabled (rlevas)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/bba703bc
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/bba703bc
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/bba703bc

Branch: refs/heads/branch-feature-AMBARI-14714
Commit: bba703bc600555e596c49aef8749df65c9ac918c
Parents: 8bf136a
Author: Robert Levas <rl...@hortonworks.com>
Authored: Wed May 17 14:33:03 2017 -0400
Committer: Robert Levas <rl...@hortonworks.com>
Committed: Wed May 17 14:33:03 2017 -0400

----------------------------------------------------------------------
 .../server/upgrade/UpgradeCatalog251.java       | 47 +++++++++-
 .../stacks/HDP/2.3/services/stack_advisor.py    |  2 +-
 .../server/upgrade/UpgradeCatalog251Test.java   | 92 ++++++++++++++++++++
 3 files changed, 139 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/bba703bc/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
index 6f8f2a6..745890c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,9 +18,18 @@
 package org.apache.ambari.server.upgrade;
 
 import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.orm.DBAccessor.DBColumnInfo;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.SecurityType;
+import org.apache.commons.lang.StringUtils;
 
 import com.google.inject.Inject;
 import com.google.inject.Injector;
@@ -33,6 +42,8 @@ public class UpgradeCatalog251 extends AbstractUpgradeCatalog {
   static final String HOST_ROLE_COMMAND_TABLE = "host_role_command";
   static final String HRC_IS_BACKGROUND_COLUMN = "is_background";
 
+  protected static final String KAFKA_BROKER_CONFIG = "kafka-broker";
+
   /**
    * Constructor.
    *
@@ -79,6 +90,40 @@ public class UpgradeCatalog251 extends AbstractUpgradeCatalog {
    */
   @Override
   protected void executeDMLUpdates() throws AmbariException, SQLException {
+    updateKAFKAConfigs();
+  }
+
+  /**
+   * Replaces the PLAINTEXT protocol with PLAINTEXTSASL in the kafka-broker "listeners" property of every
+   * cluster that has KAFKA installed and Kerberos enabled. This Ambari 2.4.0 update failed to make it into
+   * Ambari 2.5.0; values already converted (base version before Ambari 2.5.0) do not match and are skipped.
+   * NOTE(review): updateConfigurationProperties is not passed the cluster - confirm its scope is per-cluster.
+   * @throws AmbariException presumably when reading or updating the cluster configuration fails - confirm
+   */
+  protected void updateKAFKAConfigs() throws AmbariException {
+    AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
+    Clusters clusters = ambariManagementController.getClusters();
+    if (clusters != null) {
+      Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters);
+      if (clusterMap != null && !clusterMap.isEmpty()) {
+        for (final Cluster cluster : clusterMap.values()) {
+          Set<String> installedServices = cluster.getServices().keySet();
+          // Only clusters that have KAFKA installed and use Kerberos security are candidates.
+          if (installedServices.contains("KAFKA") && cluster.getSecurityType() == SecurityType.KERBEROS) {
+            Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER_CONFIG);
+            if (kafkaBroker != null) {
+              String listenersPropertyValue = kafkaBroker.getProperties().get("listeners");
+              if (StringUtils.isNotEmpty(listenersPropertyValue)) {
+                String newListenersPropertyValue = listenersPropertyValue.replaceAll("\\bPLAINTEXT\\b", "PLAINTEXTSASL");
+                if(!newListenersPropertyValue.equals(listenersPropertyValue)) { // write only when the value changed
+                  updateConfigurationProperties(KAFKA_BROKER_CONFIG, Collections.singletonMap("listeners", newListenersPropertyValue), true, false);
+                }
+              }
+            }
+          }
+        }
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/bba703bc/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
index 8cefdac..9efcee0 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
@@ -918,7 +918,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
       "HIVE": {"hiveserver2-site": self.validateHiveServer2Configurations,
                "hive-site": self.validateHiveConfigurations},
       "HBASE": {"hbase-site": self.validateHBASEConfigurations},
-      "KAKFA": {"kafka-broker": self.validateKAFKAConfigurations},
+      "KAFKA": {"kafka-broker": self.validateKAFKAConfigurations},
       "RANGER": {"admin-properties": self.validateRangerAdminConfigurations,
                  "ranger-env": self.validateRangerConfigurationsEnv}
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/bba703bc/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java
index 4575998..d725ec4 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog251Test.java
@@ -20,21 +20,29 @@ package org.apache.ambari.server.upgrade;
 
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.newCapture;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.Statement;
+import java.util.Collections;
+import java.util.Map;
 
 import javax.persistence.EntityManager;
 
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.KerberosHelper;
 import org.apache.ambari.server.controller.MaintenanceStateHelper;
 import org.apache.ambari.server.orm.DBAccessor;
@@ -42,10 +50,12 @@ import org.apache.ambari.server.orm.DBAccessor.DBColumnInfo;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.stack.OsFamily;
 import org.easymock.Capture;
 import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
 import org.easymock.Mock;
 import org.easymock.MockType;
 import org.junit.After;
@@ -53,8 +63,10 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.springframework.security.crypto.password.PasswordEncoder;
 
 import com.google.gson.Gson;
+import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -163,4 +175,84 @@ public class UpgradeCatalog251Test {
     Assert.assertEquals(Integer.valueOf(0), captured.getDefaultValue());
     Assert.assertEquals(Short.class, captured.getType());
   }
+
+  @Test
+  public void testExecuteDMLUpdates() throws Exception {
+    Method updateKAFKAConfigs = UpgradeCatalog251.class.getDeclaredMethod("updateKAFKAConfigs");
+    // Partial mock: only updateKAFKAConfigs is mocked, so executeDMLUpdates runs its real code.
+    UpgradeCatalog251 upgradeCatalog251 = createMockBuilder(UpgradeCatalog251.class)
+        .addMockedMethod(updateKAFKAConfigs)
+        .createMock();
+    // No constructor is configured on the builder, so the dbAccessor field is set reflectively.
+    Field field = AbstractUpgradeCatalog.class.getDeclaredField("dbAccessor");
+    field.set(upgradeCatalog251, dbAccessor);
+    // Record the expectation: updateKAFKAConfigs must be invoked exactly once.
+    upgradeCatalog251.updateKAFKAConfigs();
+    expectLastCall().once();
+
+    replay(upgradeCatalog251, dbAccessor);
+
+    upgradeCatalog251.executeDMLUpdates();
+
+    verify(upgradeCatalog251, dbAccessor);
+  }
+
+
+  @Test
+  public void testUpdateKAFKAConfigs() throws Exception{
+    EasyMockSupport easyMockSupport = new EasyMockSupport();
+    final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class);
+    final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class);
+    final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class);
+    // The "listeners" value before the update, and the single-entry update expected to be written.
+    Map<String, String> initialProperties = Collections.singletonMap("listeners", "PLAINTEXT://localhost:6667,SSL://localhost:6666");
+    Map<String, String> expectedUpdates = Collections.singletonMap("listeners", "PLAINTEXTSASL://localhost:6667,SSL://localhost:6666");
+
+    final Config kafkaBroker = easyMockSupport.createNiceMock(Config.class);
+    expect(kafkaBroker.getProperties()).andReturn(initialProperties).times(1);
+    // The second call (re-entry) sees the already-updated listeners value
+    expect(kafkaBroker.getProperties()).andReturn(expectedUpdates).times(1);
+
+    final Injector mockInjector = Guice.createInjector(new AbstractModule() {
+      @Override
+      protected void configure() {
+        bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
+        bind(Clusters.class).toInstance(mockClusters);
+        bind(EntityManager.class).toInstance(entityManager);
+        bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
+        bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+        bind(PasswordEncoder.class).toInstance(createNiceMock(PasswordEncoder.class));
+      }
+    });
+    // One cluster named "normal" with KAFKA installed and Kerberos enabled.
+    expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).atLeastOnce();
+    expect(mockClusters.getClusters()).andReturn(Collections.singletonMap("normal", mockClusterExpected)).atLeastOnce();
+    expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(kafkaBroker).atLeastOnce();
+    expect(mockClusterExpected.getSecurityType()).andReturn(SecurityType.KERBEROS).atLeastOnce();
+    expect(mockClusterExpected.getServices()).andReturn(Collections.<String, Service>singletonMap("KAFKA", null)).atLeastOnce();
+
+    UpgradeCatalog251 upgradeCatalog251 = createMockBuilder(UpgradeCatalog251.class)
+        .withConstructor(Injector.class)
+        .withArgs(mockInjector)
+        .addMockedMethod("updateConfigurationProperties", String.class,
+            Map.class, boolean.class, boolean.class)
+        .createMock();
+
+    // Partial mock: only updateConfigurationProperties is mocked; updateKAFKAConfigs runs its real code.
+    // updateConfigurationProperties is expected exactly once across both runs below, because the second
+    // run reads the already-converted value and therefore has nothing to change
+    upgradeCatalog251.updateConfigurationProperties("kafka-broker", expectedUpdates, true, false);
+    expectLastCall().once();
+
+    easyMockSupport.replayAll();
+    replay(upgradeCatalog251);
+
+    // First run: listeners still contains PLAINTEXT, so the update is expected to be written
+    upgradeCatalog251.updateKAFKAConfigs();
+
+    // Second run (re-entry): listeners already contains PLAINTEXTSASL, so no update is expected
+    upgradeCatalog251.updateKAFKAConfigs();
+    // NOTE(review): upgradeCatalog251 was not created via easyMockSupport, so verifyAll() presumably skips it - confirm.
+    easyMockSupport.verifyAll();
+  }
 }