You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by ey...@apache.org on 2019/02/26 12:13:15 UTC

[datafu] 01/02: Add test for CountDistinctUpTo

This is an automated email from the ASF dual-hosted git repository.

eyal pushed a commit to branch spark-tmp
in repository https://gitbox.apache.org/repos/asf/datafu.git

commit 72340c8369cd72a3696365074cbf55b0f3002935
Author: Eyal Allweil <ey...@apache.org>
AuthorDate: Tue Feb 26 12:37:22 2019 +0200

    Add test for CountDistinctUpTo
---
 .../test/scala/datafu/spark/TestSparkUDAFs.scala   | 33 ++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/datafu-spark/src/test/scala/datafu/spark/TestSparkUDAFs.scala b/datafu-spark/src/test/scala/datafu/spark/TestSparkUDAFs.scala
index 0b77176..12f32eb 100644
--- a/datafu-spark/src/test/scala/datafu/spark/TestSparkUDAFs.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/TestSparkUDAFs.scala
@@ -149,4 +149,37 @@ class UdafTests extends FunSuite with DataFrameSuiteBase {
       df.withColumn("asd", SparkOverwriteUDAFs.minValueByKey($"col_ord", $"col_str").over(Window.partitionBy("col_grp"))))
   }
 
+  case class exp5(col_grp: String, col_ord: Option[Int])
+  
+  test("countDistinctUpTo") {
+    import datafu.spark.SparkUDAFs.CountDistinctUpTo
+    
+    val countDistinctUpTo3 = new CountDistinctUpTo(3)
+    val countDistinctUpTo6 = new CountDistinctUpTo(6)
+    
+    val inputDF = sqlContext.createDataFrame(List(
+        exp5("b", Option(1)),
+        exp5("a", Option(1)),
+        exp5("a", Option(2)),
+        exp5("a", Option(3)),
+        exp5("a", Option(4))
+      ))
+
+    val results3DF = sqlContext.createDataFrame(List(
+        exp5("b", Option(1)),
+        exp5("a", Option(3))
+    ))
+
+    val results6DF = sqlContext.createDataFrame(List(
+        exp5("b", Option(1)),
+        exp5("a", Option(4))
+    ))
+    
+    inputDF.groupBy("col_grp").agg(countDistinctUpTo3($"col_ord").as("col_ord")).show
+    
+    assertDataFrameEquals(results3DF, inputDF.groupBy("col_grp").agg(countDistinctUpTo3($"col_ord").as("col_ord")))
+    
+    assertDataFrameEquals(results6DF, inputDF.groupBy("col_grp").agg(countDistinctUpTo6($"col_ord").as("col_ord")))
+  }
+  
 }