Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/05/22 18:39:22 UTC

[GitHub] [spark] wangyum opened a new pull request #24679: [SPARK-27807][SQL] Parallel resolve leaf statuses InMemoryFileIndex

wangyum opened a new pull request #24679: [SPARK-27807][SQL] Parallel resolve leaf statuses InMemoryFileIndex
URL: https://github.com/apache/spark/pull/24679
 
 
   ## What changes were proposed in this pull request?
   
    This PR follows [`ParquetFileFormat.readParquetFootersInParallel`](https://github.com/apache/spark/blob/215609def22da14c464b37374ceae4f53a39a145/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L539-L555) and resolves leaf file statuses in `InMemoryFileIndex` in parallel.
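    
    For context, the pattern being borrowed is simply to map a per-file metadata call over many files on a bounded thread pool instead of calling it sequentially. Below is a rough standalone sketch of that pattern, using plain `scala.concurrent.Future` with a fixed-size pool in place of Spark's internal `ThreadUtils.parmap`; the `resolve` function and the paths are placeholders, not the actual `InMemoryFileIndex` code:
    
    ```scala
    import java.util.concurrent.Executors
    
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, ExecutionContext, Future}
    
    object ParallelResolveSketch {
      def main(args: Array[String]): Unit = {
        // Bounded pool, analogous to the thread-count argument of ThreadUtils.parmap.
        val pool = Executors.newFixedThreadPool(8)
        implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    
        // Placeholder for the per-file metadata call (fs.getFileBlockLocations in the real code):
        // simulate a slow lookup so the parallel speedup is visible.
        def resolve(path: String): String = {
          Thread.sleep(10)
          s"resolved:$path"
        }
    
        val paths = (1 to 100).map(i => s"/tmp/data/part-$i")
    
        // Parallel map: one Future per path, then a blocking wait for all of them.
        val futures = paths.map(p => Future(resolve(p)))
        val resolved = Await.result(Future.sequence(futures), Duration.Inf)
    
        println(s"Resolved ${resolved.size} paths")
        pool.shutdown()
      }
    }
    ```
    
    `ThreadUtils.parmap`, used in the test code below, follows essentially the same shape: a dedicated pool, one task per element, and a blocking wait for all results.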
   
   
   ## How was this patch tested?
   
    Manual tests. For benchmarking, [`InMemoryFileIndex.listLeafFiles`](https://github.com/apache/spark/blob/46f9f44918ba2589c6ffc938822cbdfac1c6f4d1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L266-L345) was changed to:
   ```scala
      private def listLeafFiles(
         path: Path,
         hadoopConf: Configuration,
         filter: PathFilter,
         sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
       logTrace(s"Listing $path")
       val fs = path.getFileSystem(hadoopConf)
   
       // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
       // Note that statuses only include FileStatus for the files and dirs directly under path,
       // and does not include anything else recursively.
       val statuses = try fs.listStatus(path) catch {
         case _: FileNotFoundException =>
           logWarning(s"The directory $path was not found. Was it deleted very recently?")
           Array.empty[FileStatus]
       }
   
       val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
   
       val allLeafStatuses = {
         val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
         val nestedFiles: Seq[FileStatus] = sessionOpt match {
           case Some(session) =>
             bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
           case _ =>
             dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
         }
         val allFiles = topLevelFiles ++ nestedFiles
         if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
       }
   
       val missingFiles = mutable.ArrayBuffer.empty[String]
       val filteredLeafStatuses = allLeafStatuses.filterNot(
         status => shouldFilterOut(status.getPath.getName))
   
       def resolveLeafStatuses(fileStatus: FileStatus): Option[LocatedFileStatus] = fileStatus match {
         case f: LocatedFileStatus =>
           Some(f)
   
         // NOTE:
         //
          // - Although S3/S3A/S3N file systems can be quite slow for remote file metadata
          //   operations, calling `getFileBlockLocations` does no harm here since these file
          //   system implementations don't actually issue RPCs for this method.
          //
          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should
          //   not be a big deal since we always use `bulkListLeafFiles` when the number of
          //   paths exceeds the threshold.
         case f =>
            // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
            // which is very slow on some file systems (e.g. RawLocalFileSystem, which launches
            // a subprocess and parses the stdout).
           try {
             val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
               // Store BlockLocation objects to consume less memory
               if (loc.getClass == classOf[BlockLocation]) {
                 loc
               } else {
                 new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)
               }
             }
             val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
               f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
             if (f.isSymlink) {
               lfs.setSymlink(f.getSymlink)
             }
             Some(lfs)
           } catch {
             case _: FileNotFoundException =>
               missingFiles += f.getPath.toString
               None
           }
       }
   
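        // Benchmark: resolve the same statuses once in parallel (ThreadUtils.parmap with 8
        // threads) and once sequentially, and log both timings for comparison.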
       val start1 = System.currentTimeMillis()
       val parResolvedLeafStatuses = ThreadUtils.parmap(
         filteredLeafStatuses.toSeq, "resolveLeafStatuses", 8)(resolveLeafStatuses).flatten
   
       val start2 = System.currentTimeMillis()
       val defResolvedLeafStatuses = filteredLeafStatuses.flatMap(resolveLeafStatuses)
   
       val end = System.currentTimeMillis()
       logWarning(s"Elements: ${parResolvedLeafStatuses.size}, " +
         s"parallel time token: ${start2 - start1}, default time token: ${end - start2}.")
   
       if (missingFiles.nonEmpty) {
         logWarning(
           s"the following files were missing during file scan:\n  ${missingFiles.mkString("\n  ")}")
       }
   
       parResolvedLeafStatuses
     }
   ```
   
    The first run:
   ```
   19/05/22 08:09:55 WARN InMemoryFileIndex: Elements: 10, parallel time token: 66, default time token: 123.
   19/05/22 08:10:08 WARN InMemoryFileIndex: Elements: 20, parallel time token: 27, default time token: 202.
   19/05/22 08:10:10 WARN InMemoryFileIndex: Elements: 50, parallel time token: 40, default time token: 371.
   19/05/22 08:10:13 WARN InMemoryFileIndex: Elements: 100, parallel time token: 83, default time token: 664.
   19/05/22 08:10:18 WARN InMemoryFileIndex: Elements: 200, parallel time token: 162, default time token: 1174.
   19/05/22 08:10:34 WARN InMemoryFileIndex: Elements: 500, parallel time token: 3797, default time token: 7533.
   19/05/22 08:10:43 WARN InMemoryFileIndex: Elements: 1000, parallel time token: 88, default time token: 357.
   19/05/22 08:11:24 WARN InMemoryFileIndex: Elements: 2000, parallel time token: 3755, default time token: 14304.
   19/05/22 08:13:03 WARN InMemoryFileIndex: Elements: 5000, parallel time token: 47553, default time token: 30628.
   19/05/22 08:20:31 WARN InMemoryFileIndex: Elements: 10000, parallel time token: 20475, default time token: 23823.
   19/05/22 08:22:01 WARN InMemoryFileIndex: Elements: 20000, parallel time token: 1561, default time token: 9339.
   19/05/22 08:23:02 WARN InMemoryFileIndex: Elements: 40000, parallel time token: 2829, default time token: 20497.
   19/05/22 08:24:44 WARN InMemoryFileIndex: Elements: 80000, parallel time token: 5941, default time token: 51388.
   19/05/22 08:33:55 WARN InMemoryFileIndex: Elements: 692375, parallel time token: 51993, default time token: 406825.
   ```
   
    The second run:
   ```
   19/05/22 08:47:17 WARN InMemoryFileIndex: Elements: 10, parallel time token: 34, default time token: 10.
   19/05/22 08:49:59 WARN InMemoryFileIndex: Elements: 20, parallel time token: 8, default time token: 22.
   19/05/22 08:50:03 WARN InMemoryFileIndex: Elements: 50, parallel time token: 14, default time token: 28.
   19/05/22 08:50:23 WARN InMemoryFileIndex: Elements: 100, parallel time token: 20, default time token: 73.
   19/05/22 08:50:32 WARN InMemoryFileIndex: Elements: 200, parallel time token: 43, default time token: 103.
   19/05/22 08:50:47 WARN InMemoryFileIndex: Elements: 500, parallel time token: 166, default time token: 271.
   19/05/22 08:52:08 WARN InMemoryFileIndex: Elements: 1000, parallel time token: 113, default time token: 385.
   19/05/22 08:53:44 WARN InMemoryFileIndex: Elements: 2000, parallel time token: 257, default time token: 971.
   19/05/22 08:55:01 WARN InMemoryFileIndex: Elements: 5000, parallel time token: 753, default time token: 2197.
   19/05/22 08:57:04 WARN InMemoryFileIndex: Elements: 10000, parallel time token: 1116, default time token: 4307.
   19/05/22 09:00:39 WARN InMemoryFileIndex: Elements: 20000, parallel time token: 3160, default time token: 8983.
   19/05/22 09:14:24 WARN InMemoryFileIndex: Elements: 40000, parallel time token: 77537, default time token: 397752.
   19/05/22 09:19:05 WARN InMemoryFileIndex: Elements: 80000, parallel time token: 10355, default time token: 91161.
   19/05/22 09:30:05 WARN InMemoryFileIndex: Elements: 692375, parallel time token: 71769, default time token: 375620.
   ```
   
    The third run:
   ```
   19/05/22 09:13:21 WARN InMemoryFileIndex: Elements: 10, parallel time token: 62, default time token: 83.
   19/05/22 09:13:35 WARN InMemoryFileIndex: Elements: 20, parallel time token: 53, default time token: 299.
   19/05/22 09:13:39 WARN InMemoryFileIndex: Elements: 50, parallel time token: 71, default time token: 347.
   19/05/22 09:14:31 WARN InMemoryFileIndex: Elements: 100, parallel time token: 27, default time token: 2766.
   19/05/22 09:14:41 WARN InMemoryFileIndex: Elements: 200, parallel time token: 61, default time token: 235.
   19/05/22 09:15:22 WARN InMemoryFileIndex: Elements: 500, parallel time token: 65, default time token: 239.
   19/05/22 09:17:20 WARN InMemoryFileIndex: Elements: 1000, parallel time token: 882, default time token: 428.
   19/05/22 09:18:19 WARN InMemoryFileIndex: Elements: 2000, parallel time token: 919, default time token: 6229.
   19/05/22 09:20:11 WARN InMemoryFileIndex: Elements: 5000, parallel time token: 557, default time token: 9557.
   19/05/22 09:22:27 WARN InMemoryFileIndex: Elements: 10000, parallel time token: 838, default time token: 10047.
   19/05/22 09:24:49 WARN InMemoryFileIndex: Elements: 20000, parallel time token: 2294, default time token: 9914.
   19/05/22 09:28:18 WARN InMemoryFileIndex: Elements: 40000, parallel time token: 2955, default time token: 22586.
   19/05/22 09:30:30 WARN InMemoryFileIndex: Elements: 80000, parallel time token: 7447, default time token: 50589.
   19/05/22 09:41:03 WARN InMemoryFileIndex: Elements: 692375, parallel time token: 71536, default time token: 406319.
   ```
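    
    Across all three runs the parallel resolution is consistently faster; for the full listing of 692,375 files it takes roughly 52-72 seconds, versus roughly 376-407 seconds for the sequential default (about 5x-8x faster).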

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org