You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/03/21 08:40:52 UTC

[incubator-celeborn] branch main updated: [CELEBORN-445] Add CelebornRackResolver to support rack reoslve (#1366)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f16c7b414 [CELEBORN-445] Add CelebornRackResolver to support rack reoslve (#1366)
f16c7b414 is described below

commit f16c7b414e23fbc4efa6ec6c67a5e9844c4fbdb4
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Tue Mar 21 16:40:46 2023 +0800

    [CELEBORN-445] Add CelebornRackResolver to support rack reoslve (#1366)
---
 .../common/network/CelebornRackResolver.scala      | 81 ++++++++++++++++++++++
 .../celeborn/common/util/CelebornHadoopUtils.scala | 37 ++++++++++
 .../common/network/CelebornRackResolverSuite.scala | 55 +++++++++++++++
 3 files changed, 173 insertions(+)

diff --git a/common/src/main/scala/org/apache/celeborn/common/network/CelebornRackResolver.scala b/common/src/main/scala/org/apache/celeborn/common/network/CelebornRackResolver.scala
new file mode 100644
index 000000000..2af1e730d
--- /dev/null
+++ b/common/src/main/scala/org/apache/celeborn/common/network/CelebornRackResolver.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.base.Strings
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic
+import org.apache.hadoop.net.{CachedDNSToSwitchMapping, DNSToSwitchMapping, NetworkTopology, Node, NodeBase, ScriptBasedMapping}
+import org.apache.hadoop.util.ReflectionUtils
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.util.CelebornHadoopUtils
+
+/** Resolves host names to rack locations through a Hadoop [[DNSToSwitchMapping]]. */
+class CelebornRackResolver(celebornConf: CelebornConf) extends Logging {
+
+  // The implementation class is read from NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY (default:
+  // ScriptBasedMapping) and wrapped in a CachedDNSToSwitchMapping unless it already is one.
+  private val dnsToSwitchMapping: DNSToSwitchMapping = {
+    val conf: Configuration = CelebornHadoopUtils.newConfiguration(celebornConf)
+    val dnsToSwitchMappingClass =
+      conf.getClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        classOf[ScriptBasedMapping],
+        classOf[DNSToSwitchMapping])
+    ReflectionUtils.newInstance(dnsToSwitchMappingClass, conf)
+      .asInstanceOf[DNSToSwitchMapping] match {
+      case c: CachedDNSToSwitchMapping => c
+      case o => new CachedDNSToSwitchMapping(o)
+    }
+  }
+
+  /** Returns the network location of `hostName`, or the default rack if none is found. */
+  def resolve(hostName: String): String =
+    coreResolve(Seq(hostName)).head.getNetworkLocation
+
+  /** Returns one [[Node]] per entry of `hostNames`, in input order. */
+  def resolve(hostNames: Seq[String]): Seq[Node] = coreResolve(hostNames)
+
+  /** Shared by both `resolve` overloads; unresolved hosts are placed on the default rack. */
+  private def coreResolve(hostNames: Seq[String]): Seq[Node] = {
+    def defaultNode(host: String): Node = new NodeBase(host, NetworkTopology.DEFAULT_RACK)
+    // dnsToSwitchMapping is thread-safe; its answer is null-checked before conversion.
+    def lookup = Option(dnsToSwitchMapping.resolve(hostNames.toList.asJava)).map(_.asScala)
+    if (hostNames.isEmpty) Seq.empty
+    else lookup.filter(_.nonEmpty) match {
+      case None =>
+        logInfo(s"Got an error when resolving hostNames. " +
+          s"Falling back to ${NetworkTopology.DEFAULT_RACK} for all")
+        hostNames.map(defaultNode).toList
+      case Some(racks) =>
+        // Pad a short answer so that zip cannot silently drop the trailing hosts.
+        hostNames.zip(racks.padTo(hostNames.size, "")).map {
+          case (hostName, rName) if !Strings.isNullOrEmpty(rName) => new NodeBase(hostName, rName)
+          case (hostName, _) =>
+            logDebug(s"Could not resolve $hostName. " +
+              s"Falling back to ${NetworkTopology.DEFAULT_RACK}")
+            defaultNode(hostName)
+        }.toList
+    }
+  }
+}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
new file mode 100644
index 000000000..39b70ff18
--- /dev/null
+++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.util
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.celeborn.common.CelebornConf
+
+object CelebornHadoopUtils {
+  /**
+   * Creates a Hadoop [[Configuration]] and copies every "celeborn.hadoop.foo=bar"
+   * property of `conf` into it as "foo=bar".
+   */
+  private[celeborn] def newConfiguration(conf: CelebornConf): Configuration = {
+    val prefix = "celeborn.hadoop."
+    val hadoopConf = new Configuration()
+    conf.getAll
+      .filter { case (key, _) => key.startsWith(prefix) }
+      .foreach { case (key, value) => hadoopConf.set(key.stripPrefix(prefix), value) }
+    hadoopConf
+  }
+}
diff --git a/common/src/test/scala/org/apache/celeborn/common/network/CelebornRackResolverSuite.scala b/common/src/test/scala/org/apache/celeborn/common/network/CelebornRackResolverSuite.scala
new file mode 100644
index 000000000..268f3b75e
--- /dev/null
+++ b/common/src/test/scala/org/apache/celeborn/common/network/CelebornRackResolverSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network
+
+import java.io.File
+
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic.{NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY}
+import org.apache.hadoop.net.{Node, TableMapping}
+import org.apache.hadoop.shaded.com.google.common.base.Charsets
+import org.apache.hadoop.shaded.com.google.common.io.Files
+import org.junit.Assert.assertEquals
+
+import org.apache.celeborn.CelebornFunSuite
+import org.apache.celeborn.common.CelebornConf
+
+class CelebornRackResolverSuite extends CelebornFunSuite {
+
+  // Resolves two hosts through a Hadoop TableMapping backed by a temporary mapping file.
+  test("Test TableMapping") {
+    val hosts = Seq("1.2.3.4", "5.6.7.8")
+    val expectedRacks = Seq("/rack1", "/rack2")
+    // The first entry is separated by a space and the second by a tab.
+    val tableContent = s"${hosts(0)} /rack1\n${hosts(1)}\t/rack2\n"
+    val tableFile: File = File.createTempFile(getClass.getSimpleName + ".testResolve", ".txt")
+    Files.asCharSink(tableFile, Charsets.UTF_8).write(tableContent)
+    tableFile.deleteOnExit()
+    val hadoopPrefix = "celeborn.hadoop."
+    val conf = new CelebornConf
+    conf.set(
+      hadoopPrefix + NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+      classOf[TableMapping].getName)
+    conf.set(hadoopPrefix + NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, tableFile.getCanonicalPath)
+
+    val nodes: Seq[Node] = new CelebornRackResolver(conf).resolve(hosts)
+    assertEquals(hosts.size, nodes.size)
+    expectedRacks.zip(nodes).foreach { case (rack, node) =>
+      assertEquals(rack, node.getNetworkLocation)
+    }
+  }
+}