Posted to reviews@spark.apache.org by guowei2 <gi...@git.apache.org> on 2014/08/07 04:10:18 UTC

[GitHub] spark pull request: [SPARK-2873] using ExternalAppendOnlyMap to re...

GitHub user guowei2 opened a pull request:

    https://github.com/apache/spark/pull/1822

    [SPARK-2873] using ExternalAppendOnlyMap to resolve OOM when aggregating

    Use ExternalAppendOnlyMap to resolve OOM when aggregating.
    The "spark.shuffle.spill" setting controls whether it is enabled.
    Hive UDAFs are not supported yet, because spilling requires the UDAF to be Serializable.
    
    Join has the same problem, but using ExternalAppendOnlyMap the way CoGroupedRDD does seems to reduce performance. I am trying another approach based on ExternalAppendOnlyMap, but it still needs testing; I will submit it in a separate patch.
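
    For context, a minimal sketch of the idea (illustrative only, not the code in this
    patch; the String/Int/Long types below stand in for the grouping Row and the real
    aggregate buffers): an ExternalAppendOnlyMap behaves like a combiner hash map that
    can spill to disk, so the aggregation no longer has to fit entirely in the JVM heap.
    Its constructor defaults pull the serializer and block manager from SparkEnv, so
    this only runs inside a Spark task.
    
    ```scala
    import org.apache.spark.util.collection.ExternalAppendOnlyMap
    
    val counts = new ExternalAppendOnlyMap[String, Int, Long](
      v => v.toLong,          // createCombiner: start a buffer for a new group
      (buf, v) => buf + v,    // mergeValue: fold one more value into the buffer
      (b1, b2) => b1 + b2)    // mergeCombiners: merge buffers read back from spills
    
    Seq(("a", 1), ("b", 2), ("a", 3)).foreach { case (k, v) => counts.insert(k, v) }
    counts.iterator.foreach { case (k, sum) => println(s"$k -> $sum") }  // a -> 4, b -> 2 (order not guaranteed)
    ```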


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/guowei2/spark sql-memory-patch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1822.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1822
    
----
commit 87627e700e3205499726aa1ab5c1ee6e56433b5e
Author: guowei <gu...@upyoo.com>
Date:   2014-08-06T04:02:38Z

    [SPARK-2873] use ExternalAppendOnlyMap to resolve aggregate's OOM

commit f889700ec522aab688c8c3be8bb4a9402776f35f
Author: guowei <gu...@upyoo.com>
Date:   2014-08-06T04:11:43Z

    [SPARK-2873] use ExternalAppendOnlyMap to resolve aggregate's OOM

commit 21b573548f742d8e6066364642bc70eece512bd5
Author: guowei <gu...@upyoo.com>
Date:   2014-08-06T07:53:18Z

    [SPARK-2873] use ExternalAppendOnlyMap to resolve aggregate's OOM

commit d2be8323535c106f336ebb8148acf54cd351cdae
Author: guowei <gu...@upyoo.com>
Date:   2014-08-06T08:50:48Z

    [SPARK-2873] use ExternalAppendOnlyMap to resolve aggregate's OOM

commit e3a88b115c608edf12806d2698f74e9289508e7d
Author: guowei <gu...@upyoo.com>
Date:   2014-08-06T09:13:36Z

    [SPARK-2873] use ExternalAppendOnlyMap to resolve aggregate's OOM

commit 2a4786a92e00c671bb422f2de38547dde7721a9c
Author: guowei <gu...@upyoo.com>
Date:   2014-08-06T09:14:14Z

    Merge branch 'sql-memory-patch' of https://github.com/guowei2/spark into sql-memory-patch

commit 475da9d3b6304892af3d41471aceb3f81b0cc490
Author: guowei <gu...@upyoo.com>
Date:   2014-08-06T09:15:39Z

    [SPARK-2873] use ExternalAppendOnlyMap to resolve aggregate's OOM

----




[GitHub] spark pull request: [SPARK-2873] using ExternalAppendOnlyMap to re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1822#issuecomment-51422745
  
    Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by guowei2 <gi...@git.apache.org>.
Github user guowei2 commented on the pull request:

    https://github.com/apache/spark/pull/1822#issuecomment-52489381
  
    I'm very sorry, I just rebased my branch onto spark/master. What should I do to fix this?




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16223080
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala ---
    @@ -114,6 +115,14 @@ case class MinFunction(expr: Expression, base: AggregateExpression) extends Aggr
         }
       }
     
    +  override def merge(input: AggregateFunction): Unit = {
    +    if (currentMin == null) {
    +      currentMin = input.eval(EmptyRow)
    +    } else if(GreaterThan(this, input).eval(EmptyRow) == true) {
    --- End diff --
    
    Space after `if`.




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16196674
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -123,34 +124,73 @@ case class Aggregate(
         }
       }
     
    -  override def execute() = attachTree(this, "execute") {
    -    if (groupingExpressions.isEmpty) {
    -      child.execute().mapPartitions { iter =>
    -        val buffer = newAggregateBuffer()
    -        var currentRow: Row = null
    -        while (iter.hasNext) {
    -          currentRow = iter.next()
    -          var i = 0
    -          while (i < buffer.length) {
    -            buffer(i).update(currentRow)
    -            i += 1
    -          }
    -        }
    -        val resultProjection = new InterpretedProjection(resultExpressions, computedSchema)
    -        val aggregateResults = new GenericMutableRow(computedAggregates.length)
    -
    +  def aggregateNoGrouping() = {
    --- End diff --
    
    Probably could be `protected`.




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16196975
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -168,32 +208,68 @@ case class Aggregate(
                 i += 1
               }
             }
    -
    -        new Iterator[Row] {
    +        val iterPair = new Iterator[(Row, Array[AggregateFunction])] {
               private[this] val hashTableIter = hashTable.entrySet().iterator()
    -          private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
    -          private[this] val resultProjection =
    -            new InterpretedMutableProjection(
    -              resultExpressions, computedSchema ++ namedGroups.map(_._2))
    -          private[this] val joinedRow = new JoinedRow
    -
               override final def hasNext: Boolean = hashTableIter.hasNext
     
    -          override final def next(): Row = {
    +          override final def next(): (Row, Array[AggregateFunction]) = {
    --- End diff --
    
    I am a little concerned about the addition of an extra tuple object allocation here.  This may not be a problem, but we'll want to run some benchmarks and make sure that we are not slowing down the on-heap version here.
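
    One hypothetical way to avoid the per-group allocation (illustrative only, not part
    of this patch): hand out a single reused holder such as Spark's
    `org.apache.spark.util.MutablePair`, which is only safe if callers never retain the
    returned object across calls to `next()`.
    
    ```scala
    import org.apache.spark.util.MutablePair
    
    // Wrap a java.util.Iterator over hash-table entries and reuse one MutablePair
    // instead of allocating a Tuple2 for every group.
    def reusingIterator[K <: AnyRef, V <: AnyRef](
        underlying: java.util.Iterator[java.util.Map.Entry[K, V]]): Iterator[MutablePair[K, V]] =
      new Iterator[MutablePair[K, V]] {
        private[this] val pair = new MutablePair[K, V](null.asInstanceOf[K], null.asInstanceOf[V])
        override def hasNext: Boolean = underlying.hasNext
        override def next(): MutablePair[K, V] = {
          val entry = underlying.next()
          pair.update(entry.getKey, entry.getValue)  // returns the same reused instance
        }
      }
    ```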




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by guowei2 <gi...@git.apache.org>.
Github user guowei2 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16235328
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala ---
    @@ -292,6 +312,11 @@ case class AverageFunction(expr: Expression, base: AggregateExpression)
           sum.update(addFunction(evaluatedExpr), input)
         }
       }
    +
    +  override def merge(input: AggregateFunction): Unit = {
    +    count += input.asInstanceOf[AverageFunction].getCount
    --- End diff --
    
    @chenghao-intel  
    Aha, it's a difference between Java and Scala. Thanks a lot!




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16196737
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -123,34 +124,73 @@ case class Aggregate(
         }
       }
     
    -  override def execute() = attachTree(this, "execute") {
    -    if (groupingExpressions.isEmpty) {
    -      child.execute().mapPartitions { iter =>
    -        val buffer = newAggregateBuffer()
    -        var currentRow: Row = null
    -        while (iter.hasNext) {
    -          currentRow = iter.next()
    -          var i = 0
    -          while (i < buffer.length) {
    -            buffer(i).update(currentRow)
    -            i += 1
    -          }
    -        }
    -        val resultProjection = new InterpretedProjection(resultExpressions, computedSchema)
    -        val aggregateResults = new GenericMutableRow(computedAggregates.length)
    -
    +  def aggregateNoGrouping() = {
    +    child.execute().mapPartitions { iter =>
    +      val buffer = newAggregateBuffer()
    +      var currentRow: Row = null
    +      while (iter.hasNext) {
    +        currentRow = iter.next()
             var i = 0
             while (i < buffer.length) {
    -          aggregateResults(i) = buffer(i).eval(EmptyRow)
    +          buffer(i).update(currentRow)
               i += 1
             }
    +      }
    +      val resultProjection = new InterpretedProjection(resultExpressions, computedSchema)
    +      val aggregateResults = new GenericMutableRow(computedAggregates.length)
     
    -        Iterator(resultProjection(aggregateResults))
    +      var i = 0
    +      while (i < buffer.length) {
    +        aggregateResults(i) = buffer(i).eval(EmptyRow)
    +        i += 1
           }
    +
    +      Iterator(resultProjection(aggregateResults))
    +    }
    +  }
    +
    +  def resultRow(iter: Iterator[(Row,Array[AggregateFunction])]) = {
    --- End diff --
    
    Can you add some scala doc to explain what's going on here?




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by guowei2 <gi...@git.apache.org>.
Github user guowei2 commented on the pull request:

    https://github.com/apache/spark/pull/1822#issuecomment-52032345
  
    I've improved the structure and testing. Is it better now?




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16197056
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -134,15 +134,26 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
                  groupingExpressions,
                  partialComputation,
                  child) =>
    -        execution.Aggregate(
    -          partial = false,
    -          namedGroupingAttributes,
    -          rewrittenAggregateExpressions,
    -          execution.Aggregate(
    -            partial = true,
    -            groupingExpressions,
    -            partialComputation,
    -            planLater(child))) :: Nil
    +
    +        val preAggregate = execution.OnHeapAggregate(
    --- End diff --
    
    Why is the preAggregate always on heap?




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/1822#issuecomment-51637601
  
    Thanks for working on this!  This will be a great addition for 1.2 :)  Minor feedback on structure and testing.




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16196620
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala ---
    @@ -340,4 +340,10 @@ private[hive] case class HiveUdafFunction(
         val inputs = inputProjection(input).asInstanceOf[Seq[AnyRef]].toArray
         function.iterate(buffer, inputs)
       }
    +
    +  //hiveUdaf does not support external aggregate, for HiveUdafFunction need to spill to disk,
    +  //and all the vals above need Serializable
    +  override def merge(input: AggregateFunction): Unit = {
    +    throw new NotImplementedError(s"HiveUdaf does not support external aggregate")
    --- End diff --
    
    I think we are going to have to figure out something that works here.  Perhaps we can use `SerializableWritable` or something?




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16007364
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala ---
    @@ -340,4 +340,9 @@ private[hive] case class HiveUdafFunction(
         val inputs = inputProjection(input).asInstanceOf[Seq[AnyRef]].toArray
         function.iterate(buffer, inputs)
       }
    +
    +  override def merge(input: AggregateFunction): Unit = {
    --- End diff --
    
    Does this mean that we can't do external aggregation if there are Hive UDAFs? We should be throwing an exception here; it is very bad to silently return wrong results.
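
    A minimal version of the fail-fast behaviour being asked for (the exception type and
    message here are illustrative; a later revision of the patch, quoted elsewhere in
    this thread, throws a NotImplementedError instead):
    
    ```scala
    override def merge(input: AggregateFunction): Unit =
      throw new UnsupportedOperationException(
        "Hive UDAFs do not support external aggregation: HiveUdafFunction is not " +
        "serializable, so its buffers cannot be spilled and merged")
    ```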




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by guowei2 <gi...@git.apache.org>.
Github user guowei2 commented on the pull request:

    https://github.com/apache/spark/pull/1822#issuecomment-52490972
  
    May I close this PR and create a new one?




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16007279
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -148,53 +151,121 @@ case class Aggregate(
             Iterator(resultProjection(aggregateResults))
           }
         } else {
    -      child.execute().mapPartitions { iter =>
    -        val hashTable = new HashMap[Row, Array[AggregateFunction]]
    -        val groupingProjection = new InterpretedMutableProjection(groupingExpressions, childOutput)
    +      if (!externalSorting) {
    --- End diff --
    
    Instead of doing the if check here, what do you think about this:
     - Create a trait called `Aggregate` that contains shared code.
     - Create two operators `OnHeapAggregation` and `ExternalAggregation`.
     - Choose which one to use in the planner based on the above configuration (a rough skeleton follows below).
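
    A rough skeleton of that split, with toy stand-ins for Row and SparkPlan so it is
    self-contained (the real operators would extend UnaryNode and share the buffer
    helpers through the trait; the names follow the suggestion above and are not the
    final code):
    
    ```scala
    trait Aggregate {
      // shared helpers such as newAggregateBuffer() would live here
      def execute(rows: Iterator[(String, Long)]): Iterator[(String, Long)]
    }
    
    class OnHeapAggregation extends Aggregate {
      def execute(rows: Iterator[(String, Long)]): Iterator[(String, Long)] = {
        val table = scala.collection.mutable.HashMap.empty[String, Long]
        rows.foreach { case (k, v) => table(k) = table.getOrElse(k, 0L) + v }
        table.iterator
      }
    }
    
    class ExternalAggregation extends Aggregate {
      def execute(rows: Iterator[(String, Long)]): Iterator[(String, Long)] =
        ???  // same contract, but backed by an ExternalAppendOnlyMap that can spill
    }
    
    // Planner: pick the operator once from configuration instead of branching in execute().
    def planAggregation(spillEnabled: Boolean): Aggregate =
      if (spillEnabled) new ExternalAggregation else new OnHeapAggregation
    ```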




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/1822#issuecomment-52095270
  
    This is looking better! Thanks again for working on it :)
    
    A few more comments. Also, it would be really good if we could do some (at least micro) benchmarks before and after for the on-heap version, and then a comparison with the external version. Let me know if you need some help coming up with that.
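
    For the micro-benchmarks, even a crude timing harness is enough to compare the
    on-heap and external paths; something along these lines (illustrative only, not
    part of the patch), wrapped around the same GROUP BY query with the spill setting
    on and off:
    
    ```scala
    // Run a block several times and report the median wall-clock time in milliseconds.
    def benchmark[T](label: String, reps: Int = 5)(block: => T): Unit = {
      val timesMs = (1 to reps).map { _ =>
        val start = System.nanoTime()
        block
        (System.nanoTime() - start) / 1e6
      }.sorted
      println(f"$label: median ${timesMs(reps / 2)}%.1f ms over $reps runs")
    }
    ```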
    
    Also, one note. Since this is a pretty major change, I'll want to wait until after the 1.1 release to merge it in. That said, I'm really excited about it for 1.2. You may want to rebase / merge to master to avoid getting too far behind though.




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16223044
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala ---
    @@ -375,6 +420,10 @@ case class SumDistinctFunction(expr: Expression, base: AggregateExpression)
         }
       }
     
    +  override def merge(input: AggregateFunction): Unit = {
    +    input.asInstanceOf[SumDistinctFunction].getSeen.map(seen += _)
    --- End diff --
    
    use `seen ++= input.asInstanceOf[SumDistinctFunction].seen` instead.
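
    Applied to the diff above, the merge then reads (the surrounding class is unchanged):
    
    ```scala
    override def merge(input: AggregateFunction): Unit = {
      // ++= copies the other function's seen values in one pass; map(...) was only
      // being used for its side effect and built a throwaway collection.
      seen ++= input.asInstanceOf[SumDistinctFunction].seen
    }
    ```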




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16196431
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -58,7 +59,7 @@ case class Aggregate(
     
       // HACK: Generators don't correctly preserve their output through serializations so we grab
       // out child's output attributes statically here.
    -  private[this] val childOutput = child.output
    +  val childOutput = child.output
    --- End diff --
    
    `protected` instead of making this public.




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16007142
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AggregatesSuite.scala ---
    @@ -0,0 +1,70 @@
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import org.scalatest.FunSuite
    +import org.apache.spark.sql.catalyst.types._
    +import org.apache.spark.sql.catalyst.dsl.expressions._
    +
    +
    +class AggregatesSuite extends FunSuite {
    --- End diff --
    
    Add some scala doc about what sorts of things this suite is testing.




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by guowei2 <gi...@git.apache.org>.
Github user guowei2 closed the pull request at:

    https://github.com/apache/spark/pull/1822




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16223599
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala ---
    @@ -292,6 +312,11 @@ case class AverageFunction(expr: Expression, base: AggregateExpression)
           sum.update(addFunction(evaluatedExpr), input)
         }
       }
    +
    +  override def merge(input: AggregateFunction): Unit = {
    +    count += input.asInstanceOf[AverageFunction].getCount
    --- End diff --
    
    You can access the private member directly here, since both objects are of the same type, so the `getXXX` methods are not necessary.
    e.g.
    ```
    scala> class A(private var a: Int) {
         |   def add(v: A): A = {
         |     a = a + v.a
         |     this
         |   }
         |   override def toString = {a.toString}
         | }
    defined class A
    
    scala> val a = new A(11)
    a: A = 11
    
    scala> val b = new A(12)
    b: A = 12
    
    scala> val c = a.add(b)
    c: A = 23
    ```




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16196160
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
    @@ -316,12 +316,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
         val strategies: Seq[Strategy] =
           CommandStrategy(self) ::
           TakeOrdered ::
    -      HashAggregation ::
    +      HashAggregation(self) ::
    --- End diff --
    
    Can't the strategies already access `self` through `sqlContext`?




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16196360
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -20,30 +20,31 @@ package org.apache.spark.sql.execution
     import java.util.HashMap
     
     import org.apache.spark.annotation.DeveloperApi
    -import org.apache.spark.SparkContext
    +import org.apache.spark.{SparkEnv, SparkContext}
     import org.apache.spark.sql.catalyst.errors._
     import org.apache.spark.sql.catalyst.expressions._
     import org.apache.spark.sql.catalyst.plans.physical._
     import org.apache.spark.sql.SQLContext
    +import org.apache.spark.util.collection.ExternalAppendOnlyMap
     
     /**
    - * :: DeveloperApi ::
      * Groups input data by `groupingExpressions` and computes the `aggregateExpressions` for each
      * group.
      *
    - * @param partial if true then aggregation is done partially on local data without shuffling to
    + * partial if true then aggregation is done partially on local data without shuffling to
    --- End diff --
    
    How about we add something like `Concrete subclasses of this trait must implement the following abstract members:`  and then use [wiki syntax](https://wiki.scala-lang.org/display/SW/Syntax) to make the items into a list.
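
    For example, the class comment could read roughly as follows, using wiki list syntax
    (the member description below is illustrative, not the final wording):
    
    ```scala
    /**
     * Groups input data by `groupingExpressions` and computes `aggregateExpressions`
     * for each group.
     *
     * Concrete subclasses of this trait must implement the following abstract members:
     *  - `partial`: whether this operator performs the map-side (partial) aggregation
     *    on local data without shuffling
     */
    ```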




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by guowei2 <gi...@git.apache.org>.
Github user guowei2 commented on the pull request:

    https://github.com/apache/spark/pull/1822#issuecomment-51764097
  
    Thank you for your suggestion; it truly encourages me. I'll do my best to fix it up.




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by guowei2 <gi...@git.apache.org>.
Github user guowei2 commented on the pull request:

    https://github.com/apache/spark/pull/1822#issuecomment-52581172
  
    I have to close this PR and open a new one.




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16007110
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AggregatesSuite.scala ---
    @@ -0,0 +1,70 @@
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import org.scalatest.FunSuite
    +import org.apache.spark.sql.catalyst.types._
    +import org.apache.spark.sql.catalyst.dsl.expressions._
    +
    +
    +class AggregatesSuite extends FunSuite {
    +
    +  val testRows = Seq(1,1,2,2,3,3,4,4).map(x => {
    +    val row = new GenericMutableRow(1)
    +    row(0) = x
    +    row
    +  })
    +
    +  val dataType: DataType = IntegerType
    +
    +  val exp = BoundReference(0,dataType,true)
    --- End diff --
    
    Space after `,`s




[GitHub] spark pull request: [SPARK-2873] using ExternalAppendOnlyMap to re...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/1822#issuecomment-51435812
  
    @guowei2 can you add `[SQL]` to the title here so it gets sorted correctly? Thanks!




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16222998
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala ---
    @@ -368,6 +411,8 @@ case class SumDistinctFunction(expr: Expression, base: AggregateExpression)
     
       private val seen = new scala.collection.mutable.HashSet[Any]()
     
    +  def getSeen: scala.collection.mutable.HashSet[Any] = seen
    --- End diff --
    
    This is not necessary.




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by guowei2 <gi...@git.apache.org>.
Github user guowei2 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16235116
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -134,15 +134,26 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
                  groupingExpressions,
                  partialComputation,
                  child) =>
    -        execution.Aggregate(
    -          partial = false,
    -          namedGroupingAttributes,
    -          rewrittenAggregateExpressions,
    -          execution.Aggregate(
    -            partial = true,
    -            groupingExpressions,
    -            partialComputation,
    -            planLater(child))) :: Nil
    +
    +        val preAggregate = execution.OnHeapAggregate(
    --- End diff --
    
    I think the map-side aggregate does not need to be external, since the data in one partition (one task) isn't usually large relative to the JVM heap size, and the user can easily control the partition data size.
    In most cases, OOM occurs on the reduce side.




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16007186
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -45,6 +46,8 @@ case class Aggregate(
         child: SparkPlan)
       extends UnaryNode {
     
    +  private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", false)
    --- End diff --
    
    We should be using SQLConf and creating a special setting for external aggregation.
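
    A sketch of what that could look like (both the key name and the SQLConf accessor
    shown in the comment are assumptions, not existing Spark API):
    
    ```scala
    // A dedicated SQL setting rather than reusing spark.shuffle.spill.
    val EXTERNAL_AGGREGATE_KEY = "spark.sql.aggregate.external"
    
    // Inside the operator the flag would then come from the query's SQLConf
    // instead of SparkEnv, e.g.:
    //   private[this] val externalAggregation =
    //     sqlContext.getConf(EXTERNAL_AGGREGATE_KEY, "true").toBoolean
    ```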




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on the pull request:

    https://github.com/apache/spark/pull/1822#issuecomment-52491562
  
    Try this: `git commit -m "Big-ass commit" --allow-empty`, then `git rebase -i master`, then `git push origin sql-memory-patch -f`.




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16196452
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -90,7 +91,7 @@ case class Aggregate(
       private[this] val computedSchema = computedAggregates.map(_.resultAttribute)
     
       /** Creates a new aggregate buffer for a group. */
    -  private[this] def newAggregateBuffer(): Array[AggregateFunction] = {
    +  def newAggregateBuffer(): Array[AggregateFunction] = {
    --- End diff --
    
    `protected`




[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1822#discussion_r16007101
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AggregatesSuite.scala ---
    @@ -0,0 +1,70 @@
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import org.scalatest.FunSuite
    +import org.apache.spark.sql.catalyst.types._
    +import org.apache.spark.sql.catalyst.dsl.expressions._
    +
    +
    +class AggregatesSuite extends FunSuite {
    +
    +  val testRows = Seq(1,1,2,2,3,3,4,4).map(x => {
    +    val row = new GenericMutableRow(1)
    +    row(0) = x
    +    row
    +  })
    +
    +  val dataType: DataType = IntegerType
    +
    +  val exp = BoundReference(0,dataType,true)
    +
    +  def checkMethod(f:AggregateExpression) = {
    --- End diff --
    
    Space after `:`.  Can you add some scala doc about what this is checking?

