You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/03/21 08:40:52 UTC
[incubator-celeborn] branch main updated: [CELEBORN-445] Add CelebornRackResolver to support rack resolve (#1366)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f16c7b414 [CELEBORN-445] Add CelebornRackResolver to support rack resolve (#1366)
f16c7b414 is described below
commit f16c7b414e23fbc4efa6ec6c67a5e9844c4fbdb4
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Tue Mar 21 16:40:46 2023 +0800
[CELEBORN-445] Add CelebornRackResolver to support rack resolve (#1366)
---
.../common/network/CelebornRackResolver.scala | 81 ++++++++++++++++++++++
.../celeborn/common/util/CelebornHadoopUtils.scala | 37 ++++++++++
.../common/network/CelebornRackResolverSuite.scala | 55 +++++++++++++++
3 files changed, 173 insertions(+)
diff --git a/common/src/main/scala/org/apache/celeborn/common/network/CelebornRackResolver.scala b/common/src/main/scala/org/apache/celeborn/common/network/CelebornRackResolver.scala
new file mode 100644
index 000000000..2af1e730d
--- /dev/null
+++ b/common/src/main/scala/org/apache/celeborn/common/network/CelebornRackResolver.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.base.Strings
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic
+import org.apache.hadoop.net.{CachedDNSToSwitchMapping, DNSToSwitchMapping, NetworkTopology, Node, NodeBase, ScriptBasedMapping}
+import org.apache.hadoop.util.ReflectionUtils
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.util.CelebornHadoopUtils
+
+/**
+ * Resolves host names to network locations (racks) through a Hadoop [[DNSToSwitchMapping]].
+ *
+ * The mapping implementation is looked up in the Hadoop configuration under
+ * `CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY`, with
+ * [[ScriptBasedMapping]] as the default. The Hadoop configuration itself is derived from
+ * `celebornConf` via `CelebornHadoopUtils.newConfiguration`.
+ *
+ * @param celebornConf Celeborn configuration from which the Hadoop configuration is built
+ */
+class CelebornRackResolver(celebornConf: CelebornConf) extends Logging {
+
+  // Instantiate the configured mapping; wrap it in CachedDNSToSwitchMapping unless the
+  // configured implementation already is one.
+  private val dnsToSwitchMapping: DNSToSwitchMapping = {
+    val conf: Configuration = CelebornHadoopUtils.newConfiguration(celebornConf)
+    val dnsToSwitchMappingClass =
+      conf.getClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        classOf[ScriptBasedMapping],
+        classOf[DNSToSwitchMapping])
+    ReflectionUtils.newInstance(dnsToSwitchMappingClass, conf)
+      .asInstanceOf[DNSToSwitchMapping] match {
+      case c: CachedDNSToSwitchMapping => c
+      case o => new CachedDNSToSwitchMapping(o)
+    }
+  }
+
+  /**
+   * Resolves a single host.
+   *
+   * @param hostName host name or address to resolve
+   * @return the network location of the host, or `NetworkTopology.DEFAULT_RACK` when the
+   *         mapping yields no location for it
+   */
+  def resolve(hostName: String): String = {
+    // coreResolve yields at least one node for a non-empty input, so head is safe here.
+    coreResolve(Seq(hostName)).head.getNetworkLocation
+  }
+
+  /**
+   * Resolves a batch of hosts in one call to the underlying mapping.
+   *
+   * @param hostNames host names or addresses to resolve
+   * @return one [[Node]] per resolved host, in input order; hosts without a location are
+   *         placed in `NetworkTopology.DEFAULT_RACK`. Empty when `hostNames` is empty.
+   */
+  def resolve(hostNames: Seq[String]): Seq[Node] = {
+    coreResolve(hostNames)
+  }
+
+  private def coreResolve(hostNames: Seq[String]): Seq[Node] = {
+    if (hostNames.isEmpty) {
+      Seq.empty
+    } else {
+      // dnsToSwitchMapping is thread-safe
+      // Null-check the Java list *before* converting it, so the fallback below does not
+      // depend on how the collection converter treats a null argument.
+      val resolvedRacks = Option(dnsToSwitchMapping.resolve(hostNames.toList.asJava))
+        .map(_.asScala)
+        .filter(_.nonEmpty)
+      resolvedRacks match {
+        case None =>
+          logInfo(s"Got an error when resolving hostNames. " +
+            s"Falling back to ${NetworkTopology.DEFAULT_RACK} for all")
+          hostNames.map(new NodeBase(_, NetworkTopology.DEFAULT_RACK)).toList
+        case Some(rNameList) =>
+          hostNames.zip(rNameList).map { case (hostName, rName) =>
+            if (Strings.isNullOrEmpty(rName)) {
+              logDebug(s"Could not resolve $hostName. " +
+                s"Falling back to ${NetworkTopology.DEFAULT_RACK}")
+              new NodeBase(hostName, NetworkTopology.DEFAULT_RACK)
+            } else {
+              new NodeBase(hostName, rName)
+            }
+          }.toList
+      }
+    }
+  }
+}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
new file mode 100644
index 000000000..39b70ff18
--- /dev/null
+++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.util
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.celeborn.common.CelebornConf
+
+/** Helpers for deriving Hadoop configuration objects from a Celeborn configuration. */
+object CelebornHadoopUtils {
+
+  // Celeborn properties carrying this prefix are forwarded to Hadoop with the prefix removed.
+  private val HadoopConfPrefix = "celeborn.hadoop."
+
+  /**
+   * Creates a new Hadoop [[Configuration]] and copies every `celeborn.hadoop.*` property of
+   * `conf` into it, dropping the `celeborn.hadoop.` prefix from the key.
+   *
+   * @param conf Celeborn configuration to read the prefixed properties from
+   * @return the newly created Hadoop configuration
+   */
+  private[celeborn] def newConfiguration(conf: CelebornConf): Configuration = {
+    val hadoopConf = new Configuration()
+    appendCelebornHadoopConfigs(conf, hadoopConf)
+    hadoopConf
+  }
+
+  // Copy any "celeborn.hadoop.foo=bar" celeborn properties into hadoopConf as "foo=bar".
+  private def appendCelebornHadoopConfigs(conf: CelebornConf, hadoopConf: Configuration): Unit = {
+    conf.getAll.foreach { case (key, value) =>
+      if (key.startsWith(HadoopConfPrefix)) {
+        hadoopConf.set(key.stripPrefix(HadoopConfPrefix), value)
+      }
+    }
+  }
+}
diff --git a/common/src/test/scala/org/apache/celeborn/common/network/CelebornRackResolverSuite.scala b/common/src/test/scala/org/apache/celeborn/common/network/CelebornRackResolverSuite.scala
new file mode 100644
index 000000000..268f3b75e
--- /dev/null
+++ b/common/src/test/scala/org/apache/celeborn/common/network/CelebornRackResolverSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network
+
+import java.io.File
+
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic.{NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY}
+import org.apache.hadoop.net.{Node, TableMapping}
+import org.apache.hadoop.shaded.com.google.common.base.Charsets
+import org.apache.hadoop.shaded.com.google.common.io.Files
+import org.junit.Assert.assertEquals
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+
+/** Checks that [[CelebornRackResolver]] resolves racks through a Hadoop `TableMapping` file. */
+class CelebornRackResolverSuite extends CelebornFunSuite {
+
+  test("Test TableMapping") {
+    val rackOneHost = "1.2.3.4"
+    val rackTwoHost = "5.6.7.8"
+
+    // Topology table with two entries: the first separated by a space, the second by a tab.
+    val topologyTable: File =
+      File.createTempFile(getClass.getSimpleName + ".testResolve", ".txt")
+    topologyTable.deleteOnExit()
+    Files.asCharSink(topologyTable, Charsets.UTF_8)
+      .write(s"$rackOneHost /rack1\n$rackTwoHost\t/rack2\n")
+
+    // Hadoop settings are passed through CelebornConf under the "celeborn.hadoop." prefix.
+    val hadoopPrefix = "celeborn.hadoop."
+    val conf = new CelebornConf
+    conf.set(hadoopPrefix + NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, classOf[TableMapping].getName)
+    conf.set(hadoopPrefix + NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, topologyTable.getCanonicalPath)
+    val resolver = new CelebornRackResolver(conf)
+
+    val hosts = Seq(rackOneHost, rackTwoHost)
+    val nodes: Seq[Node] = resolver.resolve(hosts)
+
+    // One node per host, each placed in the rack listed in the table.
+    assertEquals(hosts.size, nodes.size)
+    assertEquals("/rack1", nodes(0).getNetworkLocation)
+    assertEquals("/rack2", nodes(1).getNetworkLocation)
+  }
+}