You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ds...@apache.org on 2015/10/15 16:33:34 UTC

ambari git commit: AMBARI-13373 hdfs balancer via ambari fails to run once HA is enabled (dsen)

Repository: ambari
Updated Branches:
  refs/heads/trunk 6bcdcd9ac -> 95d85dbc4


AMBARI-13373 hdfs balancer via ambari fails to run once HA is enabled (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/95d85dbc
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/95d85dbc
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/95d85dbc

Branch: refs/heads/trunk
Commit: 95d85dbc4a890678a9565b649c037db97076fe67
Parents: 6bcdcd9
Author: Dmytro Sen <ds...@apache.org>
Authored: Thu Oct 15 17:33:02 2015 +0300
Committer: Dmytro Sen <ds...@apache.org>
Committed: Thu Oct 15 17:33:02 2015 +0300

----------------------------------------------------------------------
 .../server/upgrade/UpgradeCatalog210.java       | 37 +++++----------
 .../server/upgrade/UpgradeCatalog213.java       | 18 ++++++++
 .../server/upgrade/UpgradeCatalog213Test.java   | 47 ++++++++++++++++++++
 3 files changed, 77 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/95d85dbc/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java
index 12a1f44..308e7c9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java
@@ -122,6 +122,7 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
   private static final String TOPOLOGY_HOST_REQUEST_TABLE = "topology_host_request";
   private static final String TOPOLOGY_HOST_TASK_TABLE = "topology_host_task";
   private static final String TOPOLOGY_LOGICAL_TASK_TABLE = "topology_logical_task";
+  private static final String HDFS_SITE_CONFIG = "hdfs-site";
 
   // constants for stack table changes
   private static final String STACK_ID_COLUMN_NAME = "stack_id";
@@ -1458,7 +1459,7 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
           /***
            * Update dfs.namenode.rpc-address set hostname instead of localhost
            */
-          if (cluster.getDesiredConfigByType("hdfs-site") != null && !cluster.getHosts("HDFS","NAMENODE").isEmpty()) {
+          if (cluster.getDesiredConfigByType(HDFS_SITE_CONFIG) != null && !cluster.getHosts("HDFS","NAMENODE").isEmpty()) {
 
             URI nameNodeRpc = null;
             String hostName = cluster.getHosts("HDFS","NAMENODE").iterator().next();
@@ -1467,33 +1468,19 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
                       cluster.getDesiredConfigByType("core-site").getProperties().get("fs.defaultFS") != null) {
               try {
                 if (isNNHAEnabled(cluster)) {
-                  String nn1RpcAddress = null;
-                  Config hdfsSiteConfig = cluster.getDesiredConfigByType("hdfs-site");
-                  Map<String, String> properties = hdfsSiteConfig.getProperties();
-                  String nameServices = properties.get("dfs.nameservices");
-                  if (!StringUtils.isEmpty(nameServices)) {
-                    String namenodes = properties.get(String.format("dfs.ha.namenodes.%s",nameServices));
-                    if (!StringUtils.isEmpty(namenodes) && namenodes.split(",").length > 1) {
-                      nn1RpcAddress = properties.get(String.format("dfs.namenode.rpc-address.%s.%s", nameServices,
-                              namenodes.split(",")[0]));
-                    }
-                  }
-                  if (!StringUtils.isEmpty(nn1RpcAddress)) {
-                    if (!nn1RpcAddress.startsWith("http")) {
-                      nn1RpcAddress = "http://" + nn1RpcAddress;
-                    }
-                    nameNodeRpc= new URI(nn1RpcAddress);
-                  } else {
-                    // set default value
-                    nameNodeRpc= new URI("http://localhost:8020");
-                  }
+                  // NN HA enabled
+                  // Remove dfs.namenode.rpc-address property
+                  Set<String> removePropertiesSet = new HashSet<>();
+                  removePropertiesSet.add("dfs.namenode.rpc-address");
+                  removeConfigurationPropertiesFromCluster(cluster, HDFS_SITE_CONFIG, removePropertiesSet);
                 } else {
+                  // NN HA disabled
                   nameNodeRpc = new URI(cluster.getDesiredConfigByType("core-site").getProperties().get("fs.defaultFS"));
+                  Map<String, String> hdfsProp = new HashMap<String, String>();
+                  hdfsProp.put("dfs.namenode.rpc-address", hostName + ":" + nameNodeRpc.getPort());
+                  updateConfigurationPropertiesForCluster(cluster, HDFS_SITE_CONFIG,
+                          hdfsProp, false, false);
                 }
-                Map<String, String> hdfsProp = new HashMap<String, String>();
-                hdfsProp.put("dfs.namenode.rpc-address", hostName + ":" + nameNodeRpc.getPort());
-                updateConfigurationPropertiesForCluster(cluster, "hdfs-site",
-                        hdfsProp, true, false);
               } catch (URISyntaxException e) {
                 e.printStackTrace();
               }

http://git-wip-us.apache.org/repos/asf/ambari/blob/95d85dbc/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
index 3eeb6b9..e0ae7c1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -49,6 +50,7 @@ import java.util.UUID;
 public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
 
   private static final String STORM_SITE = "storm-site";
+  private static final String HDFS_SITE_CONFIG = "hdfs-site";
   private static final String KAFKA_BROKER = "kafka-broker";
   private static final String AMS_ENV = "ams-env";
   private static final String AMS_HBASE_ENV = "ams-hbase-env";
@@ -119,6 +121,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
     updateAlertDefinitions();
     updateStormConfigs();
     updateAMSConfigs();
+    updateHDFSConfigs();
     updateHbaseEnvConfig();
     updateKafkaConfigs();
   }
@@ -192,6 +195,21 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
     return rootJson.toString();
   }
 
+  protected void updateHDFSConfigs() throws AmbariException {
+    AmbariManagementController ambariManagementController = injector.getInstance(
+            AmbariManagementController.class);
+    Map<String, Cluster> clusterMap = getCheckedClusterMap(ambariManagementController.getClusters());
+
+    for (final Cluster cluster : clusterMap.values()) {
+      // Remove dfs.namenode.rpc-address property when NN HA is enabled
+      if (cluster.getDesiredConfigByType(HDFS_SITE_CONFIG) != null && isNNHAEnabled(cluster)) {
+        Set<String> removePropertiesSet = new HashSet<>();
+        removePropertiesSet.add("dfs.namenode.rpc-address");
+        removeConfigurationPropertiesFromCluster(cluster, HDFS_SITE_CONFIG, removePropertiesSet);
+      }
+    }
+  }
+
   protected void updateStormConfigs() throws AmbariException {
     AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
     Clusters clusters = ambariManagementController.getClusters();

http://git-wip-us.apache.org/repos/asf/ambari/blob/95d85dbc/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
index c945186..15234db 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
@@ -50,6 +50,7 @@ import javax.persistence.EntityManager;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 import static org.easymock.EasyMock.createMockBuilder;
@@ -95,6 +96,7 @@ public class UpgradeCatalog213Test {
   @Test
   public void testExecuteDMLUpdates() throws Exception {
     Method updateAMSConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateAMSConfigs");
+    Method updateHDFSConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateHDFSConfigs");
     Method updateKafkaConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateKafkaConfigs");
     Method updateStormConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateStormConfigs");
     Method addNewConfigurationsFromXml = AbstractUpgradeCatalog.class.getDeclaredMethod("addNewConfigurationsFromXml");
@@ -103,6 +105,7 @@ public class UpgradeCatalog213Test {
 
     UpgradeCatalog213 upgradeCatalog213 = createMockBuilder(UpgradeCatalog213.class)
         .addMockedMethod(updateAMSConfigs)
+        .addMockedMethod(updateHDFSConfigs)
         .addMockedMethod(updateStormConfigs)
         .addMockedMethod(addNewConfigurationsFromXml)
         .addMockedMethod(updateHbaseEnvConfig)
@@ -122,6 +125,8 @@ public class UpgradeCatalog213Test {
     expectLastCall().once();
     upgradeCatalog213.updateKafkaConfigs();
     expectLastCall().once();
+    upgradeCatalog213.updateHDFSConfigs();
+    expectLastCall().once();
 
     replay(upgradeCatalog213);
 
@@ -218,6 +223,48 @@ public class UpgradeCatalog213Test {
   }
 
   @Test
+  public void testUpdateHDFSConfiguration() throws Exception {
+    EasyMockSupport easyMockSupport = new EasyMockSupport();
+    final AmbariManagementController  mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class);
+    final ConfigHelper mockConfigHelper = easyMockSupport.createMock(ConfigHelper.class);
+
+    final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class);
+    final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class);
+
+    final Config mockHdfsSite = easyMockSupport.createNiceMock(Config.class);
+
+    final Map<String, String> propertiesExpectedHdfs = new HashMap<String, String>();
+    propertiesExpectedHdfs.put("dfs.namenode.rpc-address", "nn.rpc.address");
+    propertiesExpectedHdfs.put("dfs.nameservices", "nn1");
+    propertiesExpectedHdfs.put("dfs.ha.namenodes.nn1", "value");
+
+    final Injector mockInjector = Guice.createInjector(new AbstractModule() {
+      @Override
+      protected void configure() {
+        bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
+        bind(ConfigHelper.class).toInstance(mockConfigHelper);
+        bind(Clusters.class).toInstance(mockClusters);
+
+        bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
+        bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+      }
+    });
+
+    expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).once();
+    expect(mockClusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{
+      put("normal", mockClusterExpected);
+    }}).once();
+
+    // Expected operation
+    expect(mockClusterExpected.getDesiredConfigByType("hdfs-site")).andReturn(mockHdfsSite).atLeastOnce();
+    expect(mockHdfsSite.getProperties()).andReturn(propertiesExpectedHdfs).anyTimes();
+
+    easyMockSupport.replayAll();
+    mockInjector.getInstance(UpgradeCatalog213.class).updateHDFSConfigs();
+    easyMockSupport.verifyAll();
+  }
+
+  @Test
   public void testUpdateAmsHbaseEnvContent() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
     Method updateAmsHbaseEnvContent = UpgradeCatalog213.class.getDeclaredMethod("updateAmsHbaseEnvContent", String.class);
     UpgradeCatalog213 upgradeCatalog213 = new UpgradeCatalog213(injector);