Posted to reviews@spark.apache.org by viirya <gi...@git.apache.org> on 2016/03/16 10:19:31 UTC

[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

GitHub user viirya opened a pull request:

    https://github.com/apache/spark/pull/11759

    [SPARK-13930][SQL] Apply fast serialization on collect limit operator

    ## What changes were proposed in this pull request?
    
    JIRA: https://issues.apache.org/jira/browse/SPARK-13930
    
    Fast serialization was recently introduced for collecting a DataFrame/Dataset (#11664). The same technique can be applied to the collect limit operator too.
    
    ## How was this patch tested?
    
    Add a benchmark for collect limit to `BenchmarkWholeStageCodegen`.
    
    Without this patch:
    
        model name      : Westmere E56xx/L56xx/X56xx (Nehalem-C)
        collect limit:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        -------------------------------------------------------------------------------------------
        collect limit 1 million                  3413 / 3768          0.3        3255.0       1.0X
        collect limit 2 millions                9728 / 10440          0.1        9277.3       0.4X
    
    With this patch:
    
        model name      : Westmere E56xx/L56xx/X56xx (Nehalem-C)
        collect limit:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        -------------------------------------------------------------------------------------------
        collect limit 1 million                   833 / 1284          1.3         794.4       1.0X
        collect limit 2 millions                 3348 / 4005          0.3        3193.3       0.2X
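
For reading the two tables side by side: the Relative column appears to compare each row with the first row of the same run, so it does not show the effect of the patch. A minimal sketch of the before/after arithmetic, using only the best times printed above (the object and method names are illustrative, not part of the patch):

```scala
object CollectLimitSpeedup {
  // Best times in ms, copied from the two benchmark tables above.
  val withoutPatch: Map[String, Double] =
    Map("collect limit 1 million" -> 3413.0, "collect limit 2 millions" -> 9728.0)
  val withPatch: Map[String, Double] =
    Map("collect limit 1 million" -> 833.0, "collect limit 2 millions" -> 3348.0)

  // Speedup factor: how many times shorter the run is with the patch.
  def speedup(before: Double, after: Double): Double = before / after

  def main(args: Array[String]): Unit =
    withoutPatch.foreach { case (name, before) =>
      val factor = speedup(before, withPatch(name))
      println(f"$name%-26s $factor%.1fx faster (best time)")
    }
}
```

On these numbers the best time improves by roughly 4.1x for 1 million rows and roughly 2.9x for 2 million rows.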


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 execute-take

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11759.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11759
    
----
commit 16ac5fe6325c2cf7638faab854c09dc50de94f18
Author: Liang-Chi Hsieh <si...@tw.ibm.com>
Date:   2016-03-16T08:41:36Z

    init import.

commit d1306ade2efa407ce0785650d08aef4129aab2c3
Author: Liang-Chi Hsieh <si...@tw.ibm.com>
Date:   2016-03-16T09:06:35Z

    Use fast serialization on collect limit too.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-197392401
  
    **[Test build #53320 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53320/consoleFull)** for PR 11759 at commit [`2c2055a`](https://github.com/apache/spark/commit/2c2055a07619b97c431f71fbe7e0a42be2ef5d85).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-198222176
  
    cc @davies The comments have been addressed and the tests pass. Please check whether this is OK now. Thanks!




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-197350986
  
    **[Test build #53320 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53320/consoleFull)** for PR 11759 at commit [`2c2055a`](https://github.com/apache/spark/commit/2c2055a07619b97c431f71fbe7e0a42be2ef5d85).




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11759#discussion_r56611911
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       }
     
       /**
    -   * Runs this query returning the result as an array.
    +   * Packing the UnsafeRows into byte array for faster serialization.
    +   * The byte arrays are in the following format:
    +   * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
    +   *
    +   * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
    +   * compressed.
        */
    -  def executeCollect(): Array[InternalRow] = {
    -    // Packing the UnsafeRows into byte array for faster serialization.
    -    // The byte arrays are in the following format:
    -    // [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
    -    //
    -    // UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
    -    // compressed.
    -    val byteArrayRdd = execute().mapPartitionsInternal { iter =>
    +  private def getByteArrayRdd(n: Int = -1): RDD[Array[Byte]] = {
    +    execute().mapPartitionsInternal { iter =>
    +      var count = 0
           val buffer = new Array[Byte](4 << 10)  // 4K
           val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
           val bos = new ByteArrayOutputStream()
           val out = new DataOutputStream(codec.compressedOutputStream(bos))
    -      while (iter.hasNext) {
    +      while (iter.hasNext && (n < 0 || count < n)) {
             val row = iter.next().asInstanceOf[UnsafeRow]
             out.writeInt(row.getSizeInBytes)
             row.writeToStream(out, buffer)
    +        count += 1
           }
           out.writeInt(-1)
           out.flush()
           out.close()
           Iterator(bos.toByteArray)
         }
    +  }
     
    -    // Collect the byte arrays back to driver, then decode them as UnsafeRows.
    +  /**
    +   * Collect the byte arrays back to driver, then decode them as UnsafeRows.
    +   */
    +  private def collectRowFromBytes(bytes: Array[Byte]): Array[InternalRow] = {
    --- End diff --
    
    Done, renamed to `decodeUnsafeRow`.
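
The framing described in the diff above, `[size] [bytes] ... [-1]` with an optional row limit `n`, can be sketched outside Spark. This is only an illustration under stated assumptions: plain byte arrays stand in for `UnsafeRow`, GZIP from the JDK stands in for Spark's `CompressionCodec`, and `RowFraming`, `encode`, and `decode` are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable.ArrayBuffer

object RowFraming {
  // Write up to n rows (all rows when n < 0) as [size][bytes]...[-1], compressed.
  // Assumption: GZIP replaces the codec that Spark would pick from its configuration.
  def encode(rows: Iterator[Array[Byte]], n: Int = -1): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(new GZIPOutputStream(bos))
    var count = 0
    while (rows.hasNext && (n < 0 || count < n)) {
      val row = rows.next()
      out.writeInt(row.length) // size prefix
      out.write(row)           // row payload
      count += 1
    }
    out.writeInt(-1)           // end-of-stream marker
    out.close()                // also finishes the compressed stream
    bos.toByteArray
  }

  // Read rows back until the -1 marker is reached.
  def decode(bytes: Array[Byte]): Array[Array[Byte]] = {
    val ins = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
    val results = ArrayBuffer[Array[Byte]]()
    var sizeOfNextRow = ins.readInt()
    while (sizeOfNextRow >= 0) {
      val bs = new Array[Byte](sizeOfNextRow)
      ins.readFully(bs)
      results += bs
      sizeOfNextRow = ins.readInt()
    }
    ins.close()
    results.toArray
  }
}
```

Stopping the write loop at `n` rows is what keeps each partition from serializing more rows than the limit needs; the `-1` marker lets the reader stop without knowing the row count in advance.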




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-198220270
  
    **[Test build #53502 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53502/consoleFull)** for PR 11759 at commit [`ed9aa30`](https://github.com/apache/spark/commit/ed9aa301fac4af768eee36c5153e38faea9c658b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11759#discussion_r56455468
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       }
     
       /**
    -   * Runs this query returning the result as an array.
    +   * Packing the UnsafeRows into byte array for faster serialization.
    +   * The byte arrays are in the following format:
    +   * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
    --- End diff --
    
    These are implementation details and have nothing to do with the API; I'd like to keep them as comments.




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-198219664
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11759#discussion_r56455662
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       }
     
       /**
    -   * Runs this query returning the result as an array.
    +   * Packing the UnsafeRows into byte array for faster serialization.
    +   * The byte arrays are in the following format:
    +   * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
    +   *
    +   * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
    +   * compressed.
        */
    -  def executeCollect(): Array[InternalRow] = {
    -    // Packing the UnsafeRows into byte array for faster serialization.
    -    // The byte arrays are in the following format:
    -    // [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
    -    //
    -    // UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
    -    // compressed.
    -    val byteArrayRdd = execute().mapPartitionsInternal { iter =>
    +  private def getByteArrayRdd(n: Int = -1): RDD[Array[Byte]] = {
    +    execute().mapPartitionsInternal { iter =>
    +      var count = 0
           val buffer = new Array[Byte](4 << 10)  // 4K
           val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
           val bos = new ByteArrayOutputStream()
           val out = new DataOutputStream(codec.compressedOutputStream(bos))
    -      while (iter.hasNext) {
    +      while (iter.hasNext && (n < 0 || count < n)) {
             val row = iter.next().asInstanceOf[UnsafeRow]
             out.writeInt(row.getSizeInBytes)
             row.writeToStream(out, buffer)
    +        count += 1
           }
           out.writeInt(-1)
           out.flush()
           out.close()
           Iterator(bos.toByteArray)
         }
    +  }
     
    -    // Collect the byte arrays back to driver, then decode them as UnsafeRows.
    +  /**
    +   * Collect the byte arrays back to driver, then decode them as UnsafeRows.
    +   */
    +  private def collectRowFromBytes(bytes: Array[Byte]): Array[InternalRow] = {
    --- End diff --
    
    This function has nothing to do with `collect`; how about `decodeUnsafeRow` or `deserializeUnsafeRows`?




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11759#discussion_r56455618
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       }
     
       /**
    -   * Runs this query returning the result as an array.
    +   * Packing the UnsafeRows into byte array for faster serialization.
    +   * The byte arrays are in the following format:
    +   * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
    +   *
    +   * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
    +   * compressed.
        */
    -  def executeCollect(): Array[InternalRow] = {
    -    // Packing the UnsafeRows into byte array for faster serialization.
    -    // The byte arrays are in the following format:
    -    // [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
    -    //
    -    // UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
    -    // compressed.
    -    val byteArrayRdd = execute().mapPartitionsInternal { iter =>
    +  private def getByteArrayRdd(n: Int = -1): RDD[Array[Byte]] = {
    +    execute().mapPartitionsInternal { iter =>
    +      var count = 0
           val buffer = new Array[Byte](4 << 10)  // 4K
           val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
           val bos = new ByteArrayOutputStream()
           val out = new DataOutputStream(codec.compressedOutputStream(bos))
    -      while (iter.hasNext) {
    +      while (iter.hasNext && (n < 0 || count < n)) {
             val row = iter.next().asInstanceOf[UnsafeRow]
             out.writeInt(row.getSizeInBytes)
             row.writeToStream(out, buffer)
    +        count += 1
           }
           out.writeInt(-1)
           out.flush()
           out.close()
           Iterator(bos.toByteArray)
         }
    +  }
     
    -    // Collect the byte arrays back to driver, then decode them as UnsafeRows.
    +  /**
    +   * Collect the byte arrays back to driver, then decode them as UnsafeRows.
    +   */
    +  private def collectRowFromBytes(bytes: Array[Byte]): Array[InternalRow] = {
         val nFields = schema.length
         val results = ArrayBuffer[InternalRow]()
     
    +    val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
    +    val bis = new ByteArrayInputStream(bytes)
    +    val ins = new DataInputStream(codec.compressedInputStream(bis))
    +    var sizeOfNextRow = ins.readInt()
    +    while (sizeOfNextRow >= 0) {
    +      val bs = new Array[Byte](sizeOfNextRow)
    +      ins.readFully(bs)
    +      val row = new UnsafeRow(nFields)
    +      row.pointTo(bs, sizeOfNextRow)
    +      results += row
    +      sizeOfNextRow = ins.readInt()
    +    }
    +    results.toArray
    +  }
    +
    +  /**
    +   * Runs this query returning the result as an array.
    +   */
    +  def executeCollect(): Array[InternalRow] = {
    +    val byteArrayRdd = getByteArrayRdd()
    +
    +    val results = ArrayBuffer[InternalRow]()
         byteArrayRdd.collect().foreach { bytes =>
    -      val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
    -      val bis = new ByteArrayInputStream(bytes)
    -      val ins = new DataInputStream(codec.compressedInputStream(bis))
    -      var sizeOfNextRow = ins.readInt()
    -      while (sizeOfNextRow >= 0) {
    -        val bs = new Array[Byte](sizeOfNextRow)
    -        ins.readFully(bs)
    -        val row = new UnsafeRow(nFields)
    -        row.pointTo(bs, sizeOfNextRow)
    -        results += row
    -        sizeOfNextRow = ins.readInt()
    -      }
    +      results ++= collectRowFromBytes(bytes)
    --- End diff --
    
    Should we avoid this copy?




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-198219518
  
    **[Test build #53501 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53501/consoleFull)** for PR 11759 at commit [`6752775`](https://github.com/apache/spark/commit/675277591aa61464bb8d28bb6621e376e51fe6be).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-197225986
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53309/
    Test FAILed.




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-198225556
  
    LGTM, merging this into master, thanks!




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-197225976
  
    **[Test build #53309 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53309/consoleFull)** for PR 11759 at commit [`d1306ad`](https://github.com/apache/spark/commit/d1306ade2efa407ce0785650d08aef4129aab2c3).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11759#discussion_r56611882
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       }
     
       /**
    -   * Runs this query returning the result as an array.
    +   * Packing the UnsafeRows into byte array for faster serialization.
    +   * The byte arrays are in the following format:
    +   * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
    +   *
    +   * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
    +   * compressed.
        */
    -  def executeCollect(): Array[InternalRow] = {
    -    // Packing the UnsafeRows into byte array for faster serialization.
    -    // The byte arrays are in the following format:
    -    // [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
    -    //
    -    // UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
    -    // compressed.
    -    val byteArrayRdd = execute().mapPartitionsInternal { iter =>
    +  private def getByteArrayRdd(n: Int = -1): RDD[Array[Byte]] = {
    +    execute().mapPartitionsInternal { iter =>
    +      var count = 0
           val buffer = new Array[Byte](4 << 10)  // 4K
           val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
           val bos = new ByteArrayOutputStream()
           val out = new DataOutputStream(codec.compressedOutputStream(bos))
    -      while (iter.hasNext) {
    +      while (iter.hasNext && (n < 0 || count < n)) {
             val row = iter.next().asInstanceOf[UnsafeRow]
             out.writeInt(row.getSizeInBytes)
             row.writeToStream(out, buffer)
    +        count += 1
           }
           out.writeInt(-1)
           out.flush()
           out.close()
           Iterator(bos.toByteArray)
         }
    +  }
     
    -    // Collect the byte arrays back to driver, then decode them as UnsafeRows.
    +  /**
    +   * Collect the byte arrays back to driver, then decode them as UnsafeRows.
    +   */
    +  private def collectRowFromBytes(bytes: Array[Byte]): Array[InternalRow] = {
         val nFields = schema.length
         val results = ArrayBuffer[InternalRow]()
     
    +    val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
    +    val bis = new ByteArrayInputStream(bytes)
    +    val ins = new DataInputStream(codec.compressedInputStream(bis))
    +    var sizeOfNextRow = ins.readInt()
    +    while (sizeOfNextRow >= 0) {
    +      val bs = new Array[Byte](sizeOfNextRow)
    +      ins.readFully(bs)
    +      val row = new UnsafeRow(nFields)
    +      row.pointTo(bs, sizeOfNextRow)
    +      results += row
    +      sizeOfNextRow = ins.readInt()
    +    }
    +    results.toArray
    +  }
    +
    +  /**
    +   * Runs this query returning the result as an array.
    +   */
    +  def executeCollect(): Array[InternalRow] = {
    +    val byteArrayRdd = getByteArrayRdd()
    +
    +    val results = ArrayBuffer[InternalRow]()
         byteArrayRdd.collect().foreach { bytes =>
    -      val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
    -      val bis = new ByteArrayInputStream(bytes)
    -      val ins = new DataInputStream(codec.compressedInputStream(bis))
    -      var sizeOfNextRow = ins.readInt()
    -      while (sizeOfNextRow >= 0) {
    -        val bs = new Array[Byte](sizeOfNextRow)
    -        ins.readFully(bs)
    -        val row = new UnsafeRow(nFields)
    -        row.pointTo(bs, sizeOfNextRow)
    -        results += row
    -        sizeOfNextRow = ins.readInt()
    -      }
    +      results ++= collectRowFromBytes(bytes)
    --- End diff --
    
    I think we should. Updated.
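
The copy under discussion comes from building an array per chunk and then appending it to the outer buffer with `++=`. One way to avoid it, sketched here without Spark and not necessarily what the updated patch does, is to let the decoder append straight into a buffer owned by the caller. Plain byte arrays stand in for `UnsafeRow`, compression is left out for brevity, and `SharedBufferDecode`, `frame`, `decodeInto`, and `collectAll` are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable.ArrayBuffer

object SharedBufferDecode {
  // Hypothetical helper: frame rows as [size][bytes]...[-1] (uncompressed).
  def frame(rows: Seq[Array[Byte]]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    rows.foreach { r => out.writeInt(r.length); out.write(r) }
    out.writeInt(-1)
    out.close()
    bos.toByteArray
  }

  // Decode one chunk directly into the caller's buffer; no per-chunk array is built.
  def decodeInto(bytes: Array[Byte], buffer: ArrayBuffer[Array[Byte]]): Unit = {
    val ins = new DataInputStream(new ByteArrayInputStream(bytes))
    var sizeOfNextRow = ins.readInt()
    while (sizeOfNextRow >= 0) {
      val bs = new Array[Byte](sizeOfNextRow)
      ins.readFully(bs)
      buffer += bs
      sizeOfNextRow = ins.readInt()
    }
  }

  // All chunks share one buffer, which is converted to an array once at the end.
  def collectAll(chunks: Seq[Array[Byte]]): Array[Array[Byte]] = {
    val results = ArrayBuffer[Array[Byte]]()
    chunks.foreach(chunk => decodeInto(chunk, results))
    results.toArray
  }
}
```

Returning an `Iterator` from the decoder would be another option with a similar effect; the shared buffer is shown because it stays closest to the `ArrayBuffer` already used in the diff.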




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-197225982
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-197273768
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-197665452
  
    cc @davies @rxin 




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-198220376
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53502/
    Test PASSed.




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/11759




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-197392779
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53320/




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-198201967
  
    **[Test build #53501 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53501/consoleFull)** for PR 11759 at commit [`6752775`](https://github.com/apache/spark/commit/675277591aa61464bb8d28bb6621e376e51fe6be).




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-197227498
  
    retest this please.




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-198220375
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-197228487
  
    **[Test build #53310 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53310/consoleFull)** for PR 11759 at commit [`d1306ad`](https://github.com/apache/spark/commit/d1306ade2efa407ce0785650d08aef4129aab2c3).




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11759#discussion_r56455490
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       }
     
       /**
    -   * Runs this query returning the result as an array.
    +   * Packing the UnsafeRows into byte array for faster serialization.
    +   * The byte arrays are in the following format:
    +   * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
    --- End diff --
    
    nvm, you made it a function.
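
The byte-array layout described in the diff above, `[size] [bytes of UnsafeRow] ... [-1]`, can be illustrated with a minimal sketch. This is not the code from the PR: plain `Array[Byte]` values stand in for `UnsafeRow`, the names `RowPacking`, `pack`, and `unpack` are hypothetical, and encoding each size as a 4-byte integer via `DataOutputStream.writeInt` is an assumption.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper illustrating the layout only; not Spark's implementation.
object RowPacking {
  // Write each row as [size] [bytes], then a trailing -1 as the end marker.
  def pack(rows: Iterator[Array[Byte]]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    rows.foreach { row =>
      out.writeInt(row.length) // assumed: 4-byte size prefix
      out.write(row)
    }
    out.writeInt(-1) // end-of-stream marker
    out.flush()
    bos.toByteArray
  }

  // Read [size] [bytes] pairs until the -1 marker is reached.
  def unpack(bytes: Array[Byte]): Seq[Array[Byte]] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val rows = ArrayBuffer.empty[Array[Byte]]
    var size = in.readInt()
    while (size != -1) {
      val row = new Array[Byte](size)
      in.readFully(row)
      rows += row
      size = in.readInt()
    }
    rows.toSeq
  }
}
```

Because a row size can never be negative, the `-1` marker is unambiguous, so the reader does not need to know the row count in advance.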




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-197273772
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53310/




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-198219667
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53501/




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-197273468
  
    **[Test build #53310 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53310/consoleFull)** for PR 11759 at commit [`d1306ad`](https://github.com/apache/spark/commit/d1306ade2efa407ce0785650d08aef4129aab2c3).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-197224525
  
    **[Test build #53309 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53309/consoleFull)** for PR 11759 at commit [`d1306ad`](https://github.com/apache/spark/commit/d1306ade2efa407ce0785650d08aef4129aab2c3).




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-198203730
  
    **[Test build #53502 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53502/consoleFull)** for PR 11759 at commit [`ed9aa30`](https://github.com/apache/spark/commit/ed9aa301fac4af768eee36c5153e38faea9c658b).




[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11759#issuecomment-197392772
  
    Merged build finished. Test PASSed.

