You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by ey...@apache.org on 2018/08/08 15:15:20 UTC
datafu git commit: Test for spark dedup and joins
Repository: datafu
Updated Branches:
refs/heads/spark-tmp 34dfddef5 -> fd5868db5
Test for spark dedup and joins
Project: http://git-wip-us.apache.org/repos/asf/datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/datafu/commit/fd5868db
Tree: http://git-wip-us.apache.org/repos/asf/datafu/tree/fd5868db
Diff: http://git-wip-us.apache.org/repos/asf/datafu/diff/fd5868db
Branch: refs/heads/spark-tmp
Commit: fd5868db5364519013b79aaa9dcbbf1592719d08
Parents: 34dfdde
Author: Eyal Allweil <ey...@apache.org>
Authored: Wed Aug 8 18:12:34 2018 +0300
Committer: Eyal Allweil <ey...@apache.org>
Committed: Wed Aug 8 18:12:34 2018 +0300
----------------------------------------------------------------------
.../scala/datafu/spark/TestSparkDFUtils.scala | 217 +++++++++++++++++++
1 file changed, 217 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/datafu/blob/fd5868db/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
----------------------------------------------------------------------
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
new file mode 100644
index 0000000..961c6d5
--- /dev/null
+++ b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.spark
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.SparkSession
+
+import org.apache.spark.sql.functions._
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+
+import org.scalatest.FunSuite
+
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import scala.collection.mutable.WrappedArray
+
+import org.apache.spark.sql.types._
+
/**
 * Tests for the DataFrame utility methods pulled in via [[DataFrameOps]]:
 * dedup, dedup2, dedupTopN, joinWithRange, broadcastJoinSkewed and joinSkewed.
 */
@RunWith(classOf[JUnitRunner])
class DedupTests extends FunSuite with DataFrameSuiteBase {

  import DataFrameOps._

  import spark.implicits._

  // Row shapes used to build expected DataFrames. Option[Int] is used wherever
  // the computed result marks the column as nullable, so that
  // assertDataFrameEquals also sees matching schemas.
  case class exp4(col2: String, col_grp: String, col_ord: Option[Int], col_str: String)

  case class exp3(col_grp: String, col_ord: Option[Int], col_str: String)

  case class exp2(col_grp: String, col_ord: Option[Int])

  case class exp2n(col_grp: String, col_ord: Int)

  // Shared input: three rows in group "a" (ordinals 1..3) and one row in
  // group "b". Cached because every test reuses it.
  lazy val inputDataFrame = sc.parallelize(List(
    ("a", 1, "asd1"), ("a", 2, "asd2"), ("a", 3, "asd3"), ("b", 1, "asd4")
  )).toDF("col_grp", "col_ord", "col_str").cache

  // dedup keeps, per group, the single row that is first by the given order
  // (here: highest col_ord).
  test("dedup") {
    val expected: DataFrame = sc.parallelize(List(("b", 1), ("a", 3))).toDF("col_grp", "col_ord")

    assertDataFrameEquals(expected, inputDataFrame.dedup($"col_grp", $"col_ord".desc).select($"col_grp", $"col_ord"))
  }

  // dedup2 ordering by the (default descending) int column, with an extra
  // aggregation (min of col_str) carried alongside the winning row.
  test("dedup2_by_int") {

    val expectedByIntDf: DataFrame = sqlContext.createDataFrame(List(
      exp4("asd4", "b", Option(1), "asd4"),
      exp4("asd1", "a", Option(3), "asd3")))

    val actual = inputDataFrame.dedup2($"col_grp", $"col_ord", moreAggFunctions = Seq(min($"col_str")))

    assertDataFrameEquals(expectedByIntDf, actual)
  }

  // dedup2 ordering ascending by a string column.
  test("dedup2_by_string_asc") {

    val actual = inputDataFrame.dedup2($"col_grp", $"col_str", desc = false)

    val expectedByStringDf: DataFrame = sqlContext.createDataFrame(List(
      exp3("b", Option(1), "asd4"),
      exp3("a", Option(1), "asd1")))

    assertDataFrameEquals(expectedByStringDf, actual)
  }

  // dedup2 ordering by a derived expression rather than a plain column;
  // ascending on the negated ordinal picks the highest col_ord per group.
  test("test_dedup2_by_complex_column") {

    val actual = inputDataFrame.dedup2($"col_grp", expr("cast(concat('-',col_ord) as int)"), desc = false)

    val expectedComplex: DataFrame = sqlContext.createDataFrame(List(
      exp3("b", Option(1), "asd4"),
      exp3("a", Option(3), "asd3")))

    assertDataFrameEquals(expectedComplex, actual)
  }

  case class inner(col_grp: String, col_ord: Int)

  case class expComplex(
    col_grp: String,
    col_ord: Option[Int],
    col_str: String,
    arr_col: Array[String],
    struct_col: inner,
    map_col: Map[String, Int]
  )

  // dedup2 must survive non-primitive columns (array, struct, map) riding
  // along with the deduplicated rows.
  test("test_dedup2_with_other_complex_column") {

    val actual = inputDataFrame.withColumn("arr_col", expr("array(col_grp, col_ord)"))
      .withColumn("struct_col", expr("struct(col_grp, col_ord)"))
      .withColumn("map_col", expr("map(col_grp, col_ord)"))
      .withColumn("map_col_blah", expr("map(col_grp, col_ord)"))
      .dedup2($"col_grp", expr("cast(concat('-',col_ord) as int)"))
      .drop("map_col_blah")

    val expected: DataFrame = sqlContext.createDataFrame(List(
      expComplex("b", Option(1), "asd4", Array("b", "1"), inner("b", 1), Map("b" -> 1)),
      expComplex("a", Option(1), "asd1", Array("a", "1"), inner("a", 1), Map("a" -> 1))
    ))

    assertDataFrameEquals(expected, actual)
  }

  // dedupTopN keeps up to N rows per group by the given order; group "a"
  // contributes its top 2 ordinals, group "b" only has one row.
  test("test_dedup_top_n") {
    val actual = inputDataFrame.dedupTopN(2, $"col_grp", $"col_ord".desc).select($"col_grp", $"col_ord")

    val expected = sqlContext.createDataFrame(List(exp2n("b", 1), exp2n("a", 3), exp2n("a", 2)))

    assertDataFrameEquals(expected, actual)
  }

  case class expRangeJoin(col_grp: String, col_ord: Option[Int], col_str: String, start: Option[Int], end: Option[Int], desc: String)

  // joinWithRange matches each left row whose col_ord falls inside a
  // [start, end] range on the right.
  test("join_with_range") {
    val df = sc.parallelize(List(("a", 1, "asd1"), ("a", 2, "asd2"), ("a", 3, "asd3"), ("b", 1, "asd4"))).toDF("col_grp", "col_ord", "col_str")
    val dfr = sc.parallelize(List((1, 2, "asd1"), (1, 4, "asd2"), (3, 5, "asd3"), (3, 10, "asd4"))).toDF("start", "end", "desc")

    val expected = sqlContext.createDataFrame(List(
      expRangeJoin("b", Option(1), "asd4", Option(1), Option(2), "asd1"),
      expRangeJoin("a", Option(3), "asd3", Option(3), Option(5), "asd3"),
      expRangeJoin("a", Option(2), "asd2", Option(1), Option(2), "asd1")
    ))

    val actual = df.joinWithRange("col_ord", dfr, "start", "end")

    assertDataFrameEquals(expected, actual)
  }

  // broadcastJoinSkewed is an inner join; the result must be the same
  // regardless of how many keys are treated as skewed (1 or 2).
  test("broadcastJoinSkewed") {
    val skewedList = List(("1", "a"), ("1", "b"), ("1", "c"), ("1", "d"), ("1", "e"), ("2", "k"), ("0", "k"))
    val skewed = sqlContext.createDataFrame(skewedList).toDF("key", "val_skewed")
    val notSkewed = sqlContext.createDataFrame((1 to 10).map(i => (i.toString, s"str$i"))).toDF("key", "val")

    val expected = sqlContext.createDataFrame(List(
      ("2", "str2", "k"),
      ("1", "str1", "e"),
      ("1", "str1", "d"),
      ("1", "str1", "c"),
      ("1", "str1", "b"),
      ("1", "str1", "a")
    )).toDF("key", "val", "val_skewed")

    val actual1 = notSkewed.broadcastJoinSkewed(skewed, "key", 1)

    assertDataFrameEquals(expected, actual1)

    val actual2 = notSkewed.broadcastJoinSkewed(skewed, "key", 2)

    assertDataFrameEquals(expected, actual2)
  }

  // because of nulls in expected data, an actual schema needs to be used
  case class expJoinSkewed(str1: String, str2: String, str3: String, str4: String)

  // joinSkewed with an arbitrary join expression, both inner (default) and
  // left_outer (key "0" has no match and yields nulls on the right).
  test("joinSkewed") {
    val skewedList = List(("1", "a"), ("1", "b"), ("1", "c"), ("1", "d"), ("1", "e"), ("2", "k"), ("0", "k"))
    val skewed = sqlContext.createDataFrame(skewedList).toDF("key", "val_skewed")
    val notSkewed = sqlContext.createDataFrame((1 to 10).map(i => (i.toString, s"str$i"))).toDF("key", "val")

    val actual1 = skewed.as("a").joinSkewed(notSkewed.as("b"), expr("a.key = b.key"), 3)

    val expected1 = sqlContext.createDataFrame(List(
      ("1", "a", "1", "str1"),
      ("1", "b", "1", "str1"),
      ("1", "c", "1", "str1"),
      ("1", "d", "1", "str1"),
      ("1", "e", "1", "str1"),
      ("2", "k", "2", "str2")
    )).toDF("key", "val_skewed", "key", "val")

    assertDataFrameEquals(expected1, actual1)

    val actual2 = skewed.as("a").joinSkewed(notSkewed.as("b"), expr("a.key = b.key"), 3, "left_outer")

    val expected2 = sqlContext.createDataFrame(List(
      expJoinSkewed("1", "a", "1", "str1"),
      expJoinSkewed("1", "b", "1", "str1"),
      expJoinSkewed("1", "c", "1", "str1"),
      expJoinSkewed("1", "d", "1", "str1"),
      expJoinSkewed("1", "e", "1", "str1"),
      expJoinSkewed("2", "k", "2", "str2"),
      expJoinSkewed("0", "k", null, null)
    )).toDF("key", "val_skewed", "key", "val")

    assertDataFrameEquals(expected2, actual2)
  }

  /**
   * Debug helper: prints both schemas and frames, then fails fast with a
   * dedicated message when the schemas differ, before the row comparison.
   */
  def compare(expected: DataFrame, actual: DataFrame) = {
    println("expected: " + expected.schema)

    expected.show

    println("actual: " + actual.schema)

    actual.show

    // BUG FIX: the original called assert("Schemas not equal!!!", expected.schema,
    // actual.schema) — a JUnit-style 3-argument form that matches no assert
    // overload in Predef or ScalaTest. assert takes a Boolean condition plus
    // an optional clue.
    assert(expected.schema == actual.schema, "Schemas not equal!!!")

    assertDataFrameEquals(expected, actual)
  }
}
\ No newline at end of file