You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by habren <gi...@git.apache.org> on 2018/08/07 03:25:48 UTC

[GitHub] spark pull request #22018: [SPARK-25038][SQL] Accelerate Spark Plan generati...

GitHub user habren opened a pull request:

    https://github.com/apache/spark/pull/22018

    [SPARK-25038][SQL] Accelerate Spark Plan generation when Spark SQL re…

    https://issues.apache.org/jira/browse/SPARK-25038
    
    When Spark SQL read large amount of data, it take a long time (more than 10 minutes) to generate physical Plan and then ActiveJob
    
     
    
    Example:
    
    There is a table which is partitioned by date and hour. There are more than 13 TB data each hour and 185 TB per day. When we just issue a very simple SQL, it take a long time to generate ActiveJob
    
     
    
    The SQL statement is
    
    select count(device_id) from test_tbl where date=20180731 and hour='21';
     
    
    Before optimization, it takes 2 minutes and 9 seconds to generate the Job
    
     
    
    The SQL is issued at 2018-08-07 09:07:41
    
    
    
    However, the job is submitted at 2018-08-07 09:09:53, which is 2minutes and 9 seconds later than the SQL issue time
    
    
    
     
    
    After the optimization, it takes only 4 seconds to generate the Job
    
    The SQL is issued at 2018-08-07 09:20:15
    
    
    
     
    
    And the job is submitted at 2018-08-07 09:20:19, which is 4 seconds later than the SQL issue time
    
    
    
     
    
     

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/habren/spark SPARK-25038

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22018.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22018
    
----
commit 2bb5924e04eba5accfe58a4fbae094d46cc36488
Author: Jason Guo <ja...@...>
Date:   2018-08-07T03:13:03Z

    [SPARK-25038][SQL] Accelerate Spark Plan generation when Spark SQL read large amount of data

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22018: [SPARK-25038][SQL] Accelerate Spark Plan generati...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22018#discussion_r208779110
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
    @@ -297,7 +297,7 @@ object InMemoryFileIndex extends Logging {
         val missingFiles = mutable.ArrayBuffer.empty[String]
         val filteredLeafStatuses = allLeafStatuses.filterNot(
           status => shouldFilterOut(status.getPath.getName))
    -    val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
    +    val resolvedLeafStatuses = filteredLeafStatuses.par.flatMap {
    --- End diff --
    
    As this can be called on executors, I think we should use `ThreadUtils.parmap`. cc @MaxGekk 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22018: [SPARK-25038][SQL] Get block location in parallel

Posted by habren <gi...@git.apache.org>.
Github user habren commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22018#discussion_r208787523
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
    @@ -297,7 +297,7 @@ object InMemoryFileIndex extends Logging {
         val missingFiles = mutable.ArrayBuffer.empty[String]
         val filteredLeafStatuses = allLeafStatuses.filterNot(
           status => shouldFilterOut(status.getPath.getName))
    -    val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
    +    val resolvedLeafStatuses = filteredLeafStatuses.par.flatMap {
           case f: LocatedFileStatus =>
    --- End diff --
    
    Thanks. The comment was updated


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22018: [SPARK-25038][SQL] Accelerate Spark Plan generati...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22018#discussion_r208783652
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
    @@ -297,7 +297,7 @@ object InMemoryFileIndex extends Logging {
         val missingFiles = mutable.ArrayBuffer.empty[String]
         val filteredLeafStatuses = allLeafStatuses.filterNot(
           status => shouldFilterOut(status.getPath.getName))
    -    val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
    +    val resolvedLeafStatuses = filteredLeafStatuses.par.flatMap {
    --- End diff --
    
    btw, is this a right approach? I a little confuse this with the current parallel partition discovery path..


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22018: [SPARK-25038][SQL] Accelerate Spark Plan generation when...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22018
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22018: [SPARK-25038][SQL] Accelerate Spark Plan generation when...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22018
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22018: [SPARK-25038][SQL] Get block location in parallel

Posted by habren <gi...@git.apache.org>.
Github user habren commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22018#discussion_r208788059
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
    @@ -297,7 +297,7 @@ object InMemoryFileIndex extends Logging {
         val missingFiles = mutable.ArrayBuffer.empty[String]
         val filteredLeafStatuses = allLeafStatuses.filterNot(
           status => shouldFilterOut(status.getPath.getName))
    -    val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
    +    val resolvedLeafStatuses = filteredLeafStatuses.par.flatMap {
    --- End diff --
    
    Thanks @maropu for your comments. I updated the title and description. Let's explain the difference between this change and the current parallel partition discovery. The current one will discovery different partitions in parallel. This change will get the block location for a single partition in parallel. When there is only a few partitions and each contains tons of thousands of files, the current partition discovery won't help. And this change can accelerate it in this case


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22018: [SPARK-25038][SQL] Accelerate Spark Plan generation when...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22018
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22018: [SPARK-25038][SQL] Accelerate Spark Plan generation when...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/22018
  
    Can you narrow down the title and description? I thinks the current one is obscure..


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22018: [SPARK-25038][SQL] Accelerate Spark Plan generati...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22018#discussion_r208784598
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
    @@ -297,7 +297,7 @@ object InMemoryFileIndex extends Logging {
         val missingFiles = mutable.ArrayBuffer.empty[String]
         val filteredLeafStatuses = allLeafStatuses.filterNot(
           status => shouldFilterOut(status.getPath.getName))
    -    val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
    +    val resolvedLeafStatuses = filteredLeafStatuses.par.flatMap {
           case f: LocatedFileStatus =>
    --- End diff --
    
    There is a comment about `listLeafFilesInParallel` at L311 is not correct as `listLeafFilesInParallel` is removed. Can you also update this comment too?
    
    https://github.com/apache/spark/blob/2bb5924e04eba5accfe58a4fbae094d46cc36488/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L311


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22018: [SPARK-25038][SQL] Get block location in parallel

Posted by MaxGekk <gi...@git.apache.org>.
Github user MaxGekk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22018#discussion_r208824418
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
    @@ -297,7 +297,7 @@ object InMemoryFileIndex extends Logging {
         val missingFiles = mutable.ArrayBuffer.empty[String]
         val filteredLeafStatuses = allLeafStatuses.filterNot(
           status => shouldFilterOut(status.getPath.getName))
    -    val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
    +    val resolvedLeafStatuses = filteredLeafStatuses.par.flatMap {
    --- End diff --
    
    Parallel Scala collections are not interruptible in some cases as a consequence of that if you use them on executors, tasks cannot be canceled properly. You can do an experiment yourself and run the code in a lambda function: https://github.com/apache/spark/blob/131ca146ed390cd0109cd6e8c95b61e418507080/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala#L143-L150
    
    When you cancel the job, threads will be still blocking on the sleep call. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22018: [SPARK-25038][SQL] Accelerate Spark Plan generati...

Posted by habren <gi...@git.apache.org>.
Github user habren commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22018#discussion_r208784609
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
    @@ -297,7 +297,7 @@ object InMemoryFileIndex extends Logging {
         val missingFiles = mutable.ArrayBuffer.empty[String]
         val filteredLeafStatuses = allLeafStatuses.filterNot(
           status => shouldFilterOut(status.getPath.getName))
    -    val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
    +    val resolvedLeafStatuses = filteredLeafStatuses.par.flatMap {
    --- End diff --
    
    Thanks @viirya for feedback. Yes, this method can be called on executors as below. Do you think it's not thread-safe ?
    Each partitions will have its own hadoopConf and then own fs, and nothing is shared in this method.
    
    sparkContext
            .parallelize(serializedPaths, numParallelism)
            .mapPartitions { pathStrings =>
              val hadoopConf = serializableConfiguration.value
              pathStrings.map(new Path(_)).toSeq.map { path =>
                (path, listLeafFiles(path, hadoopConf, filter, None))
              }.iterator
            }.map { case (path, statuses) =>
    
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22018: [SPARK-25038][SQL] Accelerate Spark Plan generation when...

Posted by habren <gi...@git.apache.org>.
Github user habren commented on the issue:

    https://github.com/apache/spark/pull/22018
  
    Hi Takeshi Yamamuro Hyukjin Kwon​ and @viirya Can you take a look at this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org