Posted to commits@datafu.apache.org by ey...@apache.org on 2019/01/14 13:18:43 UTC
datafu git commit: added documentation to UDAFs
Repository: datafu
Updated Branches:
refs/heads/spark-tmp a6d3c5504 -> f6d40474a
added documentation to UDAFs
Signed-off-by: Eyal Allweil <ey...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/datafu/commit/f6d40474
Tree: http://git-wip-us.apache.org/repos/asf/datafu/tree/f6d40474
Diff: http://git-wip-us.apache.org/repos/asf/datafu/diff/f6d40474
Branch: refs/heads/spark-tmp
Commit: f6d40474ae83a7701b2fa50f4a7966b0753d0041
Parents: a6d3c55
Author: Ohad Raviv <or...@paypal.com>
Authored: Mon Jan 14 15:16:52 2019 +0200
Committer: Eyal Allweil <ey...@apache.org>
Committed: Mon Jan 14 15:18:18 2019 +0200
----------------------------------------------------------------------
.gitignore | 2 +
.../main/scala/datafu/spark/SparkDFUtils.scala | 108 +++++++++++--------
.../main/scala/datafu/spark/SparkUDAFs.scala | 12 ++-
3 files changed, 75 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/datafu/blob/f6d40474/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index a4e88b5..96aeaa0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,5 +32,7 @@ datafu-pig/query
gradle/wrapper/gradle-wrapper.jar
gradle/wrapper/gradle-wrapper.properties
gradlew*
+datafu-scala-spark/out
+datafu-scala-spark/derby.log
datafu-spark/spark-warehouse
datafu-spark/metastore_db
http://git-wip-us.apache.org/repos/asf/datafu/blob/f6d40474/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
----------------------------------------------------------------------
diff --git a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
index a71ec32..f22a13f 100644
--- a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
@@ -32,7 +32,9 @@ import org.apache.spark.util.SizeEstimator
object SparkDFUtils extends SparkDFUtilsTrait {
/**
- * Used get the 'latest' record (after ordering according to the provided order columns) in each group.
+ * Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
+ * Different from {@link org.apache.spark.sql.Dataset#dropDuplicates} because order matters.
+ *
* @param df DataFrame to operate on
* @param groupCol column to group by the records
* @param orderCols columns to order the records according to
@@ -52,13 +54,13 @@ object SparkDFUtils extends SparkDFUtilsTrait {
*/
override def dedupTopN(df: DataFrame, n: Int, groupCol: Column, orderCols: Column*): DataFrame = {
val w = Window.partitionBy(groupCol).orderBy(orderCols: _*)
- val ss = df.sparkSession
- import ss.implicits._
- df.withColumn("rn", row_number.over(w)).where($"rn" <= n).drop("rn")
+ df.withColumn("rn", row_number.over(w)).where(col("rn") <= n).drop("rn")
}
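
A minimal usage sketch for dedupTopN (hypothetical data and column names, assuming a local SparkSession):

    import datafu.spark.SparkDFUtils
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()
    import spark.implicits._

    // four events; keep the two rows with the highest ts per user_id
    val events = Seq(("a", 1L), ("a", 2L), ("a", 3L), ("b", 1L)).toDF("user_id", "ts")
    val top2 = SparkDFUtils.dedupTopN(events, 2, col("user_id"), col("ts").desc)
    top2.show()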
/**
- * Used get the 'latest' record (after ordering according to the provided order columns) in each group.
+ * Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
+ * The same functionality as {@link #dedup}, but implemented using a UDAF to utilize map-side aggregation.
+ *
* @param df DataFrame to operate on
* @param groupCol column to group by the records
* @param orderByCol column to order the records according to
@@ -96,11 +98,25 @@ object SparkDFUtils extends SparkDFUtilsTrait {
/**
* Returns a DataFrame with the given column (should be a StructType) replaced by its inner fields.
* This method only flattens a single level of nesting.
+ *
+ * +-------+----------+----------+----------+
+ * |id |s.sub_col1|s.sub_col2|s.sub_col3|
+ * +-------+----------+----------+----------+
+ * |123 |1 |2 |3 |
+ * +-------+----------+----------+----------+
+ *
+ * +-------+----------+----------+----------+
+ * |id |sub_col1 |sub_col2 |sub_col3 |
+ * +-------+----------+----------+----------+
+ * |123 |1 |2 |3 |
+ * +-------+----------+----------+----------+
+ *
* @param df DataFrame to operate on
* @param colName column name for a column of type StructType
* @return DataFrame representing the data after the operation
*/
override def flatten(df: DataFrame, colName: String): DataFrame = {
+ assert(df.schema(colName).dataType.isInstanceOf[StructType], s"Column $colName must be of type Struct")
val outerFields = df.schema.fields.map(_.name).toSet
val flattenFields = df.schema(colName).dataType.asInstanceOf[StructType].fields.filter(f => !outerFields.contains(f.name)).map("`" + colName + "`.`" + _.name + "`")
df.selectExpr("*" +: flattenFields: _*).drop(colName)
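
A sketch of flatten on the kind of single-level struct shown in the tables above (hypothetical data, reusing the SparkSession and implicits from the earlier sketch):

    import org.apache.spark.sql.functions.{col, struct}

    val nested = Seq((123, 1, 2, 3)).toDF("id", "sub_col1", "sub_col2", "sub_col3")
      .select(col("id"), struct("sub_col1", "sub_col2", "sub_col3").as("s"))
    // replaces the struct column "s" with its inner fields
    val flat = SparkDFUtils.flatten(nested, "s")
    flat.show()  // columns: id, sub_col1, sub_col2, sub_col3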
@@ -116,18 +132,19 @@ object SparkDFUtils extends SparkDFUtilsTrait {
df.select(df.columns.zip(newScheme).map {case (oldCol: String, newCol: String) => col(oldCol).as(newCol)}: _*)
/**
- * Used to perform a join when the right df is relatively small but doesn't fit to perform broadcast join.
+ * Used to perform a join when the right df is relatively small but still too big to fit in memory for a map-side broadcast join.
* Use cases:
- * a. excluding keys that might be skew from a medium size list.
- * b. join a big skewed table with a table that has small number of very big rows.
+ * a. excluding keys that might be skewed from a medium-sized list.
+ * b. joining a big skewed table with a table that has a small number of very large rows.
+ *
* @param dfLeft left DataFrame
* @param dfRight right DataFrame
* @param joinExprs join expression
- * @param numShards number of shards
+ * @param numShards number of shards - number of times to duplicate the right DataFrame
* @param joinType join type
- * @return DataFrame representing the data after the operation
+ * @return joined DataFrame
*/
- override def joinSkewed(dfLeft: DataFrame, dfRight: DataFrame, joinExprs: Column, numShards: Int = 30, joinType: String = "inner"): DataFrame = {
+ override def joinSkewed(dfLeft: DataFrame, dfRight: DataFrame, joinExprs: Column, numShards: Int = 10, joinType: String = "inner"): DataFrame = {
// skew join based on salting
// salts the left DF by adding another random column and join with the right DF after duplicating it
val ss = dfLeft.sparkSession
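
A usage sketch of joinSkewed with toy data (hypothetical names; the left side is the large, skewed one):

    val left  = Seq(("k1", 1), ("k1", 2), ("k1", 3), ("k2", 4)).toDF("key_l", "v")
    val right = Seq(("k1", "x"), ("k2", "y")).toDF("key_r", "d")
    // salts the left side and duplicates the right side numShards times
    val joined = SparkDFUtils.joinSkewed(left, right, col("key_l") === col("key_r"), numShards = 10, joinType = "inner")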
@@ -144,13 +161,13 @@ object SparkDFUtils extends SparkDFUtilsTrait {
* @param notSkewed not skewed DataFrame
* @param skewed skewed DataFrame
* @param joinCol join column
- * @param numberCustsToBroadcast num of custs to broadcast
+ * @param numRowsToBroadcast number of rows to broadcast
* @return DataFrame representing the data after the operation
*/
- override def broadcastJoinSkewed(notSkewed: DataFrame, skewed: DataFrame, joinCol: String, numberCustsToBroadcast: Int): DataFrame = {
+ override def broadcastJoinSkewed(notSkewed: DataFrame, skewed: DataFrame, joinCol: String, numRowsToBroadcast: Int): DataFrame = {
val ss = notSkewed.sparkSession
import ss.implicits._
- val skewedKeys = skewed.groupBy(joinCol).count().sort($"count".desc).limit(numberCustsToBroadcast).drop("count")
+ val skewedKeys = skewed.groupBy(joinCol).count().sort($"count".desc).limit(numRowsToBroadcast).drop("count")
.withColumnRenamed(joinCol, "skew_join_key").cache()
val notSkewedWithSkewIndicator = notSkewed.join(broadcast(skewedKeys), $"skew_join_key" === col(joinCol), "left")
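
A sketch of broadcastJoinSkewed on toy data (hypothetical names; both sides share the join column):

    val notSkewed = Seq(("k1", "a"), ("k2", "b")).toDF("key", "attr")
    val skewed    = Seq(("k1", 1), ("k1", 2), ("k1", 3), ("k2", 4)).toDF("key", "v")
    // the single most frequent key ("k1") is handled via broadcast join, the rest via a regular join
    val result = SparkDFUtils.broadcastJoinSkewed(notSkewed, skewed, "key", 1)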
@@ -170,48 +187,47 @@ object SparkDFUtils extends SparkDFUtilsTrait {
}
/**
- * Helper function to join a table with column to a table with range of the same column.
- * For example, ip table with whois data that has range of ips as lines.
- * The main problem which this handles is doing naive explode on the range can result in huge table.
+ * Helper function to join a table with a point column to a table with a range column.
+ * For example, join a table that contains specific time in minutes with a table that contains time ranges.
+ * The main problem this function addresses is that doing naive explode on the ranges can result in a huge table.
* requires:
- * 1. single table needs to be distinct on the join column, because there could be a few corresponding ranges so we dedup at the end - we choose the minimal range.
- * 2. the range and single columns to be numeric.
+ * 1. The point table needs to be distinct on the point column. There could be a few corresponding ranges for each point,
+ * so we choose the minimal range.
+ * 2. The range and point columns need to be numeric.
*
- * IP:
- * +-------+---------+
- * |ip |ip_val |
- * +-------+---------+
- * |123 |0.0.0.123|
- * +-------+---------+
+ * TIMES:
+ * +-------+
+ * |time |
+ * +-------+
+ * |11:55 |
+ * +-------+
*
- * IP RANGES:
- * +---------+---------+----------+
- * |ip_start |ip_end |desc |
- * +---------+---------+----------+
- * |100 |200 | nba1.com |
- * +---------+---------+----------+
- * |50 |300 | nba2.com |
- * +---------+---------+----------+
+ * TIME RANGES:
+ * +----------+---------+----------+
+ * |start_time|end_time |desc |
+ * +----------+---------+----------+
+ * |10:00 |12:00 | meeting |
+ * +----------+---------+----------+
+ * |11:50 |12:15 | lunch |
+ * +----------+---------+----------+
*
* OUTPUT:
- * +---------+---------+---------+---------+----------+
- * |ip |ip_val |ip_start |ip_end |desc |
- * +---------+---------+---------+---------+----------+
- * |123 |0.0.0.123|100 |200 | nba1.com |
- * +---------+---------+---------+---------+----------+
- *
+ * +-------+----------+---------+-------+
+ * |time |start_time|end_time |desc |
+ * +-------+----------+---------+-------+
+ * |11:55 |11:50 |12:15 | lunch |
+ * +-------+----------+---------+-------+
*
- * @param dfSingle
- * @param colSingle
- * @param dfRange
- * @param colRangeStart
- * @param colRangeEnd
- * @param DECREASE_FACTOR
+ * @param dfSingle - DataFrame that contains the point column
+ * @param colSingle - the point column's name
+ * @param dfRange - DataFrame that contains the range column
+ * @param colRangeStart - the start range column's name
+ * @param colRangeEnd - the end range column's name
+ * @param DECREASE_FACTOR - resolution factor. Instead of exploding the range column directly, we first decrease its resolution by this factor
* @return
*/
override def joinWithRange(dfSingle: DataFrame, colSingle: String, dfRange: DataFrame, colRangeStart: String, colRangeEnd: String, DECREASE_FACTOR: Long): DataFrame = {
- //sqlContext.udf.register("range", (start: Int, end: Int) => (start to end).toArray)
import org.apache.spark.sql.functions.udf
val rangeUDF = udf((start: Long, end: Long) => (start to end).toArray)
val dfRange_exploded = dfRange.withColumn("range_start", col(colRangeStart).cast(LongType))
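
A sketch of joinWithRange using the times example above, with times encoded as minutes since midnight since both columns must be numeric (11:55 becomes 715; hypothetical data):

    val points = Seq(715L).toDF("time")  // 11:55 as minutes since midnight
    val ranges = Seq((600L, 720L, "meeting"), (710L, 735L, "lunch")).toDF("start_time", "end_time", "desc")
    // both ranges contain 715; the narrower one ("lunch") wins, as in the OUTPUT table
    val matched = SparkDFUtils.joinWithRange(points, "time", ranges, "start_time", "end_time", 10L)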
http://git-wip-us.apache.org/repos/asf/datafu/blob/f6d40474/datafu-spark/src/main/scala/datafu/spark/SparkUDAFs.scala
----------------------------------------------------------------------
diff --git a/datafu-spark/src/main/scala/datafu/spark/SparkUDAFs.scala b/datafu-spark/src/main/scala/datafu/spark/SparkUDAFs.scala
index 3c4782f..fe95ffb 100644
--- a/datafu-spark/src/main/scala/datafu/spark/SparkUDAFs.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/SparkUDAFs.scala
@@ -28,6 +28,10 @@ import scala.collection.{Map, mutable}
object SparkUDAFs {
+ /**
+ * Like Google Guava's Multiset.
+ * Aggregate function that creates a map from each key to its count.
+ */
class MultiSet() extends UserDefinedAggregateFunction {
def inputSchema: StructType = new StructType().add("key", StringType)
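
A usage sketch for MultiSet (hypothetical data, reusing the SparkSession and implicits from the sketches above):

    import datafu.spark.SparkUDAFs

    val ms = new SparkUDAFs.MultiSet()
    val clicks = Seq(("u1", "click"), ("u1", "click"), ("u1", "view")).toDF("user", "event")
    // expected result for u1: Map(click -> 2, view -> 1)
    clicks.groupBy("user").agg(ms(col("event")).as("event_counts")).show(false)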
@@ -67,6 +71,10 @@ object SparkUDAFs {
}
+ /**
+ * Essentially the same as MultiSet, but takes an Array as input.
+ * There is an extra option to limit the number of keys (like CountDistinctUpTo).
+ */
class MultiArraySet[T : Ordering](dt : DataType = StringType, maxKeys: Int = -1) extends UserDefinedAggregateFunction {
def inputSchema: StructType = new StructType().add("key", ArrayType(dt))
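
A usage sketch for MultiArraySet (hypothetical data; same session as above):

    import org.apache.spark.sql.types.StringType

    val mas = new SparkUDAFs.MultiArraySet[String](StringType, maxKeys = 2)
    val tags = Seq(("u1", Seq("a", "b")), ("u1", Seq("a", "c"))).toDF("user", "tags")
    // counts elements across the arrays, keeping at most maxKeys keys in the result
    tags.groupBy("user").agg(mas(col("tags")).as("tag_counts")).show(false)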
@@ -129,7 +137,9 @@ object SparkUDAFs {
}
- // Merge maps of kind string -> set<string>
+ /**
+ * Merges maps of kind string -> set<string>.
+ */
class MapSetMerge extends UserDefinedAggregateFunction {
def inputSchema: StructType = new StructType().add("key", dataType)
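
A hedged sketch for MapSetMerge; the exact input type is not visible in this hunk, so this assumes maps from string to array<string>, per the comment above:

    val msm = new SparkUDAFs.MapSetMerge()
    val maps = Seq(("g1", Map("k" -> Seq("a"))), ("g1", Map("k" -> Seq("b")))).toDF("grp", "m")
    // values for the same key are merged; expected for g1: Map(k -> [a, b])
    maps.groupBy("grp").agg(msm(col("m")).as("merged")).show(false)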