You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jl...@apache.org on 2017/10/11 05:25:09 UTC
[13/31] ambari git commit: AMBARI-22159. Replace hostgroup vars for Druid
AMBARI-22159. Replace hostgroup vars for Druid
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/75465a83
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/75465a83
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/75465a83
Branch: refs/heads/branch-feature-AMBARI-14714
Commit: 75465a83bd743bb3a2fa74acf30cfca4d0a2287c
Parents: f1c4626
Author: Attila Doroszlai <ad...@hortonworks.com>
Authored: Mon Oct 9 14:40:02 2017 +0200
Committer: Attila Doroszlai <ad...@hortonworks.com>
Committed: Mon Oct 9 18:39:22 2017 +0200
----------------------------------------------------------------------
.../BlueprintConfigurationProcessor.java | 81 +++++++++++++++-----
.../BlueprintConfigurationProcessorTest.java | 32 ++++++++
2 files changed, 92 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/75465a83/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
index 5a6e2cc..03f84a5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java
@@ -32,6 +32,7 @@ import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -1367,11 +1368,56 @@ public class BlueprintConfigurationProcessor {
ClusterTopology topology);
}
+ private static class HostGroupUpdater implements PropertyUpdater { // replaces a host group token in a property value with a host name from that group
+
+ public static final PropertyUpdater INSTANCE = new HostGroupUpdater(); // shared instance; the class declares no instance fields
+
+ @Override
+ public String updateForClusterCreate(String propertyName,
+ String origValue,
+ Map<String, Map<String, String>> properties,
+ ClusterTopology topology) {
+
+ //todo: getHostStrings
+ Matcher m = HostGroup.HOSTGROUP_REGEX.matcher(origValue);
+ if (m.find()) { // only the first match is examined; NOTE(review): a value naming several different groups would keep the later tokens - confirm intended
+ String hostGroupName = m.group(1); // capture group 1 is used as the key into topology.getHostGroupInfo()
+
+ HostGroupInfo groupInfo = topology.getHostGroupInfo().get(hostGroupName);
+ if (groupInfo == null) {
+ //todo: this should be validated in configuration validation
+ throw new RuntimeException(
+ "Encountered a host group token in configuration which couldn't be matched to a host group: "
+ + hostGroupName);
+ }
+
+ //todo: warn if > 1 hosts; only the first host name returned by the iterator is used (NOTE(review): next() has no emptiness guard here - confirm callers guarantee a host)
+ return origValue.replace(m.group(0), groupInfo.getHostNames().iterator().next()); // String.replace substitutes every occurrence of the matched token text
+ }
+
+ return origValue; // no token found: value is returned unchanged
+ }
+
+ @Override
+ public Collection<String> getRequiredHostGroups(String propertyName,
+ String origValue,
+ Map<String, Map<String, String>> properties,
+ ClusterTopology topology) {
+ //todo: getHostStrings; for now only the host group of the first token match is reported
+ Matcher m = HostGroup.HOSTGROUP_REGEX.matcher(origValue);
+ if (m.find()) {
+ String hostGroupName = m.group(1);
+ return Collections.singleton(hostGroupName); // single-element set holding the group named by the token
+ }
+ return Collections.emptySet(); // value contains no host group token
+ }
+ }
+
/**
* Topology based updater which replaces the original host name of a property with the host name
* which runs the associated (master) component in the new cluster.
*/
- private static class SingleHostTopologyUpdater implements PropertyUpdater {
+ private static class SingleHostTopologyUpdater extends HostGroupUpdater {
/**
* Component name
*/
@@ -1402,21 +1448,9 @@ public class BlueprintConfigurationProcessor {
Map<String, Map<String, String>> properties,
ClusterTopology topology) {
- //todo: getHostStrings
- Matcher m = HostGroup.HOSTGROUP_REGEX.matcher(origValue);
- if (m.find()) {
- String hostGroupName = m.group(1);
-
- HostGroupInfo groupInfo = topology.getHostGroupInfo().get(hostGroupName);
- if (groupInfo == null) {
- //todo: this should be validated in configuration validation
- throw new RuntimeException(
- "Encountered a host group token in configuration which couldn't be matched to a host group: "
- + hostGroupName);
- }
-
- //todo: warn if > hosts
- return origValue.replace(m.group(0), groupInfo.getHostNames().iterator().next());
+ String replacedValue = super.updateForClusterCreate(propertyName, origValue, properties, topology);
+ if (!Objects.equals(origValue, replacedValue)) {
+ return replacedValue;
} else {
int matchingGroupCount = topology.getHostGroupsForComponent(component).size();
if (matchingGroupCount == 1) {
@@ -1525,11 +1559,9 @@ public class BlueprintConfigurationProcessor {
String origValue,
Map<String, Map<String, String>> properties,
ClusterTopology topology) {
- //todo: getHostStrings
- Matcher m = HostGroup.HOSTGROUP_REGEX.matcher(origValue);
- if (m.find()) {
- String hostGroupName = m.group(1);
- return Collections.singleton(hostGroupName);
+ Collection<String> result = super.getRequiredHostGroups(propertyName, origValue, properties, topology);
+ if (!result.isEmpty()) {
+ return result;
} else {
Collection<String> matchingGroups = topology.getHostGroupsForComponent(component);
int matchingGroupCount = matchingGroups.size();
@@ -2351,6 +2383,7 @@ public class BlueprintConfigurationProcessor {
allUpdaters.add(nonTopologyUpdaters);
Map<String, PropertyUpdater> amsSiteMap = new HashMap<>();
+ Map<String, PropertyUpdater> druidCommon = new HashMap<>();
Map<String, PropertyUpdater> hdfsSiteMap = new HashMap<>();
Map<String, PropertyUpdater> mapredSiteMap = new HashMap<>();
Map<String, PropertyUpdater> coreSiteMap = new HashMap<>();
@@ -2404,6 +2437,7 @@ public class BlueprintConfigurationProcessor {
Map<String, PropertyUpdater> zookeeperEnvMap = new HashMap<>();
singleHostTopologyUpdaters.put("ams-site", amsSiteMap);
+ singleHostTopologyUpdaters.put("druid-common", druidCommon);
singleHostTopologyUpdaters.put("hdfs-site", hdfsSiteMap);
singleHostTopologyUpdaters.put("mapred-site", mapredSiteMap);
singleHostTopologyUpdaters.put("core-site", coreSiteMap);
@@ -2775,6 +2809,11 @@ public class BlueprintConfigurationProcessor {
}
}
});
+
+ // DRUID
+ druidCommon.put("metastore_hostname", HostGroupUpdater.INSTANCE);
+ druidCommon.put("druid.metadata.storage.connector.connectURI", HostGroupUpdater.INSTANCE);
+ druidCommon.put("druid.zk.service.host", new MultipleHostTopologyUpdater("ZOOKEEPER_SERVER"));
}
private static void addUnitPropertyUpdaters() {
http://git-wip-us.apache.org/repos/asf/ambari/blob/75465a83/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
index 68d6349..d137f2c 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java
@@ -88,6 +88,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
/**
* BlueprintConfigurationProcessor unit tests.
@@ -7933,6 +7934,37 @@ public class BlueprintConfigurationProcessorTest extends EasyMockSupport {
assertEquals(someString, metricsReporterRegister);
}
+ @Test
+ public void druidProperties() throws Exception { // checks that host group tokens in two druid-common properties are replaced with the group's host name
+ Map<String, Map<String, String>> properties = new HashMap<>();
+ Map<String, String> druidCommon = new HashMap<>();
+ String connectUriKey = "druid.metadata.storage.connector.connectURI";
+ String metastoreHostnameKey = "metastore_hostname";
+ String connectUriTemplate = "jdbc:mysql://%s:3306/druid?createDatabaseIfNotExist=true";
+ druidCommon.put(connectUriKey, String.format(connectUriTemplate, "%HOSTGROUP::group1%")); // token embedded inside a longer value
+ druidCommon.put(metastoreHostnameKey, "%HOSTGROUP::group1%"); // token as the entire value
+ properties.put("druid-common", druidCommon);
+
+ Map<String, Map<String, String>> parentProperties = new HashMap<>(); // parent configuration carries no properties
+ Configuration parentClusterConfig = new Configuration(parentProperties, Collections.<String, Map<String, Map<String, String>>>emptyMap());
+ Configuration clusterConfig = new Configuration(properties, Collections.<String, Map<String, Map<String, String>>>emptyMap(), parentClusterConfig);
+
+ Collection<String> hgComponents1 = Sets.newHashSet("DRUID_COORDINATOR");
+ TestHostGroup group1 = new TestHostGroup("group1", hgComponents1, Collections.singleton("host1")); // group1 has exactly one host: host1
+
+ Collection<String> hgComponents2 = Sets.newHashSet("DRUID_BROKER", "DRUID_OVERLORD", "DRUID_ROUTER");
+ TestHostGroup group2 = new TestHostGroup("group2", hgComponents2, Collections.singleton("host2")); // second group, not referenced by any token above
+
+ Collection<TestHostGroup> hostGroups = Arrays.asList(group1, group2);
+
+ ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups); // bp is declared outside this method
+ BlueprintConfigurationProcessor configProcessor = new BlueprintConfigurationProcessor(topology);
+
+ configProcessor.doUpdateForClusterCreate(); // call under test
+
+ assertEquals(String.format(connectUriTemplate, "host1"), clusterConfig.getPropertyValue("druid-common", connectUriKey)); // token inside the URI became host1
+ assertEquals("host1", clusterConfig.getPropertyValue("druid-common", metastoreHostnameKey)); // whole-value token became host1
+ }
@Test
public void testAmsPropertiesDefault() throws Exception {