Posted to reviews@spark.apache.org by heary-cao <gi...@git.apache.org> on 2018/01/27 06:39:45 UTC

[GitHub] spark pull request #20415: [SPARK-23247][SQL]combines Unsafe operations and ...

GitHub user heary-cao opened a pull request:

    https://github.com/apache/spark/pull/20415

    [SPARK-23247][SQL]combines Unsafe operations and statistics operations in Scan Data Source

    ## What changes were proposed in this pull request?
    
    Currently, when executing a data source scan, we first apply the unsafe projection to each row of data and then traverse the data again to count the rows. In terms of performance, this is unnecessary. This PR combines the two operations, updating the row count while performing the unsafe projection.
    
    Before the change:
    
    ```
    val unsafeRow = rdd.mapPartitionsWithIndexInternal { (index, iter) =>
      val proj = UnsafeProjection.create(schema)
      proj.initialize(index)
      iter.map(proj)
    }
    
    val numOutputRows = longMetric("numOutputRows")
    unsafeRow.map { r =>
      numOutputRows += 1
      r
    }
    ```
    After the change:
    
        val numOutputRows = longMetric("numOutputRows")
    
        rdd.mapPartitionsWithIndexInternal { (index, iter) =>
          val proj = UnsafeProjection.create(schema)
          proj.initialize(index)
          iter.map { r =>
            numOutputRows += 1
            proj(r)
          }
        }
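    
    A minimal sketch of why the two forms produce the same rows and the same count, using plain Scala iterators rather than Spark. `Counter` is a hypothetical stand-in for the `numOutputRows` metric and `proj` for the `UnsafeProjection`; neither is Spark API.
    
    ```scala
    object FusedMapSketch {
      // Hypothetical stand-in for the SQL metric; not Spark API.
      final class Counter {
        var value: Long = 0L
        def +=(n: Long): Unit = value += n
      }

      // Stand-in for the projection: any per-row function works for this illustration.
      val proj: Int => String = i => s"row-$i"

      // Before: projection and counting as two chained map steps.
      def chained(rows: Iterator[Int], counter: Counter): Iterator[String] =
        rows.map(proj).map { r => counter += 1; r }

      // After: projection and counting fused into a single map step.
      def fused(rows: Iterator[Int], counter: Counter): Iterator[String] =
        rows.map { r => counter += 1; proj(r) }

      def main(args: Array[String]): Unit = {
        val c1 = new Counter
        val c2 = new Counter
        val out1 = chained(Iterator(1, 2, 3), c1).toList
        val out2 = fused(Iterator(1, 2, 3), c2).toList
        println(out1 == out2)           // same rows either way
        println(c1.value == c2.value)   // same row count either way
      }
    }
    ```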
    
    ## How was this patch tested?
    
    Existing test cases.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/heary-cao/spark DataSourceScanExec

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20415.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20415
    
----
commit e3e09d98072bd39328a4e7d4de1ddd38594c6232
Author: caoxuewen <ca...@...>
Date:   2018-01-27T06:27:37Z

    combines Unsafe operations and statistics operations in Scan Data Source

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    **[Test build #86860 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86860/testReport)** for PR 20415 at commit [`e3e09d9`](https://github.com/apache/spark/commit/e3e09d98072bd39328a4e7d4de1ddd38594c6232).


---



[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    ok to test


---



[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86898/


---



[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by heary-cao <gi...@git.apache.org>.
Github user heary-cao commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    @cloud-fan Could you help review this? Thanks.


---



[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    **[Test build #86898 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86898/testReport)** for PR 20415 at commit [`5d70fd1`](https://github.com/apache/spark/commit/5d70fd1f939a67707f16c1afdefea6d4342c019e).


---



[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    **[Test build #86860 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86860/testReport)** for PR 20415 at commit [`e3e09d9`](https://github.com/apache/spark/commit/e3e09d98072bd39328a4e7d4de1ddd38594c6232).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    Can one of the admins verify this patch?


---






[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    Looks like a reasonable change to me. Although I don't think this will bring any significant performance improvement, it makes the code more compact.


---



[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    LGTM, only a minor comment


---



[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    @heary-cao have you benchmarked this? I am asking because Spark SQL chains iterators; these are pipelined and only materialized when needed. Your PR effectively removes two virtual calls (hasNext/next) per tuple, so I don't see too much benefit here.
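    
    A minimal sketch of the pipelining described above, using plain Scala iterators rather than Spark RDDs. The object name, the `trace` buffer, and the two stages are illustrative assumptions, not Spark code; the point is only that chained `map` calls on an iterator are lazy and process one row at a time through every stage, not one full pass per stage.
    
    ```scala
    import scala.collection.mutable.ArrayBuffer

    object PipelineSketch {
      // Records the order in which the stages touch each row.
      val trace: ArrayBuffer[String] = ArrayBuffer.empty[String]

      def pipeline(rows: Iterator[Int]): Iterator[Int] =
        rows
          .map { r => trace += s"project $r"; r * 10 } // stands in for the projection
          .map { r => trace += s"count $r"; r }        // stands in for the metric update

      def main(args: Array[String]): Unit = {
        val it = pipeline(Iterator(1, 2))
        println(trace.isEmpty)   // nothing has run yet: map on an Iterator is lazy
        it.foreach(_ => ())      // pull the rows through both stages
        trace.foreach(println)   // the stages interleave per row
      }
    }
    ```
    
    Each extra `map` stage adds one `hasNext`/`next` pair per row, which is the overhead the fused version avoids.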


---



[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86860/


---



[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    **[Test build #86898 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86898/testReport)** for PR 20415 at commit [`5d70fd1`](https://github.com/apache/spark/commit/5d70fd1f939a67707f16c1afdefea6d4342c019e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by heary-cao <gi...@git.apache.org>.
Github user heary-cao commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    @hvanhovell, thank you for reviewing it.
    I benchmarked the change in this PR as follows.
    
    **Code in `FileSourceScanExec.doExecute`:**
    
    ```
          if (needsUnsafeRowConversion) {
            scan.mapPartitionsWithIndexInternal { (index, iter) =>
              val proj = UnsafeProjection.create(schema)
              proj.initialize(index)
              iter.map( r => {
                numOutputRows += 1
                proj(r)
              })
            }
          } else {
            val scanOther = scan.mapPartitionsWithIndexInternal { (index, iter) =>
              val proj = UnsafeProjection.create(schema)
              proj.initialize(index)
              iter.map(proj)
            }
    
            scanOther.map { r =>
              numOutputRows += 1
              r
            }
          }
    
    ```
    
    **Start spark-shell:**
    
    > ./spark-shell --executor-memory 15G  --total-executor-cores 1 --conf spark.executor.cores=1
    
    
    **Test code:**
    
    > val df4 = (0 until 500000).map(i => (i % 2, i % 3, i % 4, i % 5, i % 6, i % 7, i % 8)).toDF("i2","i3","i4","i5","i6","i7","i8")
    > df4.write.format("parquet").partitionBy("i2", "i3", "i4").bucketBy(8, "i5","i6","i7","i8").saveAsTable("table500000")
    > 
    > def runBenchmark(name: String, cardinality: Int)(f: => Unit): Unit = {
    >   val startTime = System.nanoTime
    >   (0 to cardinality).foreach(i => f)
    >   val endTime = System.nanoTime
    >   println(s"Time taken in $name: " + (endTime - startTime).toDouble / 1000000000 + " seconds")
    > }
    > 
    > def benchmark(name: String, card: Int)(f: => Unit): Unit = {
    >   (0 to card).foreach(i => f)
    > }
    > 
    > // After the change (FileSourceScanExec):
    > benchmark("File SourceScan Exec", 2) {
    >   runBenchmark("After modified File SourceScan Exec ", 200) {
    >     spark.conf.set("spark.sql.codegen.maxFields", 2)
    >     spark.conf.set("spark.sql.parquet.enableVectorizedReader", true)
    >     spark.sql("select * from table500000").count()
    >   }
    > }
    > 
    > // Before the change (FileSourceScanExec):
    > benchmark("File SourceScan Exec", 2) {
    >   runBenchmark("Before modified File SourceScan Exec ", 200) {
    >     spark.conf.set("spark.sql.codegen.maxFields", 2)
    >     spark.conf.set("spark.sql.parquet.enableVectorizedReader", false)
    >     spark.sql("select * from table500000").count()
    >   }
    > }
    > 
    
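    Note that `(0 to cardinality)` is an inclusive range, so the helpers above invoke `f` `cardinality + 1` times (201 times for 200), and the outer `benchmark(..., 2)` repeats that three times. A minimal sketch of a helper that runs exactly the requested number of iterations follows; `TimingSketch` and `timeSeconds` are hypothetical names, not part of the test code above.
    
    ```scala
    object TimingSketch {
      // Runs `f` exactly `iterations` times and returns the elapsed wall-clock seconds.
      def timeSeconds(iterations: Int)(f: => Unit): Double = {
        val start = System.nanoTime()
        (0 until iterations).foreach(_ => f) // `until` excludes the upper bound
        (System.nanoTime() - start).toDouble / 1e9
      }

      def main(args: Array[String]): Unit = {
        var calls = 0
        val secs = timeSeconds(200) { calls += 1 }
        println(s"$calls calls took $secs seconds")
      }
    }
    ```
    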
    **Test results:**
    
    > 
    > 20 iterations:
    > Configuration        Run 1 (s)   Run 2 (s)   Run 3 (s)   Avg (s)
    > ------------------------------------------------------------------
    > Before the change        10.97       10.83       11.05     10.95
    > After the change          9.33        9.61        9.32      9.42
    > 
    > 
    > 100 iterations:
    > Configuration        Run 1 (s)   Run 2 (s)   Run 3 (s)   Avg (s)
    > ------------------------------------------------------------------
    > Before the change        51.74       52.80       71.88     58.80
    > After the change         47.24       46.18       48.92     47.45
    > 
    > 
    > 200 iterations:
    > Configuration        Run 1 (s)   Run 2 (s)   Run 3 (s)   Avg (s)
    > ------------------------------------------------------------------
    > Before the change       236.85      325.97      395.69    319.50
    > After the change        208.90      244.13      261.18    238.07
    > 
    > 
    
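    The relative reduction implied by the averages above can be worked out with a short sketch; the numbers are copied from the tables, while `SpeedupSketch` and `reductionPercent` are hypothetical names used only for this illustration.
    
    ```scala
    object SpeedupSketch {
      // Average wall-clock seconds from the tables above: (iterations, before, after).
      val averages: Seq[(Int, Double, Double)] = Seq(
        (20, 10.95, 9.42),
        (100, 58.80, 47.45),
        (200, 319.50, 238.07)
      )

      // Reduction in run time as a percentage of the "before" time.
      def reductionPercent(before: Double, after: Double): Double =
        (before - after) / before * 100.0

      def main(args: Array[String]): Unit =
        averages.foreach { case (n, before, after) =>
          // Round to one decimal place without relying on locale-specific formatting.
          val pct = math.round(reductionPercent(before, after) * 10) / 10.0
          println(s"$n iterations: $pct% less time")
        }
    }
    ```
    
    This works out to roughly 14%, 19%, and 25% less time for 20, 100, and 200 iterations respectively.
    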
    Thanks.
    
    



---



[GitHub] spark pull request #20415: [SPARK-23247][SQL]combines Unsafe operations and ...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20415#discussion_r165188943
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -326,22 +325,23 @@ case class FileSourceScanExec(
           // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
           WholeStageCodegenExec(this).execute()
         } else {
    -      val unsafeRows = {
    -        val scan = inputRDD
    -        if (needsUnsafeRowConversion) {
    -          scan.mapPartitionsWithIndexInternal { (index, iter) =>
    -            val proj = UnsafeProjection.create(schema)
    -            proj.initialize(index)
    -            iter.map(proj)
    -          }
    -        } else {
    -          scan
    -        }
    -      }
           val numOutputRows = longMetric("numOutputRows")
    -      unsafeRows.map { r =>
    -        numOutputRows += 1
    -        r
    +
    +      val scan = inputRDD
    --- End diff --
    
    nit: I think this is not needed and we can use `inputRDD`


---



[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    thanks, merging to master!


---



[GitHub] spark issue #20415: [SPARK-23247][SQL]combines Unsafe operations and statist...

Posted by heary-cao <gi...@git.apache.org>.
Github user heary-cao commented on the issue:

    https://github.com/apache/spark/pull/20415
  
    @mgaido91 thanks.


---



[GitHub] spark pull request #20415: [SPARK-23247][SQL]combines Unsafe operations and ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20415


---
