Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2016/01/18 19:09:59 UTC

[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/10809

    [SPARK-12879][SQL] improve the unsafe row writing framework

    As we begin to use the unsafe row writing framework (`BufferHolder` and `UnsafeRowWriter`) in more and more places (`UnsafeProjection`, `UnsafeRowParquetRecordReader`, `GenerateColumnAccessor`, etc.), we should document it better and make it easier to use.
    
    This PR abstracts the technique used in `UnsafeRowParquetRecordReader`: avoid unnecessary operations as much as possible. For example, instead of always pointing the row to the buffer at the end, we only need to update the size of the row. If all fields are of primitive type, we can even skip updating the row size. We can then apply this technique in more places easily.
    
    A local benchmark shows `UnsafeProjection` is up to 1.7x faster after this PR:
    **old version**
    ```
    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    unsafe projection:                 Avg Time(ms)    Avg Rate(M/s)  Relative Rate
    -------------------------------------------------------------------------------
    single long                             2616.04           102.61         1.00 X
    single nullable long                    3032.54            88.52         0.86 X
    primitive types                         9121.05            29.43         0.29 X
    nullable primitive types               12410.60            21.63         0.21 X
    ```
    
    **new version**
    ```
    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    unsafe projection:                 Avg Time(ms)    Avg Rate(M/s)  Relative Rate
    -------------------------------------------------------------------------------
    single long                             1533.34           175.07         1.00 X
    single nullable long                    2306.73           116.37         0.66 X
    primitive types                         8403.93            31.94         0.18 X
    nullable primitive types               12448.39            21.56         0.12 X
    ```
    
    For a single non-nullable long (the best case), we get about a 1.7x speedup. Even if it is nullable, we still get a 1.3x speedup. For the other cases the boost is much smaller, because the saved operations account for only a small proportion of the whole process.  The benchmark code is included in this PR.
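
The speedup figures quoted above can be rechecked from the two benchmark tables by dividing the old average time by the new one. The following is a minimal sketch of that arithmetic only; `SpeedupSketch` is a hypothetical helper name and is not part of the PR or of Spark.

```java
// Speedup arithmetic for the benchmark tables in the PR description.
// SpeedupSketch is a hypothetical helper, not part of Spark.
final class SpeedupSketch {

    // How many times faster the new run is than the old run.
    static double speedup(double oldAvgMs, double newAvgMs) {
        return oldAvgMs / newAvgMs;
    }

    public static void main(String[] args) {
        String[] cases = {
            "single long", "single nullable long",
            "primitive types", "nullable primitive types"
        };
        // Avg Time(ms) columns copied from the "old version" and "new version" tables.
        double[] oldMs = {2616.04, 3032.54, 9121.05, 12410.60};
        double[] newMs = {1533.34, 2306.73, 8403.93, 12448.39};
        for (int i = 0; i < cases.length; i++) {
            System.out.printf("%-26s %.2fx%n", cases[i], speedup(oldMs[i], newMs[i]));
        }
    }
}
```

The first two ratios come out at roughly 1.7 and 1.3, matching the description; the two multi-column cases stay close to 1, which is consistent with the remark that the saved operations are a small share of the total work there.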

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark unsafe-projection

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10809.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10809
    
----
commit 397871117179c31c5a634c96165e8cf934316291
Author: Wenchen Fan <we...@databricks.com>
Date:   2016-01-17T23:16:06Z

    improve the unsafe row writing framework

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-172618609
  
    **[Test build #49606 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49606/consoleFull)** for PR 10809 at commit [`9a63852`](https://github.com/apache/spark/commit/9a63852d646ba67133fd22db4dc087aba6189313).




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50157593
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
    @@ -73,9 +70,27 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
           row: String,
           inputs: Seq[ExprCode],
           inputTypes: Seq[DataType],
    -      bufferHolder: String): String = {
    +      bufferHolder: String,
    +      isTopLevel: Boolean = false): String = {
    +    val rowWriterClass = classOf[UnsafeRowWriter].getName
         val rowWriter = ctx.freshName("rowWriter")
    -    ctx.addMutableState(rowWriterClass, rowWriter, s"this.$rowWriter = new $rowWriterClass();")
    +    ctx.addMutableState(rowWriterClass, rowWriter,
    +      s"this.$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
    +
    +    val resetWriter = if (isTopLevel) {
    +      // For top level row writer, it always writes to the beginning of the global buffer holder,
    +      // which means its fixed-size region always in the same position, so we don't need to call
    +      // `reset` to set up its fixed-size region every time.
    +      if (inputs.map(_.isNull).forall(_ == "false")) {
    +        // If all fields are not nullable, which means the null bits never changes, then we don't
    +        // need to clear it out every time.
    +        ""
    +      } else {
    +        s"$rowWriter.zeroOutNullBytes();"
    --- End diff --
    
    Makes sense to me.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50029727
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
    @@ -73,9 +70,27 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
           row: String,
           inputs: Seq[ExprCode],
           inputTypes: Seq[DataType],
    -      bufferHolder: String): String = {
    +      bufferHolder: String,
    +      isTopLevel: Boolean = false): String = {
    +    val rowWriterClass = classOf[UnsafeRowWriter].getName
         val rowWriter = ctx.freshName("rowWriter")
    -    ctx.addMutableState(rowWriterClass, rowWriter, s"this.$rowWriter = new $rowWriterClass();")
    +    ctx.addMutableState(rowWriterClass, rowWriter,
    +      s"this.$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
    +
    +    val resetWriter = if (isTopLevel) {
    +      // For top level row writer, it always writes to the beginning of the global buffer holder,
    +      // which means its fixed-size region always in the same position, so we don't need to call
    +      // `reset` to set up its fixed-size region every time.
    +      if (inputs.map(_.isNull).forall(_ == "false")) {
    +        // If all fields are not nullable, which means the null bits never changes, then we don't
    +        // need to clear it out every time.
    +        ""
    +      } else {
    +        s"$rowWriter.zeroOutNullBites();"
    --- End diff --
    
    Here I made a different decision compared to the unsafe parquet reader.  We can clear out the null bits at the beginning and call `UnsafeRowWriter.write` instead of `UnsafeRow.setXXX`, which saves one null-bit update per field.  If null values are rare, this approach should be faster.  I'll benchmark it later.
    cc @nongli 
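
To make the trade-off in this comment concrete, here is a minimal sketch that counts null-bitset updates under both strategies. It is a toy model under stated assumptions: `NullBitsSketch`, its `long[]` storage, and the `nullBitUpdates` counter are hypothetical and do not reproduce the actual `UnsafeRow` or `UnsafeRowWriter` code, which works on raw memory.

```java
import java.util.Arrays;

// Hypothetical model of a row's null bitset plus 8-byte field slots.
// It only counts bitset updates to compare the two strategies; it is not Spark code.
final class NullBitsSketch {
    private final long[] nullWords; // one bit per field, 64 fields per word
    private final long[] slots;     // one 8-byte slot per field
    int nullBitUpdates = 0;

    NullBitsSketch(int numFields) {
        this.nullWords = new long[(numFields + 63) / 64];
        this.slots = new long[numFields];
    }

    // Strategy 1, step 1: clear all null bits once per record.
    void zeroOutNullBits() {
        Arrays.fill(nullWords, 0L);
        nullBitUpdates += nullWords.length;
    }

    // Strategy 1, step 2: a non-null write touches only the value slot.
    void write(int ordinal, long value) {
        slots[ordinal] = value;
    }

    // Strategy 1, step 3: only null fields touch the bitset.
    void setNullAt(int ordinal) {
        nullWords[ordinal >> 6] |= 1L << (ordinal & 63);
        slots[ordinal] = 0L;
        nullBitUpdates++;
    }

    // Strategy 2: a setter that clears the field's null bit on every write.
    void setLong(int ordinal, long value) {
        nullWords[ordinal >> 6] &= ~(1L << (ordinal & 63));
        slots[ordinal] = value;
        nullBitUpdates++;
    }

    boolean isNullAt(int ordinal) {
        return (nullWords[ordinal >> 6] & (1L << (ordinal & 63))) != 0;
    }

    long getLong(int ordinal) {
        return slots[ordinal];
    }

    public static void main(String[] args) {
        NullBitsSketch clearFirst = new NullBitsSketch(4);
        clearFirst.zeroOutNullBits();
        for (int i = 0; i < 4; i++) clearFirst.write(i, i);

        NullBitsSketch perField = new NullBitsSketch(4);
        for (int i = 0; i < 4; i++) perField.setLong(i, i);

        System.out.println("clear-first updates: " + clearFirst.nullBitUpdates);
        System.out.println("per-field updates:   " + perField.nullBitUpdates);
    }
}
```

In this model a four-field record with no nulls costs one bitset update with the clear-first strategy and four with the per-field setter, which is the effect the comment expects when nulls are rare. Whether it is faster in practice is what the promised benchmark would have to show.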




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-174739459
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50026/




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50173875
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
    @@ -288,22 +299,43 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
         val exprEvals = ctx.generateExpressions(expressions, useSubexprElimination)
         val exprTypes = expressions.map(_.dataType)
     
    +    val numVarLenFields = exprTypes.count {
    --- End diff --
    
    This is used to avoid calling reset and setTotalSize(), still useful. nvm.
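
The idea behind counting variable-length fields can be sketched as follows: when the count is zero, the row always occupies the same fixed-size region, so the generated code can omit both the buffer reset and the row-size update. This is only an illustration. `VarLenFieldsSketch`, the `Kind` enum, and the emitted strings are assumptions made for the example and are not taken from `GenerateUnsafeProjection`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the code generator's decision; the emitted
// snippets are illustrative strings, not the generated code from the PR.
final class VarLenFieldsSketch {
    enum Kind { INT, LONG, DOUBLE, STRING, BINARY, STRUCT }

    // Fixed-width kinds fit entirely in their 8-byte slot.
    static boolean isFixedWidth(Kind kind) {
        return kind == Kind.INT || kind == Kind.LONG || kind == Kind.DOUBLE;
    }

    static long numVarLenFields(List<Kind> fieldKinds) {
        return fieldKinds.stream().filter(k -> !isFixedWidth(k)).count();
    }

    // With no variable-length fields the cursor never moves past the
    // fixed-size region, so there is nothing to reset per record.
    static String resetBufferHolder(List<Kind> fieldKinds) {
        return numVarLenFields(fieldKinds) == 0 ? "" : "holder.reset();";
    }

    // Likewise the total row size never changes, so it is set only once.
    static String updateRowSize(List<Kind> fieldKinds) {
        return numVarLenFields(fieldKinds) == 0
            ? ""
            : "result.setTotalSize(holder.totalSize());";
    }

    public static void main(String[] args) {
        List<Kind> primitives = Arrays.asList(Kind.INT, Kind.LONG, Kind.DOUBLE);
        List<Kind> mixed = Arrays.asList(Kind.LONG, Kind.STRING);
        System.out.println("primitives: [" + resetBufferHolder(primitives) + "]");
        System.out.println("mixed:      [" + resetBufferHolder(mixed) + "]");
    }
}
```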




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50036261
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java ---
    @@ -26,36 +26,44 @@
     import org.apache.spark.unsafe.types.UTF8String;
     
     /**
    - * A helper class to write data into global row buffer using `UnsafeRow` format,
    - * used by {@link org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection}.
    + * A helper class to write data into global row buffer using `UnsafeRow` format.
    + *
    + * It will remember the offset of row buffer which it starts to write, and move the cursor of row
    + * buffer while writing.  If a new record comes, the cursor of row buffer will be reset, so we need
    + * to also call `reset` of this class before writing, to update the `startingOffset` and clear out
    + * null bits.  Note that if we use it to write data into the result unsafe row, which means we will
    + * always write from the very beginning of the global row buffer, we don't need to update
    + * `startingOffset` and can just call `zeroOutNullBites` before writing new record.
      */
     public class UnsafeRowWriter {
     
    -  private BufferHolder holder;
    +  private final BufferHolder holder;
       // The offset of the global buffer where we start to write this row.
       private int startingOffset;
    -  private int nullBitsSize;
    -  private UnsafeRow row;
    +  private final int nullBitsSize;
    +  private final int fixedSize;
     
    -  public void initialize(BufferHolder holder, int numFields) {
    -    this.holder = holder;
    +  public void reset() {
         this.startingOffset = holder.cursor;
    -    this.nullBitsSize = UnsafeRow.calculateBitSetWidthInBytes(numFields);
     
         // grow the global buffer to make sure it has enough space to write fixed-length data.
    -    final int fixedSize = nullBitsSize + 8 * numFields;
    -    holder.grow(fixedSize, row);
    +    holder.grow(fixedSize);
         holder.cursor += fixedSize;
     
    -    // zero-out the null bits region
    +    zeroOutNullBites();
    +  }
    +
    +  public void zeroOutNullBites() {
         for (int i = 0; i < nullBitsSize; i += 8) {
           Platform.putLong(holder.buffer, startingOffset + i, 0L);
         }
       }
     
    -  public void initialize(UnsafeRow row, BufferHolder holder, int numFields) {
    -    initialize(holder, numFields);
    -    this.row = row;
    +  public UnsafeRowWriter(BufferHolder holder, int numFields) {
    --- End diff --
    
    No, it's different. The holder is global and only contains the result row.  But we may also use `UnsafeRowWriter` to write an inner struct, which is not the result row.
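
A small sketch may help show why the field count cannot simply be read from the holder. Two writers share one global buffer, but each has its own number of fields and its own starting offset. The classes `BufferHolderSketch` and `RowWriterSketch` are hypothetical simplifications that use a plain `byte[]` in place of Spark's `Platform` memory access. The size formulas follow the quoted diff (`nullBitsSize + 8 * numFields`), with the bitset width rounded up to whole 8-byte words.

```java
import java.util.Arrays;

// Hypothetical stand-in for the global buffer holder: one buffer, one cursor.
final class BufferHolderSketch {
    byte[] buffer = new byte[32];
    int cursor = 0;

    // Make sure at least `needed` bytes are available after the cursor.
    void grow(int needed) {
        if (buffer.length - cursor < needed) {
            buffer = Arrays.copyOf(buffer, Math.max(buffer.length * 2, cursor + needed));
        }
    }
}

// Hypothetical stand-in for a row writer bound to the shared holder.
final class RowWriterSketch {
    private final BufferHolderSketch holder;
    private final int nullBitsSize;
    private final int fixedSize;
    int startingOffset;

    RowWriterSketch(BufferHolderSketch holder, int numFields) {
        this.holder = holder;
        // One null bit per field, rounded up to whole 8-byte words.
        this.nullBitsSize = ((numFields + 63) / 64) * 8;
        // Null bits followed by one 8-byte slot per field.
        this.fixedSize = nullBitsSize + 8 * numFields;
    }

    // Claim this writer's fixed-size region at the current cursor.
    void reset() {
        startingOffset = holder.cursor;
        holder.grow(fixedSize);
        holder.cursor += fixedSize;
        Arrays.fill(holder.buffer, startingOffset, startingOffset + nullBitsSize, (byte) 0);
    }

    int fixedSize() {
        return fixedSize;
    }

    public static void main(String[] args) {
        BufferHolderSketch holder = new BufferHolderSketch();
        RowWriterSketch topLevel = new RowWriterSketch(holder, 3); // result row
        RowWriterSketch nested = new RowWriterSketch(holder, 2);   // inner struct
        topLevel.reset();
        nested.reset();
        System.out.println("top-level starts at " + topLevel.startingOffset);
        System.out.println("nested starts at    " + nested.startingOffset);
    }
}
```

Here the nested writer starts where the top-level fixed-size region ends and has a different field count, so a single value stored on the holder could not serve both writers.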




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/10809




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50030016
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala ---
    @@ -0,0 +1,107 @@
    +package org.apache.spark.sql
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.RowEncoder
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.types._
    +import org.apache.spark.util.Benchmark
    +
    +object UnsafeProjectionBenchmark {
    --- End diff --
    
    yea, I think it is fine (as long as we provide enough doc on how to run it). We also have `ParquetReadBenchmark` in the repo.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-172644412
  
    **[Test build #49606 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49606/consoleFull)** for PR 10809 at commit [`9a63852`](https://github.com/apache/spark/commit/9a63852d646ba67133fd22db4dc087aba6189313).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50770676
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
    @@ -73,9 +70,27 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
           row: String,
           inputs: Seq[ExprCode],
           inputTypes: Seq[DataType],
    -      bufferHolder: String): String = {
    +      bufferHolder: String,
    +      isTopLevel: Boolean = false): String = {
    +    val rowWriterClass = classOf[UnsafeRowWriter].getName
         val rowWriter = ctx.freshName("rowWriter")
    -    ctx.addMutableState(rowWriterClass, rowWriter, s"this.$rowWriter = new $rowWriterClass();")
    +    ctx.addMutableState(rowWriterClass, rowWriter,
    +      s"this.$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
    +
    +    val resetWriter = if (isTopLevel) {
    +      // For top level row writer, it always writes to the beginning of the global buffer holder,
    +      // which means its fixed-size region always in the same position, so we don't need to call
    +      // `reset` to set up its fixed-size region every time.
    +      if (inputs.map(_.isNull).forall(_ == "false")) {
    --- End diff --
    
    Could we pass in the expressions?




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-172612163
  
    **[Test build #49602 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49602/consoleFull)** for PR 10809 at commit [`3978711`](https://github.com/apache/spark/commit/397871117179c31c5a634c96165e8cf934316291).




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-172609968
  
    cc @davies @nongli 




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-172644896
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49606/




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50174584
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java ---
    @@ -26,36 +26,44 @@
     import org.apache.spark.unsafe.types.UTF8String;
     
     /**
    - * A helper class to write data into global row buffer using `UnsafeRow` format,
    - * used by {@link org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection}.
    + * A helper class to write data into global row buffer using `UnsafeRow` format.
    + *
    + * It will remember the offset of row buffer which it starts to write, and move the cursor of row
    + * buffer while writing.  If a new record comes, the cursor of row buffer will be reset, so we need
    --- End diff --
    
    It means the record that this writer is responsible for writing: it can be the whole row, a nested struct, or even a struct-type element in an array.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50034304
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java ---
    @@ -26,36 +26,44 @@
     import org.apache.spark.unsafe.types.UTF8String;
     
     /**
    - * A helper class to write data into global row buffer using `UnsafeRow` format,
    - * used by {@link org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection}.
    + * A helper class to write data into global row buffer using `UnsafeRow` format.
    + *
    + * It will remember the offset of row buffer which it starts to write, and move the cursor of row
    + * buffer while writing.  If a new record comes, the cursor of row buffer will be reset, so we need
    + * to also call `reset` of this class before writing, to update the `startingOffset` and clear out
    + * null bits.  Note that if we use it to write data into the result unsafe row, which means we will
    + * always write from the very beginning of the global row buffer, we don't need to update
    + * `startingOffset` and can just call `zeroOutNullBites` before writing new record.
      */
     public class UnsafeRowWriter {
     
    -  private BufferHolder holder;
    +  private final BufferHolder holder;
       // The offset of the global buffer where we start to write this row.
       private int startingOffset;
    -  private int nullBitsSize;
    -  private UnsafeRow row;
    +  private final int nullBitsSize;
    +  private final int fixedSize;
     
    -  public void initialize(BufferHolder holder, int numFields) {
    -    this.holder = holder;
    +  public void reset() {
         this.startingOffset = holder.cursor;
    -    this.nullBitsSize = UnsafeRow.calculateBitSetWidthInBytes(numFields);
     
         // grow the global buffer to make sure it has enough space to write fixed-length data.
    -    final int fixedSize = nullBitsSize + 8 * numFields;
    -    holder.grow(fixedSize, row);
    +    holder.grow(fixedSize);
         holder.cursor += fixedSize;
     
    -    // zero-out the null bits region
    +    zeroOutNullBites();
    +  }
    +
    +  public void zeroOutNullBites() {
         for (int i = 0; i < nullBitsSize; i += 8) {
           Platform.putLong(holder.buffer, startingOffset + i, 0L);
         }
       }
     
    -  public void initialize(UnsafeRow row, BufferHolder holder, int numFields) {
    -    initialize(holder, numFields);
    -    this.row = row;
    +  public UnsafeRowWriter(BufferHolder holder, int numFields) {
    --- End diff --
    
    Can you remove `numFields`? The holder contains a row and there is only one valid value for `numFields`. It would be better to pull it from `holder.row`.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-174699755
  
    **[Test build #50026 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50026/consoleFull)** for PR 10809 at commit [`f79f63c`](https://github.com/apache/spark/commit/f79f63cbead61f561e34270154f236dcae786e16).




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-174739455
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50769891
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
    @@ -73,9 +70,27 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
           row: String,
           inputs: Seq[ExprCode],
           inputTypes: Seq[DataType],
    -      bufferHolder: String): String = {
    +      bufferHolder: String,
    +      isTopLevel: Boolean = false): String = {
    +    val rowWriterClass = classOf[UnsafeRowWriter].getName
         val rowWriter = ctx.freshName("rowWriter")
    -    ctx.addMutableState(rowWriterClass, rowWriter, s"this.$rowWriter = new $rowWriterClass();")
    +    ctx.addMutableState(rowWriterClass, rowWriter,
    +      s"this.$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
    +
    +    val resetWriter = if (isTopLevel) {
    +      // For top level row writer, it always writes to the beginning of the global buffer holder,
    +      // which means its fixed-size region always in the same position, so we don't need to call
    +      // `reset` to set up its fixed-size region every time.
    +      if (inputs.map(_.isNull).forall(_ == "false")) {
    --- End diff --
    
    I followed https://github.com/apache/spark/pull/10333/files#diff-90b107e5c61791e17d5b4b25021b89fdR138 to do this. Is there a better approach?




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-172722668
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-172694268
  
    **[Test build #49647 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49647/consoleFull)** for PR 10809 at commit [`5567ef1`](https://github.com/apache/spark/commit/5567ef155f5661de2f0b1e8be65540e1cc35cea4).




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50178239
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java ---
    @@ -26,36 +26,44 @@
     import org.apache.spark.unsafe.types.UTF8String;
     
     /**
    - * A helper class to write data into global row buffer using `UnsafeRow` format,
    - * used by {@link org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection}.
    + * A helper class to write data into global row buffer using `UnsafeRow` format.
    + *
    + * It will remember the offset of row buffer which it starts to write, and move the cursor of row
    + * buffer while writing.  If a new record comes, the cursor of row buffer will be reset, so we need
    + * to also call `reset` of this class before writing, to update the `startingOffset` and clear out
    + * null bits.  Note that if we use it to write data into the result unsafe row, which means we will
    + * always write from the very beginning of the global row buffer, we don't need to update
    + * `startingOffset` and can just call `zeroOutNullBites` before writing new record.
      */
     public class UnsafeRowWriter {
     
    -  private BufferHolder holder;
    +  private final BufferHolder holder;
       // The offset of the global buffer where we start to write this row.
       private int startingOffset;
    -  private int nullBitsSize;
    -  private UnsafeRow row;
    +  private final int nullBitsSize;
    +  private final int fixedSize;
     
    -  public void initialize(BufferHolder holder, int numFields) {
    -    this.holder = holder;
    +  public void reset() {
         this.startingOffset = holder.cursor;
    -    this.nullBitsSize = UnsafeRow.calculateBitSetWidthInBytes(numFields);
     
         // grow the global buffer to make sure it has enough space to write fixed-length data.
    -    final int fixedSize = nullBitsSize + 8 * numFields;
    -    holder.grow(fixedSize, row);
    +    holder.grow(fixedSize);
         holder.cursor += fixedSize;
     
    -    // zero-out the null bits region
    +    zeroOutNullBites();
    +  }
    +
    +  public void zeroOutNullBites() {
         for (int i = 0; i < nullBitsSize; i += 8) {
           Platform.putLong(holder.buffer, startingOffset + i, 0L);
         }
       }
     
    -  public void initialize(UnsafeRow row, BufferHolder holder, int numFields) {
    -    initialize(holder, numFields);
    -    this.row = row;
    +  public UnsafeRowWriter(BufferHolder holder, int numFields) {
    --- End diff --
    
    We only instantiate it once, then call `zeroOutNullBytes` (for the result row) or `reset` (for a nested struct) on every round.
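
The calling order described here can be sketched with simplified stand-ins. This is not Spark's code: `Holder` and `Writer` are hypothetical classes that replace `BufferHolder` and `UnsafeRowWriter` with a plain byte array, and the method is spelled `zeroOutNullBytes` as in the review comments (the diff itself spells it `zeroOutNullBites`). The sketch assumes the holder's per-record reset moves the cursor just past the top-level row's fixed-size region.

```java
import java.util.Arrays;

final class WriterLifecycleSketch {

    /** Hypothetical stand-in for BufferHolder: one reusable byte array plus a write cursor. */
    static final class Holder {
        final int topLevelFixedSize;
        byte[] buffer;
        int cursor = 0;

        Holder(int topLevelFields) {
            this.topLevelFixedSize = fixedSizeOf(topLevelFields);
            this.buffer = new byte[topLevelFixedSize + 64];
        }

        /** Called once per incoming record: skip past the top-level row's fixed-size region. */
        void reset() {
            cursor = topLevelFixedSize;
        }

        void grow(int needed) {
            if (cursor + needed > buffer.length) {
                buffer = Arrays.copyOf(buffer, Math.max(buffer.length * 2, cursor + needed));
            }
        }
    }

    /** Hypothetical stand-in for UnsafeRowWriter, created once and reused for every record. */
    static final class Writer {
        private final Holder holder;
        private final int nullBitsSize;
        private final int fixedSize;
        int startingOffset;

        Writer(Holder holder, int numFields) {
            this.holder = holder;
            this.nullBitsSize = nullBitsSizeOf(numFields);
            this.fixedSize = fixedSizeOf(numFields);
            this.startingOffset = holder.cursor;
        }

        /** Nested struct: claim a fresh fixed-size region at the current cursor. */
        void reset() {
            startingOffset = holder.cursor;
            holder.grow(fixedSize);
            holder.cursor += fixedSize;
            zeroOutNullBytes();
        }

        /** Result row: the region never moves, so only the null bits are cleared. */
        void zeroOutNullBytes() {
            Arrays.fill(holder.buffer, startingOffset, startingOffset + nullBitsSize, (byte) 0);
        }

        void setNullAt(int ordinal) {
            holder.buffer[startingOffset + (ordinal >> 3)] |= (byte) (1 << (ordinal & 7));
        }

        boolean isNullAt(int ordinal) {
            return (holder.buffer[startingOffset + (ordinal >> 3)] & (1 << (ordinal & 7))) != 0;
        }
    }

    static int nullBitsSizeOf(int numFields) { return ((numFields + 63) / 64) * 8; }

    static int fixedSizeOf(int numFields) { return nullBitsSizeOf(numFields) + 8 * numFields; }

    public static void main(String[] args) {
        Holder holder = new Holder(2);
        Writer top = new Writer(holder, 2);     // constructed once
        Writer nested = new Writer(holder, 3);  // constructed once
        for (int record = 0; record < 2; record++) {
            holder.reset();
            top.zeroOutNullBytes();  // every round, result row
            nested.reset();          // every round, nested struct
            nested.setNullAt(1);
        }
        System.out.println("nested row starts at offset " + nested.startingOffset);
    }
}
```

Because the holder's cursor returns to the same position for each record, the nested writer lands on the same offset every round, and its null bits from the previous record are cleared by `reset`.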




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-172722331
  
    **[Test build #49647 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49647/consoleFull)** for PR 10809 at commit [`5567ef1`](https://github.com/apache/spark/commit/5567ef155f5661de2f0b1e8be65540e1cc35cea4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-172612294
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50034917
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.RowEncoder
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.types._
    +import org.apache.spark.util.Benchmark
    +
    +/**
    + * Benchmark [[UnsafeProjection]] for flat schema(primitive-type fields).
    + */
    +object UnsafeProjectionBenchmark {
    +
    +  def generateRows(schema: StructType, numRows: Int): Array[InternalRow] = {
    +    val generator = RandomDataGenerator.forType(schema, nullable = false).get
    +    val encoder = RowEncoder(schema)
    +    (1 to numRows).map(_ => encoder.toRow(generator().asInstanceOf[Row]).copy()).toArray
    +  }
    +
    +  def main(args: Array[String]) {
    +    val iters = 1024 * 16
    +    val numRows = 1024 * 16
    +
    +    val benchmark = new Benchmark("unsafe projection", iters * numRows)
    +
    +
    +    val schema1 = new StructType().add("l", LongType, false)
    +    val attrs1 = schema1.toAttributes
    +    val rows1 = generateRows(schema1, numRows)
    +    val projection1 = UnsafeProjection.create(attrs1, attrs1)
    +
    +    benchmark.addCase("single long") { _ =>
    +      for (_ <- 1 to iters) {
    +        var sum = 0L
    +        var i = 0
    +        while (i < numRows) {
    +          sum += projection1(rows1(i)).getLong(0)
    +          i += 1
    +        }
    +      }
    +    }
    +
    +    val schema2 = new StructType().add("l", LongType, true)
    +    val attrs2 = schema2.toAttributes
    +    val rows2 = generateRows(schema2, numRows)
    +    val projection2 = UnsafeProjection.create(attrs2, attrs2)
    +
    +    benchmark.addCase("single nullable long") { _ =>
    +      for (_ <- 1 to iters) {
    +        var sum = 0L
    +        var i = 0
    +        while (i < numRows) {
    +          sum += projection2(rows2(i)).getLong(0)
    +          i += 1
    +        }
    +      }
    +    }
    +
    +
    +    val schema3 = new StructType()
    +      .add("boolean", BooleanType, false)
    +      .add("byte", ByteType, false)
    +      .add("short", ShortType, false)
    +      .add("int", IntegerType, false)
    +      .add("long", LongType, false)
    +      .add("float", FloatType, false)
    +      .add("double", DoubleType, false)
    +    val attrs3 = schema3.toAttributes
    +    val rows3 = generateRows(schema3, numRows)
    +    val projection3 = UnsafeProjection.create(attrs3, attrs3)
    +
    +    benchmark.addCase("primitive types") { _ =>
    +      for (_ <- 1 to iters) {
    +        var sum = 0L
    +        var i = 0
    +        while (i < numRows) {
    +          sum += projection3(rows3(i)).getLong(0)
    +          i += 1
    +        }
    +      }
    +    }
    +
    +
    +    val schema4 = new StructType()
    +      .add("boolean", BooleanType, true)
    +      .add("byte", ByteType, true)
    +      .add("short", ShortType, true)
    +      .add("int", IntegerType, true)
    +      .add("long", LongType, true)
    +      .add("float", FloatType, true)
    +      .add("double", DoubleType, true)
    +    val attrs4 = schema4.toAttributes
    +    val rows4 = generateRows(schema4, numRows)
    +    val projection4 = UnsafeProjection.create(attrs4, attrs4)
    +
    +    benchmark.addCase("nullable primitive types") { _ =>
    +      for (_ <- 1 to iters) {
    +        var sum = 0L
    +        var i = 0
    +        while (i < numRows) {
    +          sum += projection4(rows4(i)).getLong(0)
    +          i += 1
    +        }
    +      }
    +    }
    +
    +
    +    benchmark.run()
    --- End diff --
    
    Can you paste the results here as a comment?




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-172612298
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49602/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50174588
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala ---
    @@ -0,0 +1,127 @@
    +    benchmark.run()
    --- End diff --
    
    +1 for posting it here; it will make the performance improvements easy to track.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50029755
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala ---
    @@ -0,0 +1,107 @@
    +package org.apache.spark.sql
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.RowEncoder
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.types._
    +import org.apache.spark.util.Benchmark
    +
    +object UnsafeProjectionBenchmark {
    --- End diff --
    
    Should we keep the benchmark code in the PR?




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50172508
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java ---
    @@ -26,36 +26,44 @@
     import org.apache.spark.unsafe.types.UTF8String;
     
     /**
    - * A helper class to write data into global row buffer using `UnsafeRow` format,
    - * used by {@link org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection}.
    + * A helper class to write data into global row buffer using `UnsafeRow` format.
    + *
    + * It will remember the offset of row buffer which it starts to write, and move the cursor of row
    + * buffer while writing.  If a new record comes, the cursor of row buffer will be reset, so we need
    + * to also call `reset` of this class before writing, to update the `startingOffset` and clear out
    + * null bits.  Note that if we use it to write data into the result unsafe row, which means we will
    + * always write from the very beginning of the global row buffer, we don't need to update
    + * `startingOffset` and can just call `zeroOutNullBites` before writing new record.
      */
     public class UnsafeRowWriter {
     
    -  private BufferHolder holder;
    +  private final BufferHolder holder;
       // The offset of the global buffer where we start to write this row.
       private int startingOffset;
    -  private int nullBitsSize;
    -  private UnsafeRow row;
    +  private final int nullBitsSize;
    +  private final int fixedSize;
     
    -  public void initialize(BufferHolder holder, int numFields) {
    -    this.holder = holder;
    +  public void reset() {
         this.startingOffset = holder.cursor;
    -    this.nullBitsSize = UnsafeRow.calculateBitSetWidthInBytes(numFields);
     
         // grow the global buffer to make sure it has enough space to write fixed-length data.
    -    final int fixedSize = nullBitsSize + 8 * numFields;
    -    holder.grow(fixedSize, row);
    +    holder.grow(fixedSize);
         holder.cursor += fixedSize;
     
    -    // zero-out the null bits region
    +    zeroOutNullBites();
    +  }
    +
    +  public void zeroOutNullBites() {
         for (int i = 0; i < nullBitsSize; i += 8) {
           Platform.putLong(holder.buffer, startingOffset + i, 0L);
         }
       }
     
    -  public void initialize(UnsafeRow row, BufferHolder holder, int numFields) {
    -    initialize(holder, numFields);
    -    this.row = row;
    +  public UnsafeRowWriter(BufferHolder holder, int numFields) {
    --- End diff --
    
    Could you move this constructor to the beginning?
    
    What's the expected calling order?
    
    For UnsafeRow, `new UnsafeRowWriter(holder, n).zeroOutNullBytes()`
    For nested struct, `new UnsafeRowWriter(holder, n).reset()`
    
    Is that correct?




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50036589
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala ---
    @@ -0,0 +1,127 @@
    +    benchmark.run()
    --- End diff --
    
    Just post the results of the new version? I actually copied this benchmark onto master and ran it there, so that I could see how much this PR speeds things up.
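
As a rough cross-check of such a before/after comparison, the figures in the PR description's tables can be reproduced from the benchmark parameters. The sketch below assumes that "Avg Rate(M/s)" means total rows processed (`iters * numRows`) divided by the average time, which appears consistent with the posted "single long" rows (2616.04 ms on master, 1533.34 ms with this PR). `BenchmarkRate` and its methods are hypothetical helpers, not part of Spark's `Benchmark` utility.

```java
final class BenchmarkRate {

    /** Rows processed per second, in millions, for a run that took avgTimeMs milliseconds. */
    static double millionRowsPerSecond(long totalRows, double avgTimeMs) {
        double seconds = avgTimeMs / 1000.0;
        return totalRows / seconds / 1_000_000.0;
    }

    /** How many times faster the new run is than the old one. */
    static double speedup(double oldTimeMs, double newTimeMs) {
        return oldTimeMs / newTimeMs;
    }

    public static void main(String[] args) {
        long totalRows = 1024L * 16 * 1024 * 16; // iters * numRows from the benchmark
        System.out.printf("master:  %.2f M/s%n", millionRowsPerSecond(totalRows, 2616.04));
        System.out.printf("this PR: %.2f M/s%n", millionRowsPerSecond(totalRows, 1533.34));
        System.out.printf("speedup: %.2fx%n", speedup(2616.04, 1533.34));
    }
}
```

Under that assumption the "single long" case works out to roughly 102.61 M/s before and 175.07 M/s after, about a 1.7x speedup, which matches the claim in the PR description.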




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50173654
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
    @@ -288,22 +299,43 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
         val exprEvals = ctx.generateExpressions(expressions, useSubexprElimination)
         val exprTypes = expressions.map(_.dataType)
     
    +    val numVarLenFields = exprTypes.count {
    --- End diff --
    
    Since it's easy to grow the buffer, we don't need these optimizations.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50172662
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java ---
    @@ -21,24 +21,36 @@
     import org.apache.spark.unsafe.Platform;
     
     /**
    - * A helper class to manage the row buffer when construct unsafe rows.
    + * A helper class to manage the data buffer for an unsafe row.  The data buffer can grow and
    + * automatically re-point the unsafe row to it.
    + *
    + * This class can be used to build a one-pass unsafe row writing program, i.e. data will be written
    + * to the data buffer directly and no extra copy is needed.  There should be only one instance of
    + * this class per writing program, so that the memory segment/data buffer can be reused.  Note that
    + * for each incoming record, we should call `reset` of BufferHolder instance before write the record
    + * and reuse the data buffer.
    --- End diff --
    
    Could you also add a comment saying that we should call either `unsafeRow.pointTo()` or `unsafeRow.setTotalSize()`?
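
The usage contract discussed here (one holder per writing program, `reset` before each record, then either re-point the row or only update its size) can be sketched as follows. This is a minimal illustration under stated assumptions: `ToyRow`, `ToyBufferHolder`, and `ToyRowWriting` are hypothetical stand-ins written for this note, not Spark's `UnsafeRow` or `BufferHolder`, and the layout is reduced to a fixed-size region followed by a single variable-length field.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical row view: a reference to a byte array plus the number of valid bytes. */
final class ToyRow {
    byte[] base;
    int sizeInBytes;

    void pointTo(byte[] base, int sizeInBytes) {
        this.base = base;
        this.sizeInBytes = sizeInBytes;
    }

    void setTotalSize(int sizeInBytes) {
        this.sizeInBytes = sizeInBytes;
    }
}

/** Hypothetical reusable buffer; one instance is shared by all records. */
final class ToyBufferHolder {
    private final ToyRow row;
    private final int fixedSize;
    byte[] buffer;
    int cursor;

    ToyBufferHolder(ToyRow row, int fixedSize, int initialCapacity) {
        this.row = row;
        this.fixedSize = fixedSize;
        this.buffer = new byte[Math.max(fixedSize, initialCapacity)];
        this.cursor = fixedSize;
        row.pointTo(buffer, fixedSize);
    }

    /** Call once per incoming record: rewinds the cursor to just after the fixed-size region. */
    void reset() {
        cursor = fixedSize;
    }

    /** Ensures `needed` more bytes fit; re-points the row only when the array is replaced. */
    void grow(int needed) {
        if (needed > buffer.length - cursor) {
            buffer = Arrays.copyOf(buffer, Math.max(buffer.length * 2, cursor + needed));
            row.pointTo(buffer, cursor);
        }
    }

    int totalSize() {
        return cursor;
    }
}

final class ToyRowWriting {
    /** Writes one record whose only variable-length field is `value`, reusing holder and row. */
    static ToyRow writeRecord(ToyBufferHolder holder, ToyRow row, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        holder.reset();
        holder.grow(bytes.length);
        System.arraycopy(bytes, 0, holder.buffer, holder.cursor, bytes.length);
        holder.cursor += bytes.length;
        // The row already points at the current buffer, so only its size needs updating.
        row.setTotalSize(holder.totalSize());
        return row;
    }
}
```

In this sketch the holder re-points the row whenever it reallocates, so the caller only has to update the size after each record. If the holder did not do that, the caller would need a full `pointTo` at the end instead.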




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50061940
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
    @@ -73,9 +70,27 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
           row: String,
           inputs: Seq[ExprCode],
           inputTypes: Seq[DataType],
    -      bufferHolder: String): String = {
    +      bufferHolder: String,
    +      isTopLevel: Boolean = false): String = {
    +    val rowWriterClass = classOf[UnsafeRowWriter].getName
         val rowWriter = ctx.freshName("rowWriter")
    -    ctx.addMutableState(rowWriterClass, rowWriter, s"this.$rowWriter = new $rowWriterClass();")
    +    ctx.addMutableState(rowWriterClass, rowWriter,
    +      s"this.$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
    +
    +    val resetWriter = if (isTopLevel) {
    +      // For top level row writer, it always writes to the beginning of the global buffer holder,
    +      // which means its fixed-size region always in the same position, so we don't need to call
    +      // `reset` to set up its fixed-size region every time.
    +      if (inputs.map(_.isNull).forall(_ == "false")) {
    +        // If all fields are not nullable, which means the null bits never changes, then we don't
    +        // need to clear it out every time.
    +        ""
    +      } else {
    +        s"$rowWriter.zeroOutNullBytes();"
    --- End diff --
    
    Here I made a different decision compared to the unsafe parquet reader.  We can clear out the null bits at the beginning and call `UnsafeRowWriter.write` instead of `UnsafeRow.setXXX`, which saves one null-bit update per write.  If null values are rare, this approach should be faster.  I'll benchmark it later.
    cc @nongli
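
The trade-off described above can be illustrated with a small sketch. It assumes a row with at most 64 fields, so that the null bits fit in one `long`; `ToyNullBits` and both method names are hypothetical and only mimic the two strategies (zero once and then set bits only for nulls, versus updating the bit on every field write).

```java
final class ToyNullBits {
    /** Zero the word once, then touch a bit only for fields that are actually null. */
    static long clearOnceThenWrite(boolean[] isNull) {
        long bits = 0L;
        for (int i = 0; i < isNull.length; i++) {
            if (isNull[i]) {
                bits |= 1L << i;
            }
        }
        return bits;
    }

    /** Start from the previous record's bits and update the bit for every field. */
    static long updateEveryField(long previousBits, boolean[] isNull) {
        long bits = previousBits;
        for (int i = 0; i < isNull.length; i++) {
            if (isNull[i]) {
                bits |= 1L << i;
            } else {
                bits &= ~(1L << i);
            }
        }
        return bits;
    }
}
```

Both strategies end with the same bits. The first does one bulk clear and then one bit update per null field, which is why it should win when nulls are rare.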




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50175287
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java ---
    @@ -26,36 +26,44 @@
     import org.apache.spark.unsafe.types.UTF8String;
     
     /**
    - * A helper class to write data into global row buffer using `UnsafeRow` format,
    - * used by {@link org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection}.
    + * A helper class to write data into global row buffer using `UnsafeRow` format.
    + *
    + * It will remember the offset of row buffer which it starts to write, and move the cursor of row
    + * buffer while writing.  If a new record comes, the cursor of row buffer will be reset, so we need
    --- End diff --
    
    I mean that the phrase `new record` is not clear to me; it should say nested struct.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50174505
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.RowEncoder
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.types._
    +import org.apache.spark.util.Benchmark
    +
    +/**
    + * Benchmark [[UnsafeProjection]] for flat schema(primitive-type fields).
    + */
    +object UnsafeProjectionBenchmark {
    +
    +  def generateRows(schema: StructType, numRows: Int): Array[InternalRow] = {
    +    val generator = RandomDataGenerator.forType(schema, nullable = false).get
    +    val encoder = RowEncoder(schema)
    +    (1 to numRows).map(_ => encoder.toRow(generator().asInstanceOf[Row]).copy()).toArray
    +  }
    +
    +  def main(args: Array[String]) {
    +    val iters = 1024 * 16
    +    val numRows = 1024 * 16
    +
    +    val benchmark = new Benchmark("unsafe projection", iters * numRows)
    +
    +
    +    val schema1 = new StructType().add("l", LongType, false)
    +    val attrs1 = schema1.toAttributes
    +    val rows1 = generateRows(schema1, numRows)
    +    val projection1 = UnsafeProjection.create(attrs1, attrs1)
    +
    +    benchmark.addCase("single long") { _ =>
    +      for (_ <- 1 to iters) {
    +        var sum = 0L
    +        var i = 0
    +        while (i < numRows) {
    +          sum += projection1(rows1(i)).getLong(0)
    +          i += 1
    +        }
    +      }
    +    }
    +
    +    val schema2 = new StructType().add("l", LongType, true)
    +    val attrs2 = schema2.toAttributes
    +    val rows2 = generateRows(schema2, numRows)
    +    val projection2 = UnsafeProjection.create(attrs2, attrs2)
    +
    +    benchmark.addCase("single nullable long") { _ =>
    +      for (_ <- 1 to iters) {
    +        var sum = 0L
    +        var i = 0
    +        while (i < numRows) {
    +          sum += projection2(rows2(i)).getLong(0)
    +          i += 1
    +        }
    +      }
    +    }
    +
    +
    +    val schema3 = new StructType()
    +      .add("boolean", BooleanType, false)
    +      .add("byte", ByteType, false)
    +      .add("short", ShortType, false)
    +      .add("int", IntegerType, false)
    +      .add("long", LongType, false)
    +      .add("float", FloatType, false)
    +      .add("double", DoubleType, false)
    +    val attrs3 = schema3.toAttributes
    +    val rows3 = generateRows(schema3, numRows)
    +    val projection3 = UnsafeProjection.create(attrs3, attrs3)
    +
    +    benchmark.addCase("primitive types") { _ =>
    +      for (_ <- 1 to iters) {
    +        var sum = 0L
    +        var i = 0
    +        while (i < numRows) {
    +          sum += projection3(rows3(i)).getLong(0)
    +          i += 1
    +        }
    +      }
    +    }
    +
    +
    +    val schema4 = new StructType()
    +      .add("boolean", BooleanType, true)
    +      .add("byte", ByteType, true)
    +      .add("short", ShortType, true)
    +      .add("int", IntegerType, true)
    +      .add("long", LongType, true)
    +      .add("float", FloatType, true)
    +      .add("double", DoubleType, true)
    +    val attrs4 = schema4.toAttributes
    +    val rows4 = generateRows(schema4, numRows)
    +    val projection4 = UnsafeProjection.create(attrs4, attrs4)
    +
    +    benchmark.addCase("nullable primitive types") { _ =>
    --- End diff --
    
    Could you say `7 primitive types`? Otherwise it's odd to see why the primitive types case is so slow.
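
As a rough check of why the field count matters, the rates in the PR description's "old version" table can be recomputed from the benchmark's row count (`iters * numRows`) and the reported times. The sketch below is only arithmetic on those published numbers; `ToyBenchmarkRate` and its methods are hypothetical helpers, and the per-field figure assumes the cost is spread evenly over the 7 fields.

```java
final class ToyBenchmarkRate {
    /** Rows processed per second, in millions, given a total row count and elapsed milliseconds. */
    static double millionRowsPerSecond(long totalRows, double elapsedMs) {
        return totalRows / (elapsedMs * 1000.0);
    }

    /** Field writes per second, in millions, for rows that each carry `numFields` fields. */
    static double millionFieldsPerSecond(long totalRows, int numFields, double elapsedMs) {
        return millionRowsPerSecond(totalRows, elapsedMs) * numFields;
    }
}
```

Measured per field rather than per row, the 7-field case is not slower than the single-long case, which is the point of putting the field count in the case name.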




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-174738987
  
    **[Test build #50026 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50026/consoleFull)** for PR 10809 at commit [`f79f63c`](https://github.com/apache/spark/commit/f79f63cbead61f561e34270154f236dcae786e16).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-172612291
  
    **[Test build #49602 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49602/consoleFull)** for PR 10809 at commit [`3978711`](https://github.com/apache/spark/commit/397871117179c31c5a634c96165e8cf934316291).
     * This patch **fails RAT tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50034654
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
    @@ -288,22 +299,44 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
         val exprEvals = ctx.generateExpressions(expressions, useSubexprElimination)
         val exprTypes = expressions.map(_.dataType)
     
    +    val numVarLenFields = exprTypes.count {
    +      case dt if ctx.isPrimitiveType(dt) => false
    --- End diff --
    
    Can you use `UnsafeRow.isFixedLength` here?
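
The idea behind `numVarLenFields` in the diff is to count the fields whose encoded size is not fixed; when the count is zero, the total row size never changes between records. A minimal sketch of that check follows. `ToyType` and its `fixedLength` flag are hypothetical and deliberately tiny; they are not Spark's `DataType` hierarchy and do not reproduce the rules `UnsafeRow.isFixedLength` applies.

```java
import java.util.List;

/** Hypothetical type tags; the flag says whether a value always occupies a fixed-size slot. */
enum ToyType {
    BOOLEAN(true), INT(true), LONG(true), DOUBLE(true), STRING(false), BINARY(false);

    final boolean fixedLength;

    ToyType(boolean fixedLength) {
        this.fixedLength = fixedLength;
    }
}

final class ToyVarLenCount {
    /** Counts fields that need space in the variable-length region of the row. */
    static int numVarLenFields(List<ToyType> fieldTypes) {
        int count = 0;
        for (ToyType type : fieldTypes) {
            if (!type.fixedLength) {
                count++;
            }
        }
        return count;
    }

    /** With no variable-length fields, the row size is the same for every record. */
    static boolean rowSizeIsConstant(List<ToyType> fieldTypes) {
        return numVarLenFields(fieldTypes) == 0;
    }
}
```

Routing the decision through one shared predicate, as suggested here, keeps the definition of "fixed length" in a single place.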




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-172722670
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49647/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-172644894
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50157896
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java ---
    @@ -26,36 +26,44 @@
     import org.apache.spark.unsafe.types.UTF8String;
     
     /**
    - * A helper class to write data into global row buffer using `UnsafeRow` format,
    - * used by {@link org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection}.
    + * A helper class to write data into global row buffer using `UnsafeRow` format.
    + *
    + * It will remember the offset of row buffer which it starts to write, and move the cursor of row
    + * buffer while writing.  If a new record comes, the cursor of row buffer will be reset, so we need
    + * to also call `reset` of this class before writing, to update the `startingOffset` and clear out
    + * null bits.  Note that if we use it to write data into the result unsafe row, which means we will
    + * always write from the very beginning of the global row buffer, we don't need to update
    + * `startingOffset` and can just call `zeroOutNullBites` before writing new record.
      */
     public class UnsafeRowWriter {
     
    -  private BufferHolder holder;
    +  private final BufferHolder holder;
       // The offset of the global buffer where we start to write this row.
       private int startingOffset;
    -  private int nullBitsSize;
    -  private UnsafeRow row;
    +  private final int nullBitsSize;
    +  private final int fixedSize;
     
    -  public void initialize(BufferHolder holder, int numFields) {
    -    this.holder = holder;
    +  public void reset() {
         this.startingOffset = holder.cursor;
    -    this.nullBitsSize = UnsafeRow.calculateBitSetWidthInBytes(numFields);
     
         // grow the global buffer to make sure it has enough space to write fixed-length data.
    -    final int fixedSize = nullBitsSize + 8 * numFields;
    -    holder.grow(fixedSize, row);
    +    holder.grow(fixedSize);
         holder.cursor += fixedSize;
     
    -    // zero-out the null bits region
    +    zeroOutNullBites();
    +  }
    +
    +  public void zeroOutNullBites() {
         for (int i = 0; i < nullBitsSize; i += 8) {
           Platform.putLong(holder.buffer, startingOffset + i, 0L);
         }
       }
     
    -  public void initialize(UnsafeRow row, BufferHolder holder, int numFields) {
    -    initialize(holder, numFields);
    -    this.row = row;
    +  public UnsafeRowWriter(BufferHolder holder, int numFields) {
    --- End diff --
    
    Hmm, I see. Please add a comment explaining that.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50158067
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.RowEncoder
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.types._
    +import org.apache.spark.util.Benchmark
    +
    +/**
    + * Benchmark [[UnsafeProjection]] for flat schema(primitive-type fields).
    + */
    +object UnsafeProjectionBenchmark {
    +
    +  def generateRows(schema: StructType, numRows: Int): Array[InternalRow] = {
    +    val generator = RandomDataGenerator.forType(schema, nullable = false).get
    +    val encoder = RowEncoder(schema)
    +    (1 to numRows).map(_ => encoder.toRow(generator().asInstanceOf[Row]).copy()).toArray
    +  }
    +
    +  def main(args: Array[String]) {
    +    val iters = 1024 * 16
    +    val numRows = 1024 * 16
    +
    +    val benchmark = new Benchmark("unsafe projection", iters * numRows)
    +
    +
    +    val schema1 = new StructType().add("l", LongType, false)
    +    val attrs1 = schema1.toAttributes
    +    val rows1 = generateRows(schema1, numRows)
    +    val projection1 = UnsafeProjection.create(attrs1, attrs1)
    +
    +    benchmark.addCase("single long") { _ =>
    +      for (_ <- 1 to iters) {
    +        var sum = 0L
    +        var i = 0
    +        while (i < numRows) {
    +          sum += projection1(rows1(i)).getLong(0)
    +          i += 1
    +        }
    +      }
    +    }
    +
    +    val schema2 = new StructType().add("l", LongType, true)
    +    val attrs2 = schema2.toAttributes
    +    val rows2 = generateRows(schema2, numRows)
    +    val projection2 = UnsafeProjection.create(attrs2, attrs2)
    +
    +    benchmark.addCase("single nullable long") { _ =>
    +      for (_ <- 1 to iters) {
    +        var sum = 0L
    +        var i = 0
    +        while (i < numRows) {
    +          sum += projection2(rows2(i)).getLong(0)
    +          i += 1
    +        }
    +      }
    +    }
    +
    +
    +    val schema3 = new StructType()
    +      .add("boolean", BooleanType, false)
    +      .add("byte", ByteType, false)
    +      .add("short", ShortType, false)
    +      .add("int", IntegerType, false)
    +      .add("long", LongType, false)
    +      .add("float", FloatType, false)
    +      .add("double", DoubleType, false)
    +    val attrs3 = schema3.toAttributes
    +    val rows3 = generateRows(schema3, numRows)
    +    val projection3 = UnsafeProjection.create(attrs3, attrs3)
    +
    +    benchmark.addCase("primitive types") { _ =>
    +      for (_ <- 1 to iters) {
    +        var sum = 0L
    +        var i = 0
    +        while (i < numRows) {
    +          sum += projection3(rows3(i)).getLong(0)
    +          i += 1
    +        }
    +      }
    +    }
    +
    +
    +    val schema4 = new StructType()
    +      .add("boolean", BooleanType, true)
    +      .add("byte", ByteType, true)
    +      .add("short", ShortType, true)
    +      .add("int", IntegerType, true)
    +      .add("long", LongType, true)
    +      .add("float", FloatType, true)
    +      .add("double", DoubleType, true)
    +    val attrs4 = schema4.toAttributes
    +    val rows4 = generateRows(schema4, numRows)
    +    val projection4 = UnsafeProjection.create(attrs4, attrs4)
    +
    +    benchmark.addCase("nullable primitive types") { _ =>
    +      for (_ <- 1 to iters) {
    +        var sum = 0L
    +        var i = 0
    +        while (i < numRows) {
    +          sum += projection4(rows4(i)).getLong(0)
    +          i += 1
    +        }
    +      }
    +    }
    +
    +
    +    benchmark.run()
    --- End diff --
    
    Just the new table from the PR comment is good. I like to include these so readers get a rough sense of what is happening.




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-172954219
  
    LGTM




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10809#issuecomment-174744019
  
    LGTM, merging this into master, thanks!




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50033878
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java ---
    @@ -26,36 +26,44 @@
     import org.apache.spark.unsafe.types.UTF8String;
     
     /**
    - * A helper class to write data into global row buffer using `UnsafeRow` format,
    - * used by {@link org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection}.
    + * A helper class to write data into global row buffer using `UnsafeRow` format.
    + *
    + * It will remember the offset of row buffer which it starts to write, and move the cursor of row
    + * buffer while writing.  If a new record comes, the cursor of row buffer will be reset, so we need
    + * to also call `reset` of this class before writing, to update the `startingOffset` and clear out
    + * null bits.  Note that if we use it to write data into the result unsafe row, which means we will
    + * always write from the very beginning of the global row buffer, we don't need to update
    + * `startingOffset` and can just call `zeroOutNullBites` before writing new record.
      */
     public class UnsafeRowWriter {
     
    -  private BufferHolder holder;
    +  private final BufferHolder holder;
       // The offset of the global buffer where we start to write this row.
       private int startingOffset;
    -  private int nullBitsSize;
    -  private UnsafeRow row;
    +  private final int nullBitsSize;
    +  private final int fixedSize;
     
    -  public void initialize(BufferHolder holder, int numFields) {
    -    this.holder = holder;
    +  public void reset() {
         this.startingOffset = holder.cursor;
    -    this.nullBitsSize = UnsafeRow.calculateBitSetWidthInBytes(numFields);
     
         // grow the global buffer to make sure it has enough space to write fixed-length data.
    -    final int fixedSize = nullBitsSize + 8 * numFields;
    -    holder.grow(fixedSize, row);
    +    holder.grow(fixedSize);
         holder.cursor += fixedSize;
     
    -    // zero-out the null bits region
    +    zeroOutNullBites();
    +  }
    +
    +  public void zeroOutNullBites() {
    --- End diff --
    
    Typo: this should be `NullBytes`.
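
For context, the fixed-size region that this method clears can be sketched as below. The sizes follow the diff (`fixedSize = nullBitsSize + 8 * numFields`, zeroing in 8-byte steps); the rounding of the null bitset to whole 8-byte words is an assumption about `calculateBitSetWidthInBytes`, and `ToyFixedRegion` uses a plain `ByteBuffer` in place of `Platform.putLong`.

```java
import java.nio.ByteBuffer;

final class ToyFixedRegion {
    /** Assumed rounding: one null bit per field, padded up to whole 8-byte words. */
    static int bitSetWidthInBytes(int numFields) {
        return ((numFields + 63) / 64) * 8;
    }

    /** Null bitset followed by one 8-byte slot per field. */
    static int fixedSize(int numFields) {
        return bitSetWidthInBytes(numFields) + 8 * numFields;
    }

    /** Clears only the null-bit words that start at `startingOffset`; field slots are untouched. */
    static void zeroOutNullBytes(ByteBuffer buffer, int startingOffset, int numFields) {
        int nullBitsSize = bitSetWidthInBytes(numFields);
        for (int i = 0; i < nullBitsSize; i += 8) {
            buffer.putLong(startingOffset + i, 0L);
        }
    }
}
```

Because the bitset is cleared a word at a time, the method operates on the null bytes as a whole, which matches the suggested name.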




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50172102
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java ---
    @@ -26,36 +26,44 @@
     import org.apache.spark.unsafe.types.UTF8String;
     
     /**
    - * A helper class to write data into global row buffer using `UnsafeRow` format,
    - * used by {@link org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection}.
    + * A helper class to write data into global row buffer using `UnsafeRow` format.
    + *
    + * It will remember the offset of row buffer which it starts to write, and move the cursor of row
    + * buffer while writing.  If a new record comes, the cursor of row buffer will be reset, so we need
    --- End diff --
    
    Does this `new record` mean a nested struct?
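
The distinction being asked about can be sketched as follows: a writer for a nested struct starts wherever the shared cursor currently is, so it has to re-read that position on every `reset`, whereas a top-level writer always starts at the beginning of the buffer. `ToyHolder` and `ToyNestedWriter` are hypothetical, offsets are plain array indices, and the fixed-size formula assumes an 8-byte-padded null bitset plus 8 bytes per field.

```java
/** Hypothetical shared cursor; stands in for the global buffer holder. */
final class ToyHolder {
    int cursor;
}

final class ToyNestedWriter {
    private final ToyHolder holder;
    private final int fixedSize;
    int startingOffset;

    ToyNestedWriter(ToyHolder holder, int numFields) {
        this.holder = holder;
        this.fixedSize = ((numFields + 63) / 64) * 8 + 8 * numFields;
    }

    /** Remembers where this struct begins and reserves its fixed-size region. */
    void reset() {
        startingOffset = holder.cursor;
        holder.cursor += fixedSize;
    }
}
```

The starting offset of the nested struct depends on how much data was written before it, so it can differ from one outer row to the next.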




[GitHub] spark pull request: [SPARK-12879][SQL] improve the unsafe row writ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10809#discussion_r50172943
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
    @@ -73,9 +70,27 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
           row: String,
           inputs: Seq[ExprCode],
           inputTypes: Seq[DataType],
    -      bufferHolder: String): String = {
    +      bufferHolder: String,
    +      isTopLevel: Boolean = false): String = {
    +    val rowWriterClass = classOf[UnsafeRowWriter].getName
         val rowWriter = ctx.freshName("rowWriter")
    -    ctx.addMutableState(rowWriterClass, rowWriter, s"this.$rowWriter = new $rowWriterClass();")
    +    ctx.addMutableState(rowWriterClass, rowWriter,
    +      s"this.$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
    +
    +    val resetWriter = if (isTopLevel) {
    +      // For top level row writer, it always writes to the beginning of the global buffer holder,
    +      // which means its fixed-size region always in the same position, so we don't need to call
    +      // `reset` to set up its fixed-size region every time.
    +      if (inputs.map(_.isNull).forall(_ == "false")) {
    --- End diff --
    
    Even if the expression is not nullable, `isNull` may still not be the literal `false` (this is not optimized yet).
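
The check in the diff inspects the generated `isNull` code strings, so it only skips the clearing step when every term is the literal text `false`. A small sketch of that decision, with a hypothetical `ToyResetCodegen.resetCode` helper that returns the Java snippet to emit:

```java
import java.util.List;

final class ToyResetCodegen {
    /**
     * Returns the reset snippet for a top-level writer: nothing when every isNull term is the
     * literal "false", otherwise a call that clears the null bits before each record.
     */
    static String resetCode(String rowWriter, List<String> isNullTerms) {
        boolean neverNull = true;
        for (String term : isNullTerms) {
            if (!"false".equals(term)) {
                neverNull = false;
            }
        }
        return neverNull ? "" : rowWriter + ".zeroOutNullBytes();";
    }
}
```

A non-nullable expression whose `isNull` is emitted as a variable name falls into the second branch. That is safe but misses the optimization, which is the limitation pointed out here.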

