You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by lamuguo <gi...@git.apache.org> on 2014/05/24 01:35:56 UTC

[GitHub] spark pull request: Use Aggregator for Spark SQL

GitHub user lamuguo opened a pull request:

    https://github.com/apache/spark/pull/867

    Use Aggregator for Spark SQL

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lamuguo/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/867.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #867
    
----
commit 6507d1596de1fc9e1f73aea76bc6e596ef59e578
Author: Lamu Guo <la...@gmail.com>
Date:   2014-05-02T06:04:50Z

    Applying Aggregator in Aggregate execution
    
    First try.
    
    Some test errors, but matches with another baseline result. Will double
    check whether all test cases can be passed currently later.
    
    Error info below:
    [info] Passed: Total 197, Failed 0, Errors 0, Passed 195, Skipped 2
    [error] (repl/test:test) sbt.TestsFailedException: Tests unsuccessful
    [error] (streaming/test:test) sbt.TestsFailedException: Tests
    unsuccessful
    [error] (core/test:test) sbt.TestsFailedException: Tests unsuccessful
    [error] Total time: 1755 s, completed May 1, 2014 10:49:27 PM

commit 358874ef5b01acf4d60bbce68756f94223881726
Author: Lamu Guo <la...@gmail.com>
Date:   2014-05-02T16:35:11Z

    Support SparkSqlSerializer for Aggregate
    
    And fixed some style problems.

commit f941a50d73304b5716f3e9da8e89436213dba045
Author: Lamu Guo <la...@gmail.com>
Date:   2014-05-02T16:54:53Z

    Added spark conf for SparkSqlSerializer

commit e5bc329b1a1be3c563a9024a94e7f641855903ab
Author: Lamu Guo <la...@gmail.com>
Date:   2014-05-23T02:07:27Z

    Support AggregateFunction.merge()
    
    To eliminate saving of rows in interim data.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Use Aggregator for Spark SQL

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/867#issuecomment-44070545
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1627: Support external aggregation by us...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/867


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-1627: Support external aggregation by us...

Posted by lamuguo <gi...@git.apache.org>.
Github user lamuguo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/867#discussion_r13217800
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -155,48 +155,60 @@ case class Aggregate(
           }
         } else {
           child.execute().mapPartitions { iter =>
    -        val hashTable = new HashMap[Row, Array[AggregateFunction]]
    -        val groupingProjection = new MutableProjection(groupingExpressions, childOutput)
    -
    -        var currentRow: Row = null
    -        while (iter.hasNext) {
    -          currentRow = iter.next()
    -          val currentGroup = groupingProjection(currentRow)
    -          var currentBuffer = hashTable.get(currentGroup)
    -          if (currentBuffer == null) {
    -            currentBuffer = newAggregateBuffer()
    -            hashTable.put(currentGroup.copy(), currentBuffer)
    +        val groupingProjection = new
    +            MutableProjection(groupingExpressions, childOutput)
    +        // TODO: Can't use "Array[AggregateFunction]" directly, due to lack of
    +        // "concat(AggregateFunction, AggregateFunction)". Should add
    +        // AggregateFunction.update(agg: AggregateFunction) in the future.
    +        def createCombiner(row: Row) = mergeValue(newAggregateBuffer(), row)
    +        def mergeValue(buffer: Array[AggregateFunction], row: Row) = {
    +          for (i <- 0 to buffer.length - 1) {
    +            buffer(i).update(row)
               }
    -
    -          var i = 0
    -          while (i < currentBuffer.length) {
    -            currentBuffer(i).update(currentRow)
    -            i += 1
    +          buffer
    +        }
    +        def mergeCombiners(buf1: Array[AggregateFunction], buf2: Array[AggregateFunction]) = {
    +          if (buf1.length != buf2.length) {
    +            throw new TreeNodeException(this, s"Unequal aggregate buffer length ${buf1.length} != ${buf2.length}")
    +          }
    +          for (i <- 0 to buf1.length - 1) {
    --- End diff --
    
    Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1627: Support external aggregation by us...

Posted by lamuguo <gi...@git.apache.org>.
Github user lamuguo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/867#discussion_r13217796
  
    --- Diff: core/src/main/scala/org/apache/spark/Aggregator.scala ---
    @@ -32,7 +33,8 @@ import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
     case class Aggregator[K, V, C] (
         createCombiner: V => C,
         mergeValue: (C, V) => C,
    -    mergeCombiners: (C, C) => C) {
    +    mergeCombiners: (C, C) => C,
    +    serializer: Serializer = SparkEnv.get.serializer) {
    --- End diff --
    
    Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Use Aggregator for Spark SQL

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/867#discussion_r13029100
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -155,48 +155,60 @@ case class Aggregate(
           }
         } else {
           child.execute().mapPartitions { iter =>
    -        val hashTable = new HashMap[Row, Array[AggregateFunction]]
    -        val groupingProjection = new MutableProjection(groupingExpressions, childOutput)
    -
    -        var currentRow: Row = null
    -        while (iter.hasNext) {
    -          currentRow = iter.next()
    -          val currentGroup = groupingProjection(currentRow)
    -          var currentBuffer = hashTable.get(currentGroup)
    -          if (currentBuffer == null) {
    -            currentBuffer = newAggregateBuffer()
    -            hashTable.put(currentGroup.copy(), currentBuffer)
    +        val groupingProjection = new
    +            MutableProjection(groupingExpressions, childOutput)
    +        // TODO: Can't use "Array[AggregateFunction]" directly, due to lack of
    +        // "concat(AggregateFunction, AggregateFunction)". Should add
    +        // AggregateFunction.update(agg: AggregateFunction) in the future.
    +        def createCombiner(row: Row) = mergeValue(newAggregateBuffer(), row)
    +        def mergeValue(buffer: Array[AggregateFunction], row: Row) = {
    +          for (i <- 0 to buffer.length - 1) {
    --- End diff --
    
    It'd be better to rewrite this using a while loop, since while loops perform much better than for loop in Scala. 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Use Aggregator for Spark SQL

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/867#issuecomment-44110777
  
    @lamuguo the issue is https://issues.apache.org/jira/browse/SPARK-1627 
    
    Please update the title of the pull request to:
    
    SPARK-1627: Support external aggregation by using Aggregator in Spark SQL


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1627: Support external aggregation by us...

Posted by lamuguo <gi...@git.apache.org>.
Github user lamuguo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/867#discussion_r13217797
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -155,48 +155,60 @@ case class Aggregate(
           }
         } else {
           child.execute().mapPartitions { iter =>
    -        val hashTable = new HashMap[Row, Array[AggregateFunction]]
    -        val groupingProjection = new MutableProjection(groupingExpressions, childOutput)
    -
    -        var currentRow: Row = null
    -        while (iter.hasNext) {
    -          currentRow = iter.next()
    -          val currentGroup = groupingProjection(currentRow)
    -          var currentBuffer = hashTable.get(currentGroup)
    -          if (currentBuffer == null) {
    -            currentBuffer = newAggregateBuffer()
    -            hashTable.put(currentGroup.copy(), currentBuffer)
    +        val groupingProjection = new
    +            MutableProjection(groupingExpressions, childOutput)
    --- End diff --
    
    Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Use Aggregator for Spark SQL

Posted by lamuguo <gi...@git.apache.org>.
Github user lamuguo commented on the pull request:

    https://github.com/apache/spark/pull/867#issuecomment-44070972
  
    Possible to forward to Reynold? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Use Aggregator for Spark SQL

Posted by hsaputra <gi...@git.apache.org>.
Github user hsaputra commented on the pull request:

    https://github.com/apache/spark/pull/867#issuecomment-44079360
  
    Hi,
    
    Please add more description on what is the desired affect.
    It would be also appreciated if you file ASF JIRA ticket [1] in addition to help trace the changes.
    
    [1] https://issues.apache.org/jira/secure/Dashboard.jspa


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1627: Support external aggregation by us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/867#issuecomment-54694696
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-1627: Support external aggregation by us...

Posted by lamuguo <gi...@git.apache.org>.
Github user lamuguo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/867#discussion_r13217803
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -155,48 +155,60 @@ case class Aggregate(
           }
         } else {
           child.execute().mapPartitions { iter =>
    -        val hashTable = new HashMap[Row, Array[AggregateFunction]]
    -        val groupingProjection = new MutableProjection(groupingExpressions, childOutput)
    -
    -        var currentRow: Row = null
    -        while (iter.hasNext) {
    -          currentRow = iter.next()
    -          val currentGroup = groupingProjection(currentRow)
    -          var currentBuffer = hashTable.get(currentGroup)
    -          if (currentBuffer == null) {
    -            currentBuffer = newAggregateBuffer()
    -            hashTable.put(currentGroup.copy(), currentBuffer)
    +        val groupingProjection = new
    +            MutableProjection(groupingExpressions, childOutput)
    +        // TODO: Can't use "Array[AggregateFunction]" directly, due to lack of
    +        // "concat(AggregateFunction, AggregateFunction)". Should add
    +        // AggregateFunction.update(agg: AggregateFunction) in the future.
    +        def createCombiner(row: Row) = mergeValue(newAggregateBuffer(), row)
    +        def mergeValue(buffer: Array[AggregateFunction], row: Row) = {
    +          for (i <- 0 to buffer.length - 1) {
    +            buffer(i).update(row)
               }
    -
    -          var i = 0
    -          while (i < currentBuffer.length) {
    -            currentBuffer(i).update(currentRow)
    -            i += 1
    +          buffer
    +        }
    +        def mergeCombiners(buf1: Array[AggregateFunction], buf2: Array[AggregateFunction]) = {
    +          if (buf1.length != buf2.length) {
    +            throw new TreeNodeException(this, s"Unequal aggregate buffer length ${buf1.length} != ${buf2.length}")
    +          }
    +          for (i <- 0 to buf1.length - 1) {
    +            buf1(i).merge(buf2(i))
               }
    +          buf1
             }
    -
    +        val aggregator = new Aggregator[Row, Row, Array[AggregateFunction]](
    +          createCombiner, mergeValue, mergeCombiners, new SparkSqlSerializer(new SparkConf(false)))
    +
    +        val aggIter = aggregator.combineValuesByKey(
    +          new Iterator[(Row, Row)] {  // (groupKey, row)
    +            override final def hasNext: Boolean = iter.hasNext
    +
    +            override final def next(): (Row, Row) = {
    +              val row = iter.next()
    +              // TODO: copy() here for suppressing reference problems. Please clearly address
    +              // the root-cause and remove copy() here.
    +              (groupingProjection(row).copy(), row)
    +            }
    +          },
    +          null
    +        )
             new Iterator[Row] {
    -          private[this] val hashTableIter = hashTable.entrySet().iterator()
               private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
    -          private[this] val resultProjection =
    -            new MutableProjection(resultExpressions, computedSchema ++ namedGroups.map(_._2))
    +          private[this] val resultProjection = new MutableProjection(
    +            resultExpressions, computedSchema ++ namedGroups.map(_._2))
               private[this] val joinedRow = new JoinedRow
     
    -          override final def hasNext: Boolean = hashTableIter.hasNext
    +          override final def hasNext: Boolean = aggIter.hasNext
     
               override final def next(): Row = {
    -            val currentEntry = hashTableIter.next()
    -            val currentGroup = currentEntry.getKey
    -            val currentBuffer = currentEntry.getValue
    -
    -            var i = 0
    -            while (i < currentBuffer.length) {
    -              // Evaluating an aggregate buffer returns the result.  No row is required since we
    -              // already added all rows in the group using update.
    -              aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
    -              i += 1
    +            val entry = aggIter.next()
    +            val group = entry._1
    +            val data = entry._2
    +
    +            for (i <- 0 to data.length - 1) {
    --- End diff --
    
    Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Use Aggregator for Spark SQL

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/867#discussion_r13029115
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -155,48 +155,60 @@ case class Aggregate(
           }
         } else {
           child.execute().mapPartitions { iter =>
    -        val hashTable = new HashMap[Row, Array[AggregateFunction]]
    -        val groupingProjection = new MutableProjection(groupingExpressions, childOutput)
    -
    -        var currentRow: Row = null
    -        while (iter.hasNext) {
    -          currentRow = iter.next()
    -          val currentGroup = groupingProjection(currentRow)
    -          var currentBuffer = hashTable.get(currentGroup)
    -          if (currentBuffer == null) {
    -            currentBuffer = newAggregateBuffer()
    -            hashTable.put(currentGroup.copy(), currentBuffer)
    +        val groupingProjection = new
    +            MutableProjection(groupingExpressions, childOutput)
    +        // TODO: Can't use "Array[AggregateFunction]" directly, due to lack of
    +        // "concat(AggregateFunction, AggregateFunction)". Should add
    +        // AggregateFunction.update(agg: AggregateFunction) in the future.
    +        def createCombiner(row: Row) = mergeValue(newAggregateBuffer(), row)
    +        def mergeValue(buffer: Array[AggregateFunction], row: Row) = {
    +          for (i <- 0 to buffer.length - 1) {
    +            buffer(i).update(row)
               }
    -
    -          var i = 0
    -          while (i < currentBuffer.length) {
    -            currentBuffer(i).update(currentRow)
    -            i += 1
    +          buffer
    +        }
    +        def mergeCombiners(buf1: Array[AggregateFunction], buf2: Array[AggregateFunction]) = {
    +          if (buf1.length != buf2.length) {
    +            throw new TreeNodeException(this, s"Unequal aggregate buffer length ${buf1.length} != ${buf2.length}")
    +          }
    +          for (i <- 0 to buf1.length - 1) {
    +            buf1(i).merge(buf2(i))
               }
    +          buf1
             }
    -
    +        val aggregator = new Aggregator[Row, Row, Array[AggregateFunction]](
    +          createCombiner, mergeValue, mergeCombiners, new SparkSqlSerializer(new SparkConf(false)))
    +
    +        val aggIter = aggregator.combineValuesByKey(
    +          new Iterator[(Row, Row)] {  // (groupKey, row)
    +            override final def hasNext: Boolean = iter.hasNext
    +
    +            override final def next(): (Row, Row) = {
    +              val row = iter.next()
    +              // TODO: copy() here for suppressing reference problems. Please clearly address
    +              // the root-cause and remove copy() here.
    +              (groupingProjection(row).copy(), row)
    +            }
    +          },
    +          null
    +        )
             new Iterator[Row] {
    -          private[this] val hashTableIter = hashTable.entrySet().iterator()
               private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
    -          private[this] val resultProjection =
    -            new MutableProjection(resultExpressions, computedSchema ++ namedGroups.map(_._2))
    +          private[this] val resultProjection = new MutableProjection(
    +            resultExpressions, computedSchema ++ namedGroups.map(_._2))
               private[this] val joinedRow = new JoinedRow
     
    -          override final def hasNext: Boolean = hashTableIter.hasNext
    +          override final def hasNext: Boolean = aggIter.hasNext
     
               override final def next(): Row = {
    -            val currentEntry = hashTableIter.next()
    -            val currentGroup = currentEntry.getKey
    -            val currentBuffer = currentEntry.getValue
    -
    -            var i = 0
    -            while (i < currentBuffer.length) {
    -              // Evaluating an aggregate buffer returns the result.  No row is required since we
    -              // already added all rows in the group using update.
    -              aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
    -              i += 1
    +            val entry = aggIter.next()
    +            val group = entry._1
    +            val data = entry._2
    +
    +            for (i <- 0 to data.length - 1) {
    --- End diff --
    
    again, while loop here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Use Aggregator for Spark SQL

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/867#discussion_r13029106
  
    --- Diff: core/src/main/scala/org/apache/spark/Aggregator.scala ---
    @@ -32,7 +33,8 @@ import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
     case class Aggregator[K, V, C] (
         createCombiner: V => C,
         mergeValue: (C, V) => C,
    -    mergeCombiners: (C, C) => C) {
    +    mergeCombiners: (C, C) => C,
    +    serializer: Serializer = SparkEnv.get.serializer) {
    --- End diff --
    
    also update the documentation above to add the new parameter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Use Aggregator for Spark SQL

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/867#discussion_r13029101
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -155,48 +155,60 @@ case class Aggregate(
           }
         } else {
           child.execute().mapPartitions { iter =>
    -        val hashTable = new HashMap[Row, Array[AggregateFunction]]
    -        val groupingProjection = new MutableProjection(groupingExpressions, childOutput)
    -
    -        var currentRow: Row = null
    -        while (iter.hasNext) {
    -          currentRow = iter.next()
    -          val currentGroup = groupingProjection(currentRow)
    -          var currentBuffer = hashTable.get(currentGroup)
    -          if (currentBuffer == null) {
    -            currentBuffer = newAggregateBuffer()
    -            hashTable.put(currentGroup.copy(), currentBuffer)
    +        val groupingProjection = new
    +            MutableProjection(groupingExpressions, childOutput)
    --- End diff --
    
    no need to wrap this line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1627: Support external aggregation by us...

Posted by lamuguo <gi...@git.apache.org>.
Github user lamuguo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/867#discussion_r13477507
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -155,48 +155,60 @@ case class Aggregate(
           }
         } else {
           child.execute().mapPartitions { iter =>
    -        val hashTable = new HashMap[Row, Array[AggregateFunction]]
    -        val groupingProjection = new MutableProjection(groupingExpressions, childOutput)
    -
    -        var currentRow: Row = null
    -        while (iter.hasNext) {
    -          currentRow = iter.next()
    -          val currentGroup = groupingProjection(currentRow)
    -          var currentBuffer = hashTable.get(currentGroup)
    -          if (currentBuffer == null) {
    -            currentBuffer = newAggregateBuffer()
    -            hashTable.put(currentGroup.copy(), currentBuffer)
    +        val groupingProjection = new
    +            MutableProjection(groupingExpressions, childOutput)
    +        // TODO: Can't use "Array[AggregateFunction]" directly, due to lack of
    +        // "concat(AggregateFunction, AggregateFunction)". Should add
    +        // AggregateFunction.update(agg: AggregateFunction) in the future.
    +        def createCombiner(row: Row) = mergeValue(newAggregateBuffer(), row)
    +        def mergeValue(buffer: Array[AggregateFunction], row: Row) = {
    +          for (i <- 0 to buffer.length - 1) {
    +            buffer(i).update(row)
               }
    -
    -          var i = 0
    -          while (i < currentBuffer.length) {
    -            currentBuffer(i).update(currentRow)
    -            i += 1
    +          buffer
    +        }
    +        def mergeCombiners(buf1: Array[AggregateFunction], buf2: Array[AggregateFunction]) = {
    +          if (buf1.length != buf2.length) {
    +            throw new TreeNodeException(this, s"Unequal aggregate buffer length ${buf1.length} != ${buf2.length}")
    +          }
    +          for (i <- 0 to buf1.length - 1) {
    +            buf1(i).merge(buf2(i))
               }
    +          buf1
             }
    -
    +        val aggregator = new Aggregator[Row, Row, Array[AggregateFunction]](
    +          createCombiner, mergeValue, mergeCombiners, new SparkSqlSerializer(new SparkConf(false)))
    +
    +        val aggIter = aggregator.combineValuesByKey(
    +          new Iterator[(Row, Row)] {  // (groupKey, row)
    +            override final def hasNext: Boolean = iter.hasNext
    +
    +            override final def next(): (Row, Row) = {
    +              val row = iter.next()
    +              // TODO: copy() here for suppressing reference problems. Please clearly address
    +              // the root-cause and remove copy() here.
    +              (groupingProjection(row).copy(), row)
    +            }
    +          },
    +          null
    +        )
             new Iterator[Row] {
    -          private[this] val hashTableIter = hashTable.entrySet().iterator()
               private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
    -          private[this] val resultProjection =
    -            new MutableProjection(resultExpressions, computedSchema ++ namedGroups.map(_._2))
    +          private[this] val resultProjection = new MutableProjection(
    +            resultExpressions, computedSchema ++ namedGroups.map(_._2))
               private[this] val joinedRow = new JoinedRow
     
    -          override final def hasNext: Boolean = hashTableIter.hasNext
    +          override final def hasNext: Boolean = aggIter.hasNext
     
               override final def next(): Row = {
    -            val currentEntry = hashTableIter.next()
    -            val currentGroup = currentEntry.getKey
    -            val currentBuffer = currentEntry.getValue
    -
    -            var i = 0
    -            while (i < currentBuffer.length) {
    -              // Evaluating an aggregate buffer returns the result.  No row is required since we
    -              // already added all rows in the group using update.
    -              aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
    -              i += 1
    +            val entry = aggIter.next()
    +            val group = entry._1
    +            val data = entry._2
    +
    +            for (i <- 0 to data.length - 1) {
    --- End diff --
    
    Hi there,
    
    I made some changes per comments couple of days ago here:
    https://github.com/apache/spark/pull/867#discussion_r13217803. Please take
    another look. Thanks!
    
    Best Regards,
    Xiaofeng
    
    
    
    On Sat, May 24, 2014 at 8:52 PM, Reynold Xin <no...@github.com>
    wrote:
    
    > In sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala:
    >
    > >
    > >            override final def next(): Row = {
    > > -            val currentEntry = hashTableIter.next()
    > > -            val currentGroup = currentEntry.getKey
    > > -            val currentBuffer = currentEntry.getValue
    > > -
    > > -            var i = 0
    > > -            while (i < currentBuffer.length) {
    > > -              // Evaluating an aggregate buffer returns the result.  No row is required since we
    > > -              // already added all rows in the group using update.
    > > -              aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
    > > -              i += 1
    > > +            val entry = aggIter.next()
    > > +            val group = entry._1
    > > +            val data = entry._2
    > > +
    > > +            for (i <- 0 to data.length - 1) {
    >
    > again, while loop here.
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/867/files#r13029115>.
    >


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1627: Support external aggregation by us...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/867#issuecomment-46646347
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1627: Support external aggregation by us...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on the pull request:

    https://github.com/apache/spark/pull/867#issuecomment-47871283
  
    This is very helpful when the partition can not fit in memory. However, I think we'd better keep previous implementation; Aggregator in spark is for very typical scenario, but we do have many algorithms / ways to optimize the aggregation in SQL for performance, hence tightly coupled with Spark Aggregator may not a good idea for further improvement.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1627: Support external aggregation by us...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/867#issuecomment-54252552
  
    I think this has been subsumed by #1822, so we should close this issue for now.  If there is anything missing from the implementation there please let us know!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-1627: Support external aggregation by us...

Posted by lamuguo <gi...@git.apache.org>.
Github user lamuguo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/867#discussion_r13217802
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -155,48 +155,60 @@ case class Aggregate(
           }
         } else {
           child.execute().mapPartitions { iter =>
    -        val hashTable = new HashMap[Row, Array[AggregateFunction]]
    -        val groupingProjection = new MutableProjection(groupingExpressions, childOutput)
    -
    -        var currentRow: Row = null
    -        while (iter.hasNext) {
    -          currentRow = iter.next()
    -          val currentGroup = groupingProjection(currentRow)
    -          var currentBuffer = hashTable.get(currentGroup)
    -          if (currentBuffer == null) {
    -            currentBuffer = newAggregateBuffer()
    -            hashTable.put(currentGroup.copy(), currentBuffer)
    +        val groupingProjection = new
    +            MutableProjection(groupingExpressions, childOutput)
    +        // TODO: Can't use "Array[AggregateFunction]" directly, due to lack of
    +        // "concat(AggregateFunction, AggregateFunction)". Should add
    +        // AggregateFunction.update(agg: AggregateFunction) in the future.
    +        def createCombiner(row: Row) = mergeValue(newAggregateBuffer(), row)
    +        def mergeValue(buffer: Array[AggregateFunction], row: Row) = {
    +          for (i <- 0 to buffer.length - 1) {
    --- End diff --
    
    Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Use Aggregator for Spark SQL

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/867#discussion_r13029103
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
    @@ -155,48 +155,60 @@ case class Aggregate(
           }
         } else {
           child.execute().mapPartitions { iter =>
    -        val hashTable = new HashMap[Row, Array[AggregateFunction]]
    -        val groupingProjection = new MutableProjection(groupingExpressions, childOutput)
    -
    -        var currentRow: Row = null
    -        while (iter.hasNext) {
    -          currentRow = iter.next()
    -          val currentGroup = groupingProjection(currentRow)
    -          var currentBuffer = hashTable.get(currentGroup)
    -          if (currentBuffer == null) {
    -            currentBuffer = newAggregateBuffer()
    -            hashTable.put(currentGroup.copy(), currentBuffer)
    +        val groupingProjection = new
    +            MutableProjection(groupingExpressions, childOutput)
    +        // TODO: Can't use "Array[AggregateFunction]" directly, due to lack of
    +        // "concat(AggregateFunction, AggregateFunction)". Should add
    +        // AggregateFunction.update(agg: AggregateFunction) in the future.
    +        def createCombiner(row: Row) = mergeValue(newAggregateBuffer(), row)
    +        def mergeValue(buffer: Array[AggregateFunction], row: Row) = {
    +          for (i <- 0 to buffer.length - 1) {
    +            buffer(i).update(row)
               }
    -
    -          var i = 0
    -          while (i < currentBuffer.length) {
    -            currentBuffer(i).update(currentRow)
    -            i += 1
    +          buffer
    +        }
    +        def mergeCombiners(buf1: Array[AggregateFunction], buf2: Array[AggregateFunction]) = {
    +          if (buf1.length != buf2.length) {
    +            throw new TreeNodeException(this, s"Unequal aggregate buffer length ${buf1.length} != ${buf2.length}")
    +          }
    +          for (i <- 0 to buf1.length - 1) {
    --- End diff --
    
    while loop here too


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---