Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2022/12/16 18:58:00 UTC
[jira] [Updated] (SPARK-38062) FallbackStorage shouldn't attempt to resolve arbitrary "remote" hostname
[ https://issues.apache.org/jira/browse/SPARK-38062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-38062:
----------------------------------
Parent: SPARK-41550
Issue Type: Sub-task (was: Improvement)
> FallbackStorage shouldn't attempt to resolve arbitrary "remote" hostname
> ------------------------------------------------------------------------
>
> Key: SPARK-38062
> URL: https://issues.apache.org/jira/browse/SPARK-38062
> Project: Spark
> Issue Type: Sub-task
> Components: Spark Core
> Affects Versions: 3.3.0
> Reporter: Erik Krogen
> Assignee: Erik Krogen
> Priority: Major
> Fix For: 3.3.0
>
>
> {{FallbackStorage}} uses a placeholder block manager ID:
> {code:scala}
> private[spark] object FallbackStorage extends Logging {
>   /** We use one block manager id as a place holder. */
>   val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
> {code}
> The second argument is normally interpreted as a hostname, but here it is the literal string "remote".
> {{BlockManager}} will consider this placeholder as one of the peers in some cases:
> {code:language=scala|title=BlockManager.scala}
> private[storage] def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
>   peerFetchLock.synchronized {
>     ...
>     if (cachedPeers.isEmpty &&
>         conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
>       Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)
>     } else {
>       cachedPeers
>     }
>   }
> }
> {code}
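> The effect of that branch can be illustrated with a self-contained sketch. This is a minimal hypothetical model, not Spark's real classes; {{getPeers}} and its parameters here are simplified stand-ins for illustration only:
> {code:scala}
> // Hypothetical minimal model of BlockManagerId (not Spark's real class).
> case class BlockManagerId(executorId: String, host: String, port: Int)
>
> val FALLBACK_BLOCK_MANAGER_ID = BlockManagerId("fallback", "remote", 7337)
>
> // Mirrors the branch above: with no cached peers and a fallback path
> // configured, the placeholder ID is returned as if it were a real peer.
> def getPeers(cachedPeers: Seq[BlockManagerId], fallbackConfigured: Boolean): Seq[BlockManagerId] =
>   if (cachedPeers.isEmpty && fallbackConfigured) Seq(FALLBACK_BLOCK_MANAGER_ID)
>   else cachedPeers
>
> // The caller then sees host = "remote" and may try to resolve it.
> assert(getPeers(Seq.empty, fallbackConfigured = true).head.host == "remote")
> {code}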
> {{BlockManagerDecommissioner.ShuffleMigrationRunnable}} will then attempt to perform an upload to this placeholder ID:
> {code:scala}
> try {
>   blocks.foreach { case (blockId, buffer) =>
>     logDebug(s"Migrating sub-block ${blockId}")
>     bm.blockTransferService.uploadBlockSync(
>       peer.host,
>       peer.port,
>       peer.executorId,
>       blockId,
>       buffer,
>       StorageLevel.DISK_ONLY,
>       null) // class tag, we don't need for shuffle
>     logDebug(s"Migrated sub-block $blockId")
>   }
>   logInfo(s"Migrated $shuffleBlockInfo to $peer")
> } catch {
>   case e: IOException =>
>     ...
>     if (bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo).size < blocks.size) {
>       logWarning(s"Skipping block $shuffleBlockInfo, block deleted.")
>     } else if (fallbackStorage.isDefined) {
>       fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
>     } else {
>       logError(s"Error occurred during migrating $shuffleBlockInfo", e)
>       keepRunning = false
>     }
> }
> {code}
> Since "remote" is not normally a resolvable hostname, an {{IOException}} occurs and the {{fallbackStorage}} path is taken. But we shouldn't rely on this. It is unnecessary and fragile to treat the placeholder ID as a resolvable hostname and to depend on an exception to discover that {{fallbackStorage}} should be used.
> To make matters worse, in some network environments "remote" actually is a resolvable hostname, which breaks this functionality completely. In the particular environment I use for running automated tests, there is a DNS entry for "remote", and attempting to connect to it hangs for a long period of time. This effectively hangs the executor decommission process, and in unit tests it breaks {{FallbackStorageSuite}} by exceeding its timeouts. It's possible this is related to SPARK-35584 as well, if in the GA environment the OS sometimes takes a long time to decide that "remote" is not a valid hostname.
> We shouldn't attempt to treat this placeholder ID as a real hostname.
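> One way to avoid the resolution entirely (a sketch of the general idea, not the actual patch): compare the migration target against the placeholder ID up front and route to fallback storage directly, so "remote" is never handed to the network layer. Again using a minimal hypothetical model of {{BlockManagerId}}, with {{migrationTarget}} as an invented helper for illustration:
> {code:scala}
> // Hypothetical minimal model of BlockManagerId (not Spark's real class).
> case class BlockManagerId(executorId: String, host: String, port: Int)
>
> val FALLBACK_BLOCK_MANAGER_ID = BlockManagerId("fallback", "remote", 7337)
>
> // Decide up front whether a peer is the placeholder, instead of trying to
> // connect to host "remote" and waiting for an IOException.
> def migrationTarget(peer: BlockManagerId): String =
>   if (peer == FALLBACK_BLOCK_MANAGER_ID) "fallback-storage" // copy to the fallback path directly
>   else s"upload to ${peer.host}:${peer.port}"               // normal peer-to-peer upload
>
> assert(migrationTarget(FALLBACK_BLOCK_MANAGER_ID) == "fallback-storage")
> assert(migrationTarget(BlockManagerId("exec-1", "host-a", 7337)) == "upload to host-a:7337")
> {code}
> Because {{BlockManagerId}} equality is well defined, this check is exact and never touches DNS.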
--
This message was sent by Atlassian Jira
(v8.20.10#820010)