Posted to commits@datafu.apache.org by ey...@apache.org on 2019/01/14 13:18:43 UTC

datafu git commit: added documentation to UDAFs

Repository: datafu
Updated Branches:
  refs/heads/spark-tmp a6d3c5504 -> f6d40474a


added documentation to UDAFs

Signed-off-by: Eyal Allweil <ey...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/datafu/commit/f6d40474
Tree: http://git-wip-us.apache.org/repos/asf/datafu/tree/f6d40474
Diff: http://git-wip-us.apache.org/repos/asf/datafu/diff/f6d40474

Branch: refs/heads/spark-tmp
Commit: f6d40474ae83a7701b2fa50f4a7966b0753d0041
Parents: a6d3c55
Author: Ohad Raviv <or...@paypal.com>
Authored: Mon Jan 14 15:16:52 2019 +0200
Committer: Eyal Allweil <ey...@apache.org>
Committed: Mon Jan 14 15:18:18 2019 +0200

----------------------------------------------------------------------
 .gitignore                                      |   2 +
 .../main/scala/datafu/spark/SparkDFUtils.scala  | 108 +++++++++++--------
 .../main/scala/datafu/spark/SparkUDAFs.scala    |  12 ++-
 3 files changed, 75 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/datafu/blob/f6d40474/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index a4e88b5..96aeaa0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,5 +32,7 @@ datafu-pig/query
 gradle/wrapper/gradle-wrapper.jar
 gradle/wrapper/gradle-wrapper.properties
 gradlew*
+datafu-scala-spark/out
+datafu-scala-spark/derby.log
 datafu-spark/spark-warehouse
 datafu-spark/metastore_db

http://git-wip-us.apache.org/repos/asf/datafu/blob/f6d40474/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
----------------------------------------------------------------------
diff --git a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
index a71ec32..f22a13f 100644
--- a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
@@ -32,7 +32,9 @@ import org.apache.spark.util.SizeEstimator
 object SparkDFUtils extends SparkDFUtilsTrait {
 
   /**
-    * Used get the 'latest' record (after ordering according to the provided order columns) in each group.
+    * Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
+    * Different from {@link org.apache.spark.sql.Dataset#dropDuplicates} because order matters.
+    *
     * @param df DataFrame to operate on
     * @param groupCol column to group by the records
     * @param orderCols columns to order the records according to
@@ -52,13 +54,13 @@ object SparkDFUtils extends SparkDFUtilsTrait {
     */
   override def dedupTopN(df: DataFrame, n: Int, groupCol: Column, orderCols: Column*): DataFrame = {
     val w = Window.partitionBy(groupCol).orderBy(orderCols: _*)
-    val ss = df.sparkSession
-    import ss.implicits._
-    df.withColumn("rn", row_number.over(w)).where($"rn" <= n).drop("rn")
+    df.withColumn("rn", row_number.over(w)).where(col("rn") <= n).drop("rn")
   }
 
   /**
-    * Used get the 'latest' record (after ordering according to the provided order columns) in each group.
+    * Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
+    * The same functionality as {@link #dedup}, but implemented using a UDAF to utilize map-side aggregation.
+    *
     * @param df DataFrame to operate on
     * @param groupCol column to group by the records
     * @param orderByCol column to order the records according to
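
A minimal usage sketch of dedupTopN as documented above; the SparkSession spark, the events DataFrame and its column names are assumed for illustration:

    import org.apache.spark.sql.functions.col
    import datafu.spark.SparkDFUtils

    // hypothetical input: one row per (user_id, event_time) pair
    val events = spark.createDataFrame(Seq(
      ("a", "2019-01-12"), ("a", "2019-01-14"), ("b", "2019-01-13")
    )).toDF("user_id", "event_time")

    // keep the 2 'latest' rows per user_id, ordered by event_time descending
    val top2 = SparkDFUtils.dedupTopN(events, 2, col("user_id"), col("event_time").desc)
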
@@ -96,11 +98,25 @@ object SparkDFUtils extends SparkDFUtilsTrait {
  /**
   * Returns a DataFrame with the given column (should be a StructType) replaced by its inner fields.
   * This method only flattens a single level of nesting.
+  *
+  * +-------+----------+----------+----------+
+  * |id     |s.sub_col1|s.sub_col2|s.sub_col3|
+  * +-------+----------+----------+----------+
+  * |123    |1         |2         |3         |
+  * +-------+----------+----------+----------+
+  *
+  * +-------+----------+----------+----------+
+  * |id     |sub_col1  |sub_col2  |sub_col3  |
+  * +-------+----------+----------+----------+
+  * |123    |1         |2         |3         |
+  * +-------+----------+----------+----------+
+  *
   * @param df DataFrame to operate on
   * @param colName column name for a column of type StructType
   * @return DataFrame representing the data after the operation
   */
   override def flatten(df: DataFrame, colName: String): DataFrame = {
+    assert(df.schema(colName).dataType.isInstanceOf[StructType], s"Column $colName must be of type Struct")
     val outerFields = df.schema.fields.map(_.name).toSet
     val flattenFields = df.schema(colName).dataType.asInstanceOf[StructType].fields.filter(f => !outerFields.contains(f.name)).map("`" + colName + "`.`" + _.name + "`")
     df.selectExpr("*" +: flattenFields: _*).drop(colName)
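
A sketch of how flatten produces the second table above from the first; spark and the sample data are assumed:

    import org.apache.spark.sql.functions.{col, struct}
    import datafu.spark.SparkDFUtils

    // hypothetical nested input: id plus a struct column "s" with three sub fields
    val nested = spark.createDataFrame(Seq((123, 1, 2, 3)))
      .toDF("id", "sub_col1", "sub_col2", "sub_col3")
      .select(col("id"), struct("sub_col1", "sub_col2", "sub_col3").as("s"))

    // replaces the struct column "s" with its inner fields (one level of nesting only)
    val flat = SparkDFUtils.flatten(nested, "s")
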
@@ -116,18 +132,19 @@ object SparkDFUtils extends SparkDFUtilsTrait {
     df.select(df.columns.zip(newScheme).map {case (oldCol: String, newCol: String) => col(oldCol).as(newCol)}: _*)
 
   /**
-    * Used to perform a join when the right df is relatively small but doesn't fit to perform broadcast join.
+    * Used to perform a join when the right df is relatively small but still too big to fit in memory for a map-side broadcast join.
     * Use cases:
-    * a. excluding keys that might be skew from a medium size list.
-    * b. join a big skewed table with a table that has small number of very big rows.
+    * a. excluding keys that might be skewed from a medium-sized list.
+    * b. joining a big skewed table with a table that has a small number of very large rows.
+    *
     * @param dfLeft left DataFrame
     * @param dfRight right DataFrame
     * @param joinExprs join expression
-    * @param numShards number of shards
+    * @param numShards number of shards - number of times to duplicate the right DataFrame
     * @param joinType join type
-    * @return DataFrame representing the data after the operation
+    * @return joined DataFrame
     */
-  override def joinSkewed(dfLeft: DataFrame, dfRight: DataFrame, joinExprs: Column, numShards: Int = 30, joinType: String = "inner"): DataFrame = {
+  override def joinSkewed(dfLeft: DataFrame, dfRight: DataFrame, joinExprs: Column, numShards: Int = 10, joinType: String = "inner"): DataFrame = {
     // skew join based on salting
     // salts the left DF by adding another random column and join with the right DF after duplicating it
     val ss = dfLeft.sparkSession
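
A usage sketch based on the joinSkewed signature above; bigSkewedDf, mediumDf and the join column are hypothetical:

    import datafu.spark.SparkDFUtils

    // salts the left DataFrame with a random shard id and duplicates the right
    // one numShards times, so a hot key on the left is spread across tasks
    val joined = SparkDFUtils.joinSkewed(
      dfLeft = bigSkewedDf,
      dfRight = mediumDf,
      joinExprs = bigSkewedDf("key") === mediumDf("key"),
      numShards = 10,
      joinType = "inner")
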
@@ -144,13 +161,13 @@ object SparkDFUtils extends SparkDFUtilsTrait {
     * @param notSkewed not skewed DataFrame
     * @param skewed skewed DataFrame
     * @param joinCol join column
-    * @param numberCustsToBroadcast num of custs to broadcast
+    * @param numRowsToBroadcast num of rows to broadcast
     * @return DataFrame representing the data after the operation
     */
-  override def broadcastJoinSkewed(notSkewed: DataFrame, skewed: DataFrame, joinCol: String, numberCustsToBroadcast: Int): DataFrame = {
+  override def broadcastJoinSkewed(notSkewed: DataFrame, skewed: DataFrame, joinCol: String, numRowsToBroadcast: Int): DataFrame = {
     val ss = notSkewed.sparkSession
     import ss.implicits._
-    val skewedKeys = skewed.groupBy(joinCol).count().sort($"count".desc).limit(numberCustsToBroadcast).drop("count")
+    val skewedKeys = skewed.groupBy(joinCol).count().sort($"count".desc).limit(numRowsToBroadcast).drop("count")
                            .withColumnRenamed(joinCol, "skew_join_key").cache()
 
     val notSkewedWithSkewIndicator = notSkewed.join(broadcast(skewedKeys), $"skew_join_key" === col(joinCol), "left")
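
A usage sketch for broadcastJoinSkewed; notSkewedDf, skewedDf and the join column name are hypothetical:

    import datafu.spark.SparkDFUtils

    // the 100 most frequent values of "customer_id" in skewedDf are collected
    // and broadcast, so the heaviest keys avoid a regular shuffle join
    val joined = SparkDFUtils.broadcastJoinSkewed(notSkewedDf, skewedDf, "customer_id", 100)
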
@@ -170,48 +187,47 @@ object SparkDFUtils extends SparkDFUtilsTrait {
   }
 
   /**
-    * Helper function to join a table with column to a table with range of the same column.
-    * For example, ip table with whois data that has range of ips as lines.
-    * The main problem which this handles is doing naive explode on the range can result in huge table.
+    * Helper function to join a table with a point column to a table with a range column.
+    * For example, joining a table that contains a specific time in minutes with a table that contains time ranges.
+    * The main problem this function addresses is that a naive explode on the ranges can result in a huge table.
     * requires:
-    * 1. single table needs to be distinct on the join column, because there could be a few corresponding ranges so we dedup at the end - we choose the minimal range.
-    * 2. the range and single columns to be numeric.
+    * 1. the point table needs to be distinct on the point column. There could be a few corresponding ranges for each point,
+    *    so we choose the minimal range.
+    * 2. the range and point columns need to be numeric.
     *
-    * IP:
-    * +-------+---------+
-    * |ip     |ip_val   |
-    * +-------+---------+
-    * |123    |0.0.0.123|
-    * +-------+---------+
+    * TIMES:
+    * +-------+
+    * |time   |
+    * +-------+
+    * |11:55  |
+    * +-------+
     *
-    * IP RANGES:
-    * +---------+---------+----------+
-    * |ip_start |ip_end   |desc      |
-    * +---------+---------+----------+
-    * |100      |200      | nba1.com |
-    * +---------+---------+----------+
-    * |50       |300      | nba2.com |
-    * +---------+---------+----------+
+    * TIME RANGES:
+    * +----------+---------+----------+
+    * |start_time|end_time |desc      |
+    * +----------+---------+----------+
+    * |10:00     |12:00    | meeting  |
+    * +----------+---------+----------+
+    * |11:50     |12:15    | lunch    |
+    * +----------+---------+----------+
     *
     * OUTPUT:
-    * +---------+---------+---------+---------+----------+
-    * |ip       |ip_val   |ip_start |ip_end   |desc      |
-    * +---------+---------+---------+---------+----------+
-    * |123      |0.0.0.123|100      |200      | nba1.com |
-    * +---------+---------+---------+---------+----------+
-    *
+    * +-------+----------+---------+-------+
+    * |time   |start_time|end_time |desc   |
+    * +-------+----------+---------+-------+
+    * |11:55  |11:50     |12:15    | lunch |
+    * +-------+----------+---------+-------+
     *
-    * @param dfSingle
-    * @param colSingle
-    * @param dfRange
-    * @param colRangeStart
-    * @param colRangeEnd
-    * @param DECREASE_FACTOR
+    * @param dfSingle - DataFrame that contains the point column
+    * @param colSingle - the point column's name
+    * @param dfRange - DataFrame that contains the range column
+    * @param colRangeStart - the start range column's name
+    * @param colRangeEnd - the end range column's name
+    * @param DECREASE_FACTOR - resolution factor. Instead of exploding the range column directly, we first decrease its resolution by this factor
     * @return
     */
   override def joinWithRange(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long): DataFrame = {
 
-    //sqlContext.udf.register("range", (start: Int, end: Int) => (start to end).toArray)
     import org.apache.spark.sql.functions.udf
     val rangeUDF = udf((start: Long, end: Long) => (start to end).toArray)
     val dfRange_exploded = dfRange.withColumn("range_start", col(colRangeStart).cast(LongType))
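
A usage sketch of joinWithRange for the time example above, assuming the point and range columns are already numeric (e.g. minutes since midnight); timesDf and rangesDf are hypothetical:

    import datafu.spark.SparkDFUtils

    // timesDf must be distinct on "time"; DECREASE_FACTOR trades the resolution
    // of the exploded ranges for a smaller intermediate table
    val joined = SparkDFUtils.joinWithRange(timesDf, "time", rangesDf, "start_time", "end_time", 10L)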

http://git-wip-us.apache.org/repos/asf/datafu/blob/f6d40474/datafu-spark/src/main/scala/datafu/spark/SparkUDAFs.scala
----------------------------------------------------------------------
diff --git a/datafu-spark/src/main/scala/datafu/spark/SparkUDAFs.scala b/datafu-spark/src/main/scala/datafu/spark/SparkUDAFs.scala
index 3c4782f..fe95ffb 100644
--- a/datafu-spark/src/main/scala/datafu/spark/SparkUDAFs.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/SparkUDAFs.scala
@@ -28,6 +28,10 @@ import scala.collection.{Map, mutable}
 
 object SparkUDAFs {
 
+  /**
+    * Like Google Guava's Multiset.
+    * An aggregate function that creates a map from each key to its count.
+    */
   class MultiSet() extends UserDefinedAggregateFunction {
 
     def inputSchema: StructType = new StructType().add("key", StringType)
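
A usage sketch of MultiSet as an aggregation function; the df DataFrame and its columns are hypothetical:

    import org.apache.spark.sql.functions.col
    import datafu.spark.SparkUDAFs.MultiSet

    val multiSet = new MultiSet()

    // per group, builds a map from each string value to the number of times it appears
    val counted = df.groupBy(col("group_col")).agg(multiSet(col("string_col")).as("key_counts"))
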
@@ -67,6 +71,10 @@ object SparkUDAFs {
 
   }
 
+  /**
+    * Essentially the same as MultiSet, but takes an Array as input.
+    * There is an extra option to limit the number of keys (similar to CountDistinctUpTo).
+    */
   class MultiArraySet[T : Ordering](dt : DataType = StringType, maxKeys: Int = -1) extends UserDefinedAggregateFunction {
 
     def inputSchema: StructType = new StructType().add("key", ArrayType(dt))
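
A usage sketch of MultiArraySet over an array-of-strings column; df and its columns are hypothetical, and maxKeys caps the number of distinct keys kept per group:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.StringType
    import datafu.spark.SparkUDAFs.MultiArraySet

    val multiArraySet = new MultiArraySet[String](StringType, maxKeys = 3)

    // per group, counts the elements of the "tags" arrays, keeping at most 3 keys
    val counted = df.groupBy(col("group_col")).agg(multiArraySet(col("tags")).as("tag_counts"))
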
@@ -129,7 +137,9 @@ object SparkUDAFs {
 
   }
 
-  // Merge maps of kind string -> set<string>
+  /**
+    * Merges maps of type string -> set<string>.
+    */
   class MapSetMerge extends UserDefinedAggregateFunction {
 
     def inputSchema: StructType = new StructType().add("key", dataType)
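
A usage sketch of MapSetMerge; df and its map-typed column are hypothetical:

    import org.apache.spark.sql.functions.col
    import datafu.spark.SparkUDAFs.MapSetMerge

    val mapSetMerge = new MapSetMerge()

    // per group, merges the string -> set<string> maps into a single map
    val merged = df.groupBy(col("group_col")).agg(mapSetMerge(col("map_col")).as("merged_map"))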