Posted to commits@spark.apache.org by zs...@apache.org on 2017/10/05 03:58:50 UTC
spark git commit: [SPARK-22203][SQL] Add job description for file listing Spark jobs
Repository: spark
Updated Branches:
refs/heads/master 969ffd631 -> c8affec21
[SPARK-22203][SQL] Add job description for file listing Spark jobs
## What changes were proposed in this pull request?
Users may be confused when they see a file-listing job with, say, 10000 tasks and no indication of what it is doing. This change adds a job description to these jobs so that users can tell at a glance what they are.
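At its core the change follows a save/set/restore pattern around the distributed listing job. A minimal sketch, assuming a live `SparkContext` named `sc` and a `Seq[String]` of `paths` (both names illustrative; the real code lives in `InMemoryFileIndex`, see the diff below):

```scala
// Save the caller's description; "spark.job.description" is the key behind
// the Spark-internal constant SparkContext.SPARK_JOB_DESCRIPTION.
val previous = sc.getLocalProperty("spark.job.description")
try {
  sc.setJobDescription(s"Listing leaf files and directories for ${paths.size} paths")
  // Stand-in for the real distributed file-listing job:
  sc.parallelize(paths, paths.size).collect()
} finally {
  // Always restore the caller's description (possibly null), even on failure.
  sc.setJobDescription(previous)
}
```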
## How was this patch tested?
A new unit test ("use Spark jobs to list files" in DataFrameReaderWriterSuite).
Before (screenshot, 2017-10-04 3:22 PM): https://user-images.githubusercontent.com/1000778/31202567-f78d15c0-a917-11e7-841e-11b8bf8f0032.png
After (screenshot, 2017-10-04 3:13 PM): https://user-images.githubusercontent.com/1000778/31202576-fc01e356-a917-11e7-9c2b-7bf80b153adb.png
Author: Shixiong Zhu <zs...@gmail.com>
Closes #19432 from zsxwing/SPARK-22203.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8affec2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8affec2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8affec2
Branch: refs/heads/master
Commit: c8affec21c91d638009524955515fc143ad86f20
Parents: 969ffd6
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Wed Oct 4 20:58:48 2017 -0700
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Wed Oct 4 20:58:48 2017 -0700
----------------------------------------------------------------------
.../datasources/InMemoryFileIndex.scala | 85 ++++++++++++--------
.../sql/test/DataFrameReaderWriterSuite.scala | 31 +++++++
2 files changed, 81 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c8affec2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 203d449..318ada0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
@@ -187,42 +188,56 @@ object InMemoryFileIndex extends Logging {
// in case of large #defaultParallelism.
val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
- val statusMap = sparkContext
- .parallelize(serializedPaths, numParallelism)
- .mapPartitions { pathStrings =>
- val hadoopConf = serializableConfiguration.value
- pathStrings.map(new Path(_)).toSeq.map { path =>
- (path, listLeafFiles(path, hadoopConf, filter, None))
- }.iterator
- }.map { case (path, statuses) =>
- val serializableStatuses = statuses.map { status =>
- // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
- val blockLocations = status match {
- case f: LocatedFileStatus =>
- f.getBlockLocations.map { loc =>
- SerializableBlockLocation(
- loc.getNames,
- loc.getHosts,
- loc.getOffset,
- loc.getLength)
- }
-
- case _ =>
- Array.empty[SerializableBlockLocation]
- }
-
- SerializableFileStatus(
- status.getPath.toString,
- status.getLen,
- status.isDirectory,
- status.getReplication,
- status.getBlockSize,
- status.getModificationTime,
- status.getAccessTime,
- blockLocations)
+ val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+ val statusMap = try {
+ val description = paths.size match {
+ case 0 =>
+ s"Listing leaf files and directories 0 paths"
+ case 1 =>
+ s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
+ case s =>
+ s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
}
- (path.toString, serializableStatuses)
- }.collect()
+ sparkContext.setJobDescription(description)
+ sparkContext
+ .parallelize(serializedPaths, numParallelism)
+ .mapPartitions { pathStrings =>
+ val hadoopConf = serializableConfiguration.value
+ pathStrings.map(new Path(_)).toSeq.map { path =>
+ (path, listLeafFiles(path, hadoopConf, filter, None))
+ }.iterator
+ }.map { case (path, statuses) =>
+ val serializableStatuses = statuses.map { status =>
+ // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
+ val blockLocations = status match {
+ case f: LocatedFileStatus =>
+ f.getBlockLocations.map { loc =>
+ SerializableBlockLocation(
+ loc.getNames,
+ loc.getHosts,
+ loc.getOffset,
+ loc.getLength)
+ }
+
+ case _ =>
+ Array.empty[SerializableBlockLocation]
+ }
+
+ SerializableFileStatus(
+ status.getPath.toString,
+ status.getLen,
+ status.isDirectory,
+ status.getReplication,
+ status.getBlockSize,
+ status.getModificationTime,
+ status.getAccessTime,
+ blockLocations)
+ }
+ (path.toString, serializableStatuses)
+ }.collect()
+ } finally {
+ sparkContext.setJobDescription(previousJobDescription)
+ }
// turn SerializableFileStatus back to Status
statusMap.map { case (path, serializableStatuses) =>
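Note the shape of the change above: the listing job runs inside try/finally so that the previous value of SPARK_JOB_DESCRIPTION (possibly null) is always restored, even when listing throws. If the pattern were needed elsewhere, it could be factored into a helper; a hypothetical sketch, not part of this patch:

    import org.apache.spark.SparkContext

    // Hypothetical helper: run `body` under a temporary job description,
    // always restoring the previous one (which may be null).
    def withJobDescription[T](sc: SparkContext, description: String)(body: => T): T = {
      val previous = sc.getLocalProperty("spark.job.description")
      sc.setJobDescription(description)
      try body finally sc.setJobDescription(previous)
    }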
http://git-wip-us.apache.org/repos/asf/spark/blob/c8affec2/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 569bac1..a5d7e62 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -21,10 +21,14 @@ import java.io.File
import java.util.Locale
import java.util.concurrent.ConcurrentLinkedQueue
+import scala.collection.JavaConverters._
+
import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkContext
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.internal.SQLConf
@@ -775,4 +779,31 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
}
}
}
+
+ test("use Spark jobs to list files") {
+ withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "1") {
+ withTempDir { dir =>
+ val jobDescriptions = new ConcurrentLinkedQueue[String]()
+ val jobListener = new SparkListener {
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+ jobDescriptions.add(jobStart.properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
+ }
+ }
+ sparkContext.addSparkListener(jobListener)
+ try {
+ spark.range(0, 3).map(i => (i, i))
+ .write.partitionBy("_1").mode("overwrite").parquet(dir.getCanonicalPath)
+ // normal file paths
+ checkDatasetUnorderly(
+ spark.read.parquet(dir.getCanonicalPath).as[(Long, Long)],
+ 0L -> 0L, 1L -> 1L, 2L -> 2L)
+ sparkContext.listenerBus.waitUntilEmpty(10000)
+ assert(jobDescriptions.asScala.toList.exists(
+ _.contains("Listing leaf files and directories for 3 paths")))
+ } finally {
+ sparkContext.removeSparkListener(jobListener)
+ }
+ }
+ }
+ }
}
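To see the new description outside the test suite, lower the parallel-discovery threshold so even a small partitioned table is listed via a Spark job; a hypothetical spark-shell snippet (the path /tmp/t and the threshold value are illustrative):

    // Force distributed listing for directories with more than 1 path.
    spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "1")
    spark.range(0, 3).map(i => (i, i)).write.partitionBy("_1").mode("overwrite").parquet("/tmp/t")
    // Reading back triggers the listing job; its description now shows in the web UI.
    spark.read.parquet("/tmp/t").count()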