Posted to reviews@spark.apache.org by marmbrus <gi...@git.apache.org> on 2014/08/10 23:22:06 UTC

[GitHub] spark pull request: [SPARK-2650] Build column buffers in smaller b...

GitHub user marmbrus opened a pull request:

    https://github.com/apache/spark/pull/1880

    [SPARK-2650] Build column buffers in smaller batches

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/marmbrus/spark columnBatches

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1880.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1880
    
----
commit 23145329d6970ddbdf97c75d13e0a393df6d4747
Author: Michael Armbrust <mi...@databricks.com>
Date:   2014-08-10T21:21:10Z

    Build column buffers in smaller batches

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1880#issuecomment-51728104
  
    QA tests have started for PR 1880. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18286/consoleFull




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1880#discussion_r16039955
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala ---
    @@ -90,22 +101,31 @@ private[sql] case class InMemoryColumnarTableScan(
     
       override def execute() = {
         relation.cachedColumnBuffers.mapPartitions { iterator =>
    -      val columnBuffers = iterator.next()
    -      assert(!iterator.hasNext)
    +      // Find the ordinals of the requested columns.  If none are requested, use the first.
    +      val requestedColumns =
    +        if (attributes.isEmpty) {
    +          Seq(0)
    +        } else {
    +          attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
    +        }
     
           new Iterator[Row] {
    -        // Find the ordinals of the requested columns.  If none are requested, use the first.
    -        val requestedColumns =
    -          if (attributes.isEmpty) {
    -            Seq(0)
    -          } else {
    -            attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
    -          }
    +        private[this] var columnBuffers: Array[ByteBuffer] = null
    +        private[this] var columnAccessors: Seq[ColumnAccessor] = null
    +        nextBatch()
    --- End diff --
    
    @marmbrus @liancheng It's great to see that `InMemoryRelation` will support processing in smaller batches. Ideally, though, we could rewind the byte buffers at the beginning of each iteration in `InMemoryRelation` at runtime, so that the batching is transparent to the iterator in `InMemoryColumnarTableScan`. That way, I think most of the code in `InMemoryColumnarTableScan` could stay unchanged.
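
For readers unfamiliar with the term, here is a minimal sketch of what rewinding a `java.nio.ByteBuffer` means: `rewind()` resets the read position to zero without touching the contents, so the same buffer can be read again. The `RewindSketch` object and its `readAll` and `demo` helpers are hypothetical names used only for illustration; they are not part of Spark.

```scala
import java.nio.ByteBuffer

// Hypothetical illustration; not Spark code.
object RewindSketch {
  // Reads every remaining Int from the buffer, advancing its position.
  def readAll(buffer: ByteBuffer): List[Int] = {
    val out = List.newBuilder[Int]
    while (buffer.remaining() >= 4) {
      out += buffer.getInt()
    }
    out.result()
  }

  def demo(): (List[Int], List[Int], List[Int]) = {
    val buffer = ByteBuffer.allocate(12)
    buffer.putInt(1).putInt(2).putInt(3)
    buffer.flip()                   // switch from writing to reading
    val first = readAll(buffer)     // consumes the buffer
    val exhausted = readAll(buffer) // nothing left to read
    buffer.rewind()                 // position back to 0, limit unchanged
    val second = readAll(buffer)    // yields the same values again
    (first, exhausted, second)
  }
}
```

After the first pass the buffer is exhausted; after `rewind()` a second pass sees the same values without any new allocation.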




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1880#issuecomment-51730000
  
    QA results for PR 1880:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds no public classes

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18286/consoleFull




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/1880#issuecomment-51737955
  
    Ah, just realized I made things too complex... Just use `columnType.defaultSize * batchSize` as the initial column buffer size; it's equivalent to the verbose version above.




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1880#issuecomment-51730127
  
    QA tests have started for PR 1880. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18288/consoleFull




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1880#discussion_r16035082
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala ---
    @@ -90,22 +101,31 @@ private[sql] case class InMemoryColumnarTableScan(
     
       override def execute() = {
         relation.cachedColumnBuffers.mapPartitions { iterator =>
    -      val columnBuffers = iterator.next()
    -      assert(!iterator.hasNext)
    +      // Find the ordinals of the requested columns.  If none are requested, use the first.
    +      val requestedColumns =
    +        if (attributes.isEmpty) {
    +          Seq(0)
    --- End diff --
    
    Maybe we can use the narrowest one instead of the 1st one by checking default sizes of columns:
    
    ```scala
    val narrowest = relation.output.indices.minBy { i =>
      ColumnType(relation.output(i).dataType).defaultSize
    }
    Seq(narrowest)
    ```




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/1880#issuecomment-51877140
  
    Opened #1901 for precise initial buffer size estimation.




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1880#discussion_r16153219
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala ---
    @@ -90,22 +101,31 @@ private[sql] case class InMemoryColumnarTableScan(
     
       override def execute() = {
         relation.cachedColumnBuffers.mapPartitions { iterator =>
    -      val columnBuffers = iterator.next()
    -      assert(!iterator.hasNext)
    +      // Find the ordinals of the requested columns.  If none are requested, use the first.
    +      val requestedColumns =
    +        if (attributes.isEmpty) {
    +          Seq(0)
    +        } else {
    +          attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
    +        }
     
           new Iterator[Row] {
    -        // Find the ordinals of the requested columns.  If none are requested, use the first.
    -        val requestedColumns =
    -          if (attributes.isEmpty) {
    -            Seq(0)
    -          } else {
    -            attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
    -          }
    +        private[this] var columnBuffers: Array[ByteBuffer] = null
    +        private[this] var columnAccessors: Seq[ColumnAccessor] = null
    +        nextBatch()
    --- End diff --
    
    Yes, @liancheng, you're right. Sorry I didn't make that clearer; I will create another PR for this. :-)




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1880#discussion_r16036262
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala ---
    @@ -90,22 +101,31 @@ private[sql] case class InMemoryColumnarTableScan(
     
       override def execute() = {
         relation.cachedColumnBuffers.mapPartitions { iterator =>
    -      val columnBuffers = iterator.next()
    -      assert(!iterator.hasNext)
    +      // Find the ordinals of the requested columns.  If none are requested, use the first.
    +      val requestedColumns =
    +        if (attributes.isEmpty) {
    +          Seq(0)
    --- End diff --
    
    Yeah, that would be better.  Really though I think we should use statistics from #1883 to skip decoding entirely.




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1880#issuecomment-51731948
  
    QA results for PR 1880:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds no public classes

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18288/consoleFull




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1880#issuecomment-51727923
  
    QA results for PR 1880:
    - This patch FAILED unit tests.
    - This patch merges cleanly
    - This patch adds no public classes

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18285/consoleFull




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/1880#issuecomment-51737798
  
    I believe this PR can alleviate OOMs a lot. Below are some ideas for making the in-memory columnar store more memory efficient; they can be done in separate PRs based on this one.
    
    While building column buffers in batches, we still use 1MB as the initial column buffer size for *each* column (defined as `ColumnBuilder.DEFAULT_INITIAL_BUFFER_SIZE`). Say T tasks are running in parallel to squeeze a table with C columns into memory: we allocate at least T * C * 1MB for each batch.
    
    The initial column buffer size estimation used in Shark can be useful, but unfortunately the implementation is actually buggy and usually gives a fairly small initial buffer size. A more reasonable estimation heuristic could be:
    
    1. Let `D[i]` be the default size of the `i`-th column
    2. Let `I = sum(D[i]) * batchSize`
    3. The default column buffer size for the `i`-th column is `S[i] = I * D[i] / sum(D[i])`
    
    This estimation is precise for all primitive types whose default sizes equal their actual sizes.
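
A small worked sketch of the heuristic above, assuming hypothetical default sizes of 4, 8, and 1 bytes for three columns and a hypothetical batch size of 1000 rows (neither value comes from this thread). It computes `S[i]` as written and shows that it reduces to `D[i] * batchSize`. `BufferSizeSketch` and its methods are illustrative names, not Spark APIs.

```scala
// Hypothetical illustration; the names and numbers are assumptions, not Spark code.
object BufferSizeSketch {
  // S[i] = I * D[i] / sum(D[i]), where I = sum(D[i]) * batchSize.
  def initialSizes(defaultSizes: Seq[Int], batchSize: Int): Seq[Long] = {
    val total = defaultSizes.map(_.toLong).sum
    val i = total * batchSize
    defaultSizes.map(d => i * d / total)
  }

  // The simplified form: S[i] = D[i] * batchSize.
  def simplifiedSizes(defaultSizes: Seq[Int], batchSize: Int): Seq[Long] =
    defaultSizes.map(d => d.toLong * batchSize)
}
```

With these assumed numbers, `initialSizes(Seq(4, 8, 1), 1000)` gives initial buffers of 4000, 8000, and 1000 bytes, compared with 1MB per column under the fixed default.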




[GitHub] spark pull request: [SPARK-2650] Build column buffers in smaller b...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1880#issuecomment-51727834
  
    QA tests have started for PR 1880. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18285/consoleFull




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/1880#issuecomment-51869212
  
    Merged to master and 1.1




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1880#discussion_r16036330
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala ---
    @@ -43,22 +44,31 @@ private[sql] case class InMemoryRelation(
       // As in Spark, the actual work of caching is lazy.
       if (_cachedColumnBuffers == null) {
         val output = child.output
    -    val cached = child.execute().mapPartitions { iterator =>
    -      val columnBuilders = output.map { attribute =>
    -        ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
    -      }.toArray
    -
    -      var row: Row = null
    -      while (iterator.hasNext) {
    -        row = iterator.next()
    -        var i = 0
    -        while (i < row.length) {
    -          columnBuilders(i).appendFrom(row, i)
    -          i += 1
    +    val cached = child.execute().mapPartitions { baseIterator =>
    +      new Iterator[Array[ByteBuffer]] {
    +        def next() = {
    +          val columnBuilders = output.map { attribute =>
    +            ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
    --- End diff --
    
    A more precise initial buffer size can be used here:
    
    ```scala
    val columnType = ColumnType(attribute.dataType)
    ColumnBuilder(columnType.typeId, columnType.defaultSize * batchSize, attribute.name, useCompression)
    ```
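
The batching idea in the diff above can be sketched outside Spark with plain collections. This is a simplified illustration, assuming rows are `Seq[Int]` and using the standard library's `Iterator.grouped`; the actual patch builds `ByteBuffer`s with `ColumnBuilder` and is not reproduced here. `BatchSketch` and `toColumnBatches` are hypothetical names.

```scala
// Hypothetical illustration; not Spark code.
object BatchSketch {
  // Turns an iterator of rows into an iterator of column batches.
  // Each batch holds at most batchSize rows, stored column by column.
  def toColumnBatches(
      rows: Iterator[Seq[Int]],
      numColumns: Int,
      batchSize: Int): Iterator[Vector[Vector[Int]]] =
    rows.grouped(batchSize).map { batch =>
      Vector.tabulate(numColumns)(col => batch.map(row => row(col)).toVector)
    }
}
```

Because the result is itself an iterator, batches are built on demand, so only one batch of rows needs to be held while it is being converted.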




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/1880#issuecomment-51869125
  
    @liancheng, thanks for reviewing!  Would you mind creating a JIRA/followup PR to set the defaults correctly as you propose?




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1880#discussion_r16052399
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala ---
    @@ -90,22 +101,31 @@ private[sql] case class InMemoryColumnarTableScan(
     
       override def execute() = {
         relation.cachedColumnBuffers.mapPartitions { iterator =>
    -      val columnBuffers = iterator.next()
    -      assert(!iterator.hasNext)
    +      // Find the ordinals of the requested columns.  If none are requested, use the first.
    +      val requestedColumns =
    +        if (attributes.isEmpty) {
    +          Seq(0)
    +        } else {
    +          attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
    +        }
     
           new Iterator[Row] {
    -        // Find the ordinals of the requested columns.  If none are requested, use the first.
    -        val requestedColumns =
    -          if (attributes.isEmpty) {
    -            Seq(0)
    -          } else {
    -            attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
    -          }
    +        private[this] var columnBuffers: Array[ByteBuffer] = null
    +        private[this] var columnAccessors: Seq[ColumnAccessor] = null
    +        nextBatch()
    --- End diff --
    
    Maybe I'm misunderstanding, but do you mean we should try to reuse batch buffers rather than always allocating new ones for each batch? I like the idea, and it would surely make the column buffer building process more memory efficient. But given the way `ColumnBuilder` is currently implemented, buffer reuse needs more work, probably in another PR :)
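
A minimal sketch of the general buffer-reuse idea, assuming a plain `java.nio.ByteBuffer` that is cleared between batches. It does not reflect how `ColumnBuilder` works, which, as the comment above notes, would need more changes. `ReuseSketch` and `fill` are hypothetical names.

```scala
import java.nio.ByteBuffer

// Hypothetical illustration; not Spark code.
object ReuseSketch {
  // Writes one batch into the shared buffer and returns a copy of the bytes written.
  // The caller must ensure the buffer has room for 4 bytes per value.
  def fill(buffer: ByteBuffer, batch: Seq[Int]): Array[Byte] = {
    buffer.clear()                 // position = 0, limit = capacity; old contents get overwritten
    batch.foreach(v => buffer.putInt(v))
    buffer.flip()                  // limit = bytes written, position = 0
    val out = new Array[Byte](buffer.remaining())
    buffer.get(out)
    out
  }
}
```

Here a single allocation serves every batch; the copy returned by `fill` exists only so the sketch can show what was written.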




[GitHub] spark pull request: [SPARK-2650][SQL] Build column buffers in smal...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/1880

