You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ds...@apache.org on 2015/10/15 16:33:34 UTC
ambari git commit: AMBARI-13373 hdfs balancer via ambari fails to run
once HA is enabled (dsen)
Repository: ambari
Updated Branches:
refs/heads/trunk 6bcdcd9ac -> 95d85dbc4
AMBARI-13373 hdfs balancer via ambari fails to run once HA is enabled (dsen)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/95d85dbc
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/95d85dbc
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/95d85dbc
Branch: refs/heads/trunk
Commit: 95d85dbc4a890678a9565b649c037db97076fe67
Parents: 6bcdcd9
Author: Dmytro Sen <ds...@apache.org>
Authored: Thu Oct 15 17:33:02 2015 +0300
Committer: Dmytro Sen <ds...@apache.org>
Committed: Thu Oct 15 17:33:02 2015 +0300
----------------------------------------------------------------------
.../server/upgrade/UpgradeCatalog210.java | 37 +++++----------
.../server/upgrade/UpgradeCatalog213.java | 18 ++++++++
.../server/upgrade/UpgradeCatalog213Test.java | 47 ++++++++++++++++++++
3 files changed, 77 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/95d85dbc/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java
index 12a1f44..308e7c9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java
@@ -122,6 +122,7 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
private static final String TOPOLOGY_HOST_REQUEST_TABLE = "topology_host_request";
private static final String TOPOLOGY_HOST_TASK_TABLE = "topology_host_task";
private static final String TOPOLOGY_LOGICAL_TASK_TABLE = "topology_logical_task";
+ private static final String HDFS_SITE_CONFIG = "hdfs-site";
// constants for stack table changes
private static final String STACK_ID_COLUMN_NAME = "stack_id";
@@ -1458,7 +1459,7 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
/***
* Update dfs.namenode.rpc-address set hostname instead of localhost
*/
- if (cluster.getDesiredConfigByType("hdfs-site") != null && !cluster.getHosts("HDFS","NAMENODE").isEmpty()) {
+ if (cluster.getDesiredConfigByType(HDFS_SITE_CONFIG) != null && !cluster.getHosts("HDFS","NAMENODE").isEmpty()) {
URI nameNodeRpc = null;
String hostName = cluster.getHosts("HDFS","NAMENODE").iterator().next();
@@ -1467,33 +1468,19 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
cluster.getDesiredConfigByType("core-site").getProperties().get("fs.defaultFS") != null) {
try {
if (isNNHAEnabled(cluster)) {
- String nn1RpcAddress = null;
- Config hdfsSiteConfig = cluster.getDesiredConfigByType("hdfs-site");
- Map<String, String> properties = hdfsSiteConfig.getProperties();
- String nameServices = properties.get("dfs.nameservices");
- if (!StringUtils.isEmpty(nameServices)) {
- String namenodes = properties.get(String.format("dfs.ha.namenodes.%s",nameServices));
- if (!StringUtils.isEmpty(namenodes) && namenodes.split(",").length > 1) {
- nn1RpcAddress = properties.get(String.format("dfs.namenode.rpc-address.%s.%s", nameServices,
- namenodes.split(",")[0]));
- }
- }
- if (!StringUtils.isEmpty(nn1RpcAddress)) {
- if (!nn1RpcAddress.startsWith("http")) {
- nn1RpcAddress = "http://" + nn1RpcAddress;
- }
- nameNodeRpc= new URI(nn1RpcAddress);
- } else {
- // set default value
- nameNodeRpc= new URI("http://localhost:8020");
- }
+ // NN HA enabled
+ // Remove dfs.namenode.rpc-address property
+ Set<String> removePropertiesSet = new HashSet<>();
+ removePropertiesSet.add("dfs.namenode.rpc-address");
+ removeConfigurationPropertiesFromCluster(cluster, HDFS_SITE_CONFIG, removePropertiesSet);
} else {
+ // NN HA disabled
nameNodeRpc = new URI(cluster.getDesiredConfigByType("core-site").getProperties().get("fs.defaultFS"));
+ Map<String, String> hdfsProp = new HashMap<String, String>();
+ hdfsProp.put("dfs.namenode.rpc-address", hostName + ":" + nameNodeRpc.getPort());
+ updateConfigurationPropertiesForCluster(cluster, HDFS_SITE_CONFIG,
+ hdfsProp, false, false);
}
- Map<String, String> hdfsProp = new HashMap<String, String>();
- hdfsProp.put("dfs.namenode.rpc-address", hostName + ":" + nameNodeRpc.getPort());
- updateConfigurationPropertiesForCluster(cluster, "hdfs-site",
- hdfsProp, true, false);
} catch (URISyntaxException e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/95d85dbc/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
index 3eeb6b9..e0ae7c1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -49,6 +50,7 @@ import java.util.UUID;
public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
private static final String STORM_SITE = "storm-site";
+ private static final String HDFS_SITE_CONFIG = "hdfs-site";
private static final String KAFKA_BROKER = "kafka-broker";
private static final String AMS_ENV = "ams-env";
private static final String AMS_HBASE_ENV = "ams-hbase-env";
@@ -119,6 +121,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
updateAlertDefinitions();
updateStormConfigs();
updateAMSConfigs();
+ updateHDFSConfigs();
updateHbaseEnvConfig();
updateKafkaConfigs();
}
@@ -192,6 +195,21 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
return rootJson.toString();
}
+ protected void updateHDFSConfigs() throws AmbariException {
+ AmbariManagementController ambariManagementController = injector.getInstance(
+ AmbariManagementController.class);
+ Map<String, Cluster> clusterMap = getCheckedClusterMap(ambariManagementController.getClusters());
+
+ for (final Cluster cluster : clusterMap.values()) {
+ // Remove dfs.namenode.rpc-address property when NN HA is enabled
+ if (cluster.getDesiredConfigByType(HDFS_SITE_CONFIG) != null && isNNHAEnabled(cluster)) {
+ Set<String> removePropertiesSet = new HashSet<>();
+ removePropertiesSet.add("dfs.namenode.rpc-address");
+ removeConfigurationPropertiesFromCluster(cluster, HDFS_SITE_CONFIG, removePropertiesSet);
+ }
+ }
+ }
+
protected void updateStormConfigs() throws AmbariException {
AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
Clusters clusters = ambariManagementController.getClusters();
http://git-wip-us.apache.org/repos/asf/ambari/blob/95d85dbc/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
index c945186..15234db 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
@@ -50,6 +50,7 @@ import javax.persistence.EntityManager;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import static org.easymock.EasyMock.createMockBuilder;
@@ -95,6 +96,7 @@ public class UpgradeCatalog213Test {
@Test
public void testExecuteDMLUpdates() throws Exception {
Method updateAMSConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateAMSConfigs");
+ Method updateHDFSConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateHDFSConfigs");
Method updateKafkaConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateKafkaConfigs");
Method updateStormConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateStormConfigs");
Method addNewConfigurationsFromXml = AbstractUpgradeCatalog.class.getDeclaredMethod("addNewConfigurationsFromXml");
@@ -103,6 +105,7 @@ public class UpgradeCatalog213Test {
UpgradeCatalog213 upgradeCatalog213 = createMockBuilder(UpgradeCatalog213.class)
.addMockedMethod(updateAMSConfigs)
+ .addMockedMethod(updateHDFSConfigs)
.addMockedMethod(updateStormConfigs)
.addMockedMethod(addNewConfigurationsFromXml)
.addMockedMethod(updateHbaseEnvConfig)
@@ -122,6 +125,8 @@ public class UpgradeCatalog213Test {
expectLastCall().once();
upgradeCatalog213.updateKafkaConfigs();
expectLastCall().once();
+ upgradeCatalog213.updateHDFSConfigs();
+ expectLastCall().once();
replay(upgradeCatalog213);
@@ -218,6 +223,48 @@ public class UpgradeCatalog213Test {
}
@Test
+ public void testUpdateHDFSConfiguration() throws Exception {
+ EasyMockSupport easyMockSupport = new EasyMockSupport();
+ final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class);
+ final ConfigHelper mockConfigHelper = easyMockSupport.createMock(ConfigHelper.class);
+
+ final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class);
+ final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class);
+
+ final Config mockHdfsSite = easyMockSupport.createNiceMock(Config.class);
+
+ final Map<String, String> propertiesExpectedHdfs = new HashMap<String, String>();
+ propertiesExpectedHdfs.put("dfs.namenode.rpc-address", "nn.rpc.address");
+ propertiesExpectedHdfs.put("dfs.nameservices", "nn1");
+ propertiesExpectedHdfs.put("dfs.ha.namenodes.nn1", "value");
+
+ final Injector mockInjector = Guice.createInjector(new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(AmbariManagementController.class).toInstance(mockAmbariManagementController);
+ bind(ConfigHelper.class).toInstance(mockConfigHelper);
+ bind(Clusters.class).toInstance(mockClusters);
+
+ bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
+ bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+ }
+ });
+
+ expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).once();
+ expect(mockClusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{
+ put("normal", mockClusterExpected);
+ }}).once();
+
+ // Expected operation
+ expect(mockClusterExpected.getDesiredConfigByType("hdfs-site")).andReturn(mockHdfsSite).atLeastOnce();
+ expect(mockHdfsSite.getProperties()).andReturn(propertiesExpectedHdfs).anyTimes();
+
+ easyMockSupport.replayAll();
+ mockInjector.getInstance(UpgradeCatalog213.class).updateHDFSConfigs();
+ easyMockSupport.verifyAll();
+ }
+
+ @Test
public void testUpdateAmsHbaseEnvContent() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Method updateAmsHbaseEnvContent = UpgradeCatalog213.class.getDeclaredMethod("updateAmsHbaseEnvContent", String.class);
UpgradeCatalog213 upgradeCatalog213 = new UpgradeCatalog213(injector);