You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2019/02/15 00:54:52 UTC

[spark] branch master updated: [SPARK-26861][SQL] deprecate typed sum/count/average

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8656af9  [SPARK-26861][SQL] deprecate typed sum/count/average
8656af9 is described below

commit 8656af98c000877a815c38df8827af8e9981bcac
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Feb 14 16:54:39 2019 -0800

    [SPARK-26861][SQL] deprecate typed sum/count/average
    
    ## What changes were proposed in this pull request?
    
    These builtin typed aggregate functions are not very useful:
    1. users can just call the untyped ones and turn the resulting dataframe to a dataset. It has better performance.
    2. the typed aggregate functions have subtle different behaviors regarding empty input.
    
    I think we should get rid of these builtin typed agg functions and suggest users to use the untyped ones.
    
    However, these functions are still useful as a demo of the `Aggregator` API, so I copied them to the example module.
    
    ## How was this patch tested?
    
    N/A
    
    Closes #23763 from cloud-fan/example.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../spark/examples/sql/SimpleTypedAggregator.scala | 86 ++++++++++++++++++++++
 .../spark/sql/expressions/javalang/typed.java      |  6 +-
 .../spark/sql/expressions/scalalang/typed.scala    | 15 +---
 3 files changed, 89 insertions(+), 18 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SimpleTypedAggregator.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SimpleTypedAggregator.scala
new file mode 100644
index 0000000..f8af919
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SimpleTypedAggregator.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.sql
+
+import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
+import org.apache.spark.sql.expressions.Aggregator
+
+// scalastyle:off println
+object SimpleTypedAggregator {
+
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder
+      .master("local")
+      .appName("common typed aggregator implementations")
+      .getOrCreate()
+
+    import spark.implicits._
+    val ds = spark.range(20).select(('id % 3).as("key"), 'id).as[(Long, Long)]
+    println("input data:")
+    ds.show()
+
+    println("running typed sum:")
+    ds.groupByKey(_._1).agg(new TypedSum[(Long, Long)](_._2).toColumn).show()
+
+    println("running typed count:")
+    ds.groupByKey(_._1).agg(new TypedCount[(Long, Long)](_._2).toColumn).show()
+
+    println("running typed average:")
+    ds.groupByKey(_._1).agg(new TypedAverage[(Long, Long)](_._2.toDouble).toColumn).show()
+
+    spark.stop()
+  }
+}
+// scalastyle:on println
+
+class TypedSum[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
+  override def zero: Long = 0L
+  override def reduce(b: Long, a: IN): Long = b + f(a)
+  override def merge(b1: Long, b2: Long): Long = b1 + b2
+  override def finish(reduction: Long): Long = reduction
+
+  override def bufferEncoder: Encoder[Long] = Encoders.scalaLong
+  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
+}
+
+class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
+  override def zero: Long = 0
+  override def reduce(b: Long, a: IN): Long = {
+    if (f(a) == null) b else b + 1
+  }
+  override def merge(b1: Long, b2: Long): Long = b1 + b2
+  override def finish(reduction: Long): Long = reduction
+
+  override def bufferEncoder: Encoder[Long] = Encoders.scalaLong
+  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
+}
+
+class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
+  override def zero: (Double, Long) = (0.0, 0L)
+  override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2)
+  override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2
+  override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = {
+    (b1._1 + b2._1, b1._2 + b2._2)
+  }
+
+  override def bufferEncoder: Encoder[(Double, Long)] = {
+    Encoders.tuple(Encoders.scalaDouble, Encoders.scalaLong)
+  }
+  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java b/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
index 5a72f0c..859b936 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.expressions.javalang;
 
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.annotation.Experimental;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.TypedColumn;
 import org.apache.spark.sql.execution.aggregate.TypedAverage;
@@ -33,9 +31,9 @@ import org.apache.spark.sql.execution.aggregate.TypedSumLong;
  * Scala users should use {@link org.apache.spark.sql.expressions.scalalang.typed}.
  *
  * @since 2.0.0
+ * @deprecated As of release 3.0.0, please use the untyped builtin aggregate functions.
  */
-@Experimental
-@Evolving
+@Deprecated
 public class typed {
   // Note: make sure to keep in sync with typed.scala
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
index 1cb579c..da7ed69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.expressions.scalalang
 
-import org.apache.spark.annotation.{Evolving, Experimental}
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.aggregate._
 
@@ -29,8 +28,7 @@ import org.apache.spark.sql.execution.aggregate._
  *
  * @since 2.0.0
  */
-@Experimental
-@Evolving
+@deprecated("please use untyped builtin aggregate functions.", "3.0.0")
 // scalastyle:off
 object typed {
   // scalastyle:on
@@ -76,15 +74,4 @@ object typed {
    * @since 2.0.0
    */
   def sumLong[IN](f: IN => Long): TypedColumn[IN, Long] = new TypedSumLong[IN](f).toColumn
-
-  // TODO:
-  // stddevOf: Double
-  // varianceOf: Double
-  // approx_count_distinct: Long
-
-  // minOf: T
-  // maxOf: T
-
-  // firstOf: T
-  // lastOf: T
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org