You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by nburoojy <gi...@git.apache.org> on 2015/09/03 23:41:23 UTC

[GitHub] spark pull request: [SPARK-9301] [sql] Add collect_set aggregate f...

GitHub user nburoojy opened a pull request:

    https://github.com/apache/spark/pull/8592

    [SPARK-9301] [sql] Add collect_set aggregate function

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nburoojy/spark nick/collectset

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/8592.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #8592
    
----
commit b9cd3472bb81ccf5b9b772103342771891a60f49
Author: Nick Buroojy <ni...@civitaslearning.com>
Date:   2015-09-03T21:40:10Z

    [SPARK-9301] [sql] Add collect_set aggregate function

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8592#discussion_r44086403
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala ---
    @@ -298,6 +300,8 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
               throw new AnalysisException(s"invalid function approximate($s) $udfName")
             }
           }
    +    | COLLECT_SET ~> "(" ~> expression <~ ")" ^^ { case exp => CollectSet(exp) }
    +    | COLLECT_LIST ~> "(" ~> expression <~ ")" ^^ { case exp => CollectList(exp) }
    --- End diff --
    
    We don't need to bake things into the parser anymore.  Instead register them in [FunctionRegistery](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L117)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by nburoojy <gi...@git.apache.org>.
Github user nburoojy commented on the pull request:

    https://github.com/apache/spark/pull/8592#issuecomment-140229974
  
    @yhuai and @rxin I have updated the collect_list and collect_set fn's to use the new aggregate interface. Please take another look. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by nburoojy <gi...@git.apache.org>.
Github user nburoojy commented on the pull request:

    https://github.com/apache/spark/pull/8592#issuecomment-140900825
  
    @rxin What are the next steps with this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [sql] Add collect_set aggregate f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8592#issuecomment-137582240
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by nburoojy <gi...@git.apache.org>.
Github user nburoojy commented on the pull request:

    https://github.com/apache/spark/pull/8592#issuecomment-140436077
  
    @rxin the OOM concern is valid, but I see it as a distinct from the performance concerns I have.
    
    In theory, a collect_list over an ungrouped dataset with 10M nonnull rows will return a single row containing an array of 10M elements. In practice, the array of 10M elements may require more memory than heap available on a single node and the process will OOM. This is a fundamental limitation of the collect_list and collect_set interface, I think; just like the RDD functions `collect` and `CollectByKey`, we can't return bulk data through a single node. Based on this, I think the right solution (as you mentioned) is to write the caveat that the resulting arrays should be limited in size. Where should I document these functions?
    
    The performance issue I see is that we allocate new arrays for each element in `updateExpressions`. This could produce bulky garbage and the resulting GC could significantly slow things down. This is the performance issue I wanted to measure before writing a ArrayListODT to reuse lists. I think this would be a good next step once the core `collect_set` and `collect_list` functionality is merged.
    
    I'm certainly not a Spark SQL expert, so I'm not sure I fully understand your concerns or ideas for performance improvement. Could the operator-based collect_* produce the same array in a single row results as the aggregation-based collect_* functions while preventing OOM issues on the worker nodes?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/8592#issuecomment-137935643
  
    @nburoojy I think when @yhuai filed the ticket, he wanted to implement this for the new aggregate interface, not the old one (since it already exists on the old one).



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by nburoojy <gi...@git.apache.org>.
Github user nburoojy commented on the pull request:

    https://github.com/apache/spark/pull/8592#issuecomment-138091105
  
    That makes sense. To be clear, the new interface is `AlgebraicAggregate` or, if a lower-level interface is needed `AggregateFunction2`, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/8592#issuecomment-140230655
  
    @nburoojy This will work, but the performance will be pretty bad, and very prone to GC.
    
    I think to properly fix this, we might want to rewrite the query under the hood to implement it using an operator that gives a list of items, instead of an aggregate function.
    
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8592#discussion_r44086478
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala ---
    @@ -162,6 +162,8 @@ package object dsl {
         def stddev(e: Expression): Expression = Stddev(e)
         def stddev_pop(e: Expression): Expression = StddevPop(e)
         def stddev_samp(e: Expression): Expression = StddevSamp(e)
    +    def collect_list(e: Expression): Expression = CollectList(e)
    +    def collect_set(e: Expression): Expression = CollectSet(e)
    --- End diff --
    
    This is really only for easy building of test cases.  To make it user facing, just add it to [fuctions.py](https://github.com/apache/spark/blob/master/python/pyspark/sql/functions.py) and [functions.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/8592#issuecomment-154513739
  
    The problem is we are deleting the old non-memory managed aggregation codepath.  Its simpler but leave the user open to OOMs.  We would need to add some way to append to array in tungsten encoded rows or something?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by nburoojy <gi...@git.apache.org>.
Github user nburoojy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8592#discussion_r44176861
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala ---
    @@ -227,6 +227,38 @@ case class GetArrayItem(child: Expression, ordinal: Expression)
     }
     
     /**
    + * Combines two Arrays into one Array.
    + */
    +case class ArrayUnion(left: Expression, right: Expression) extends BinaryOperator {
    +
    +  override def inputType: AbstractDataType = ArrayType
    +
    +  override def symbol: String = "++"
    +
    +  private def inputArrType = left.dataType.asInstanceOf[ArrayType]
    +  override def dataType: DataType = inputArrType
    +
    +  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    +    val arrayClass = classOf[GenericArrayData].getName
    +    val elementType = inputArrType.elementType
    +    nullSafeCodeGen(ctx, ev, (eval1, eval2) => {
    +      s"""
    +        final int n1 = $eval1.numElements();
    +        final int n2 = $eval2.numElements();
    +        final Object[] unionValues = new Object[n1 + n2];
    +        for (int j = 0; j < n1; j++) {
    +          unionValues[j] = ${ctx.getValue(eval1, elementType, "j")};
    +        }
    +        for (int j = 0; j < n2; j++) {
    +          unionValues[n1 + j] = ${ctx.getValue(eval2, elementType, "j")};
    +        }
    +        ${ev.primitive} = new $arrayClass(unionValues);
    +      """
    +    })
    +  }
    --- End diff --
    
    You're absolutely right; since inserting an element into an array requires copying the entire array, the running time is `O(n^2)` in the number of elements in the destination array. N.B.: `n` is not necessarily the number of elements in the source data; e.g. if the user groups by `id`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8592#discussion_r44087217
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala ---
    @@ -227,6 +227,38 @@ case class GetArrayItem(child: Expression, ordinal: Expression)
     }
     
     /**
    + * Combines two Arrays into one Array.
    + */
    +case class ArrayUnion(left: Expression, right: Expression) extends BinaryOperator {
    +
    +  override def inputType: AbstractDataType = ArrayType
    +
    +  override def symbol: String = "++"
    +
    +  private def inputArrType = left.dataType.asInstanceOf[ArrayType]
    +  override def dataType: DataType = inputArrType
    +
    +  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    +    val arrayClass = classOf[GenericArrayData].getName
    +    val elementType = inputArrType.elementType
    +    nullSafeCodeGen(ctx, ev, (eval1, eval2) => {
    +      s"""
    +        final int n1 = $eval1.numElements();
    +        final int n2 = $eval2.numElements();
    +        final Object[] unionValues = new Object[n1 + n2];
    +        for (int j = 0; j < n1; j++) {
    +          unionValues[j] = ${ctx.getValue(eval1, elementType, "j")};
    +        }
    +        for (int j = 0; j < n2; j++) {
    +          unionValues[n1 + j] = ${ctx.getValue(eval2, elementType, "j")};
    +        }
    +        ${ev.primitive} = new $arrayClass(unionValues);
    +      """
    +    })
    +  }
    --- End diff --
    
    I'm worried that this is `O(n^2)` in the number of elements.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by nburoojy <gi...@git.apache.org>.
Github user nburoojy commented on the pull request:

    https://github.com/apache/spark/pull/8592#issuecomment-154511621
  
    Thanks for the review @marmbrus !
    
    I've sent https://github.com/apache/spark/pull/9526 with your suggestion to alias the Hive UDAFs. And I'd like to include it in the 1.6 release.
    
    Longer-term (beyond 1.6) I'd like to solve the core issue.
    For my particular use case I would like the ability to aggregate compound types (struct and array), and it appears Hive 0.13.0 does not support this.
    
    What kind of major changes would we have to make to support O(1) array insertion? I was thinking that a strategy like [CollectHashSet](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala) uses would also work here; that is, I would implement a `CompactBufferUDT` (backed by [CompactBuffer](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala)), and `updateExpressions` would append to the buffer in amortized `O(1)`.
    
    Would this strategy break assumptions in the new aggregation framework? Do you think this change is larger than I expect?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by nburoojy <gi...@git.apache.org>.
Github user nburoojy closed the pull request at:

    https://github.com/apache/spark/pull/8592


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by nburoojy <gi...@git.apache.org>.
Github user nburoojy commented on the pull request:

    https://github.com/apache/spark/pull/8592#issuecomment-140238919
  
    @rxin I bet you're right about the performance and gc issue. Is there a performance test suite to quantify this issue? I always hesitate to optimize prematurely :-)
    
    > rewrite the query under the hood to implement it using an operator that gives a list of items
    
    Are you suggesting to create a `List` equivalent of `OpenHashSetUDT`? Mutating a single buffer on `updateExpressions` and `mergeExpressions` would certainly cut down on the number of Object instantiations.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by justinuang <gi...@git.apache.org>.
Github user justinuang commented on the pull request:

    https://github.com/apache/spark/pull/8592#issuecomment-152189895
  
    @rxin I would really like to see this merged in as well. I agree with @nburoojy that collect_as_list and collect_as_set will always have issues if the size of one list/set gets too big, but that doesn't mean that this isn't useful. I didn't quite follow the other parts about the API refactor, but if that isn't a huge issue, it would be nice to merge something like this in soon.
    
    If people are careful, this can be extremely useful, especially when it's impossible to write a UDAF that has a mergeValue and mergeCombiner function, or when you just want to restructure how the table is layed-out. In addition, until pyspark gets UDAFs, this will be a good substitute for most cases. Right now, to get around this in pyspark, I'm using hive's collect_list, but it's annoying because I have to register a temp table and use a SQL query instead of the dataframe API.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by maver1ck <gi...@git.apache.org>.
Github user maver1ck commented on the pull request:

    https://github.com/apache/spark/pull/8592#issuecomment-154037876
  
    I agree that this could be useful in this form.
    
    Maybe we can merge this into 1.6.0 ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/8592#issuecomment-154247089
  
    Thanks for working on this!  I would like to include this, but I'm worried about the performance.  Unfortunately, I think that in order to get something that is not n^2 we are going to have to make major changes to aggregation (and its after code freeze already).
    
    Fortunately, we already have a hive UDF that does this in `O(n)`.  I'd propose that we just add a wrapper so that people can find this in the functions list for DataFrames.  You can just make the function call: `callUDF("collect_set", ...)`, etc.
    
    If you can make a PR in the next day or two we can still include it in Spark 1.6.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9301] [SQL] Add collect_set aggregate f...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/8592#issuecomment-140244709
  
    This is not premature optimization. Just run collect_set on a large set and Spark JVM processes will OOM.
    
    What I'm suggesting is to implement this not as an aggregate function, but rather an operator that does the function of collect_set (e.g. the operator can sort all the data by grouping key and set key to compute the set for each group). 
    
    Anyway - maybe it'd make sense also have the aggregate function version of this, with the caveat written correctly that this would OOM if running on large amount of data.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org