Posted to commits@datafu.apache.org by ey...@apache.org on 2019/07/16 13:31:09 UTC
[datafu] branch spark-tmp updated: Rename dedup methods
This is an automated email from the ASF dual-hosted git repository.
eyal pushed a commit to branch spark-tmp
in repository https://gitbox.apache.org/repos/asf/datafu.git
The following commit(s) were added to refs/heads/spark-tmp by this push:
new 37d4505 Rename dedup methods
37d4505 is described below
commit 37d4505b70c7939dffd04e42a3db26d0fee5a196
Author: Ohad Raviv <or...@paypal.com>
AuthorDate: Tue Jul 16 15:44:37 2019 +0300
Rename dedup methods
Signed-off-by: Eyal Allweil <ey...@apache.org>
---
datafu-spark/build_and_test_spark.sh | 56 +++++++++++-----------
.../main/resources/pyspark_utils/bridge_utils.py | 2 +-
.../src/main/resources/pyspark_utils/df_utils.py | 12 ++---
.../src/main/scala/datafu/spark/DataFrameOps.scala | 18 +++----
.../src/main/scala/datafu/spark/SparkDFUtils.scala | 31 ++++++------
.../test/resources/python_tests/df_utils_tests.py | 8 ++--
.../scala/datafu/spark/TestScalaPythonBridge.scala | 4 +-
.../test/scala/datafu/spark/TestSparkDFUtils.scala | 10 ++--
8 files changed, 71 insertions(+), 70 deletions(-)
diff --git a/datafu-spark/build_and_test_spark.sh b/datafu-spark/build_and_test_spark.sh
index 0b876eb..ff8cdcd 100755
--- a/datafu-spark/build_and_test_spark.sh
+++ b/datafu-spark/build_and_test_spark.sh
@@ -28,32 +28,32 @@ export LATEST_SPARK_VERSIONS_FOR_SCALA_212="2.4.3"
STARTTIME=$(date +%s)
function log {
- echo $1
- if [[ $LOG_FILE != "NONE" ]]; then
- echo $1 >> $LOG_FILE
- fi
+ echo $1
+ if [[ $LOG_FILE != "NONE" ]]; then
+ echo $1 >> $LOG_FILE
+ fi
}
function build {
- echo "----- Building versions for Scala $scala, Spark $spark ----"
- if ./gradlew :datafu-spark:clean; then
- echo "----- Clean for Scala $scala, spark $spark succeeded"
- if ./gradlew :datafu-spark:assemble -PscalaVersion=$scala -PsparkVersion=$spark; then
- echo "----- Build for Scala $scala, spark $spark succeeded"
- if ./gradlew :datafu-spark:test -PscalaVersion=$scala -PsparkVersion=$spark $TEST_PARAMS; then
- log "Testing for Scala $scala, spark $spark succeeded"
- if [[ $JARS_DIR != "NONE" ]]; then
- cp datafu-spark/build/libs/*.jar $JARS_DIR/
- fi
- else
- log "Testing for Scala $scala, spark $spark failed (build succeeded)"
- fi
- else
- log "Build for Scala $scala, spark $spark failed"
- fi
- else
- log "Clean for Scala $scala, Spark $spark failed"
- fi
+ echo "----- Building versions for Scala $scala, Spark $spark ----"
+ if ./gradlew :datafu-spark:clean; then
+ echo "----- Clean for Scala $scala, spark $spark succeeded"
+ if ./gradlew :datafu-spark:assemble -PscalaVersion=$scala -PsparkVersion=$spark; then
+ echo "----- Build for Scala $scala, spark $spark succeeded"
+ if ./gradlew :datafu-spark:test -PscalaVersion=$scala -PsparkVersion=$spark $TEST_PARAMS; then
+ log "Testing for Scala $scala, spark $spark succeeded"
+ if [[ $JARS_DIR != "NONE" ]]; then
+ cp datafu-spark/build/libs/*.jar $JARS_DIR/
+ fi
+ else
+ log "Testing for Scala $scala, spark $spark failed (build succeeded)"
+ fi
+ else
+ log "Build for Scala $scala, spark $spark failed"
+ fi
+ else
+ log "Clean for Scala $scala, Spark $spark failed"
+ fi
}
# -------------------------------------
@@ -91,27 +91,27 @@ while getopts "l:j:t:hq" arg; do
done
if [[ $LOG_FILE != "NONE" ]]; then
- echo "Building datafu-spark: $TEST_PARAMS" > $LOG_FILE
+ echo "Building datafu-spark: $TEST_PARAMS" > $LOG_FILE
fi
if [[ $JARS_DIR != "NONE" ]]; then
- echo "Copying successfully built and tested jars to $JARS_DIR" > $LOG_FILE
+ echo "Copying successfully built and tested jars to $JARS_DIR" > $LOG_FILE
mkdir $JARS_DIR
fi
export scala=2.10
for spark in $SPARK_VERSIONS_FOR_SCALA_210; do
- build
+ build
done
export scala=2.11
for spark in $SPARK_VERSIONS_FOR_SCALA_211; do
- build
+ build
done
export scala=2.12
for spark in $SPARK_VERSIONS_FOR_SCALA_212; do
- build
+ build
done
export ENDTIME=$(date +%s)
diff --git a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
index 119e637..40134d0 100644
--- a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
+++ b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
@@ -34,7 +34,7 @@ class Context(object):
"""When running a Python script from Scala - this function is called
by the script to initialize the connection to the Java Gateway and get the spark context.
code is basically copied from:
- https://github.com/apache/zeppelin/blob/master/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py#L30
+ https://github.com/apache/zeppelin/blob/master/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py#L30
"""
if os.environ.get("SPARK_EXECUTOR_URI"):
diff --git a/datafu-spark/src/main/resources/pyspark_utils/df_utils.py b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
index d91228e..16553ce 100644
--- a/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
+++ b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
@@ -28,7 +28,7 @@ def _get_utils(df):
# public:
-def dedup(df, group_col, order_cols = []):
+def dedup_with_order(df, group_col, order_cols = []):
"""
Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
:param df: DataFrame to operate on
@@ -37,7 +37,7 @@ def dedup(df, group_col, order_cols = []):
:return: DataFrame representing the data after the operation
"""
java_cols = _cols_to_java_cols(order_cols)
- jdf = _get_utils(df).dedup(df._jdf, group_col._jc, java_cols)
+ jdf = _get_utils(df).dedupWithOrder(df._jdf, group_col._jc, java_cols)
return DataFrame(jdf, df.sql_ctx)
@@ -55,7 +55,7 @@ def dedup_top_n(df, n, group_col, order_cols = []):
return DataFrame(jdf, df.sql_ctx)
-def dedup2(df, group_col, order_by_col, desc = True, columns_filter = [], columns_filter_keep = True):
+def dedup_with_combiner(df, group_col, order_by_col, desc = True, columns_filter = [], columns_filter_keep = True):
"""
Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
:param df: DataFrame to operate on
@@ -67,7 +67,7 @@ def dedup2(df, group_col, order_by_col, desc = True, columns_filter = [], column
* those columns in the result
:return: DataFrame representing the data after the operation
"""
- jdf = _get_utils(df).dedup2(df._jdf, group_col._jc, order_by_col._jc, desc, columns_filter, columns_filter_keep)
+ jdf = _get_utils(df).dedupWithCombiner(df._jdf, group_col._jc, order_by_col._jc, desc, columns_filter, columns_filter_keep)
return DataFrame(jdf, df.sql_ctx)
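Both renamed wrappers stay thin: each delegates through _get_utils(df) to the Scala bridge shown further down. As a quick illustration of what the two calls compute, here is a minimal sketch against the renamed Scala API (the session setup and the sample data are invented for this example):

    import org.apache.spark.sql.SparkSession
    import datafu.spark.DataFrameOps._

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val people = Seq(("a", "Sara", 33), ("a", "Alice", 34),
                     ("b", "Bob", 36), ("c", "Zoey", 36))
      .toDF("id", "name", "age")

    // one 'latest' row per id, ordered by age with name as tie-breaker
    val byOrder = people.dedupWithOrder($"id", $"age".desc, $"name".desc)

    // the same records, computed with the map-side combiner variant
    val byCombiner = people.dedupWithCombiner($"id", $"age", desc = true)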
@@ -160,9 +160,9 @@ def activate():
This technique is taken from pymongo_spark
https://github.com/mongodb/mongo-hadoop/blob/master/spark/src/main/python/pymongo_spark.py
"""
- pyspark.sql.DataFrame.dedup = dedup
+ pyspark.sql.DataFrame.dedup_with_order = dedup_with_order
pyspark.sql.DataFrame.dedup_top_n = dedup_top_n
- pyspark.sql.DataFrame.dedup2 = dedup2
+ pyspark.sql.DataFrame.dedup_with_combiner = dedup_with_combiner
pyspark.sql.DataFrame.change_schema = change_schema
pyspark.sql.DataFrame.join_skewed = join_skewed
pyspark.sql.DataFrame.broadcast_join_skewed = broadcast_join_skewed
diff --git a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
index 8f74aa2..be4600d 100644
--- a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
@@ -34,19 +34,19 @@ object DataFrameOps {
implicit class someDataFrameUtils(df: DataFrame) {
- def dedup(groupCol: Column, orderCols: Column*): DataFrame =
- SparkDFUtils.dedup(df, groupCol, orderCols: _*)
+ def dedupWithOrder(groupCol: Column, orderCols: Column*): DataFrame =
+ SparkDFUtils.dedupWithOrder(df, groupCol, orderCols: _*)
def dedupTopN(n: Int, groupCol: Column, orderCols: Column*): DataFrame =
SparkDFUtils.dedupTopN(df, n, groupCol, orderCols: _*)
- def dedup2(groupCol: Column,
- orderByCol: Column,
- desc: Boolean = true,
- moreAggFunctions: Seq[Column] = Nil,
- columnsFilter: Seq[String] = Nil,
- columnsFilterKeep: Boolean = true): DataFrame =
- SparkDFUtils.dedup2(df,
+ def dedupWithCombiner(groupCol: Column,
+ orderByCol: Column,
+ desc: Boolean = true,
+ moreAggFunctions: Seq[Column] = Nil,
+ columnsFilter: Seq[String] = Nil,
+ columnsFilterKeep: Boolean = true): DataFrame =
+ SparkDFUtils.dedupWithCombiner(df,
groupCol,
orderByCol,
desc,
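This implicit class is the Scala counterpart of the Python activate() patching above: importing DataFrameOps._ grafts the renamed methods onto DataFrame. Without the import, the same call goes through SparkDFUtils directly, e.g. (a sketch; df, "id" and "ts" are placeholder names):

    import datafu.spark.SparkDFUtils
    import org.apache.spark.sql.functions.col

    // equivalent to df.dedupWithOrder(col("id"), col("ts").desc) with DataFrameOps._ in scope
    val deduped = SparkDFUtils.dedupWithOrder(df, col("id"), col("ts").desc)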
diff --git a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
index 2b48c52..0ee1520 100644
--- a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
@@ -31,11 +31,11 @@ import org.apache.spark.storage.StorageLevel
*/
class SparkDFUtilsBridge {
- def dedup(df: DataFrame,
+ def dedupWithOrder(df: DataFrame,
groupCol: Column,
orderCols: JavaList[Column]): DataFrame = {
val converted = convertJavaListToSeq(orderCols)
- SparkDFUtils.dedup(df = df, groupCol = groupCol, orderCols = converted: _*)
+ SparkDFUtils.dedupWithOrder(df = df, groupCol = groupCol, orderCols = converted: _*)
}
def dedupTopN(df: DataFrame,
@@ -49,14 +49,14 @@ class SparkDFUtilsBridge {
orderCols = converted: _*)
}
- def dedup2(df: DataFrame,
+ def dedupWithCombiner(df: DataFrame,
groupCol: Column,
orderByCol: Column,
desc: Boolean,
columnsFilter: JavaList[String],
columnsFilterKeep: Boolean): DataFrame = {
val columnsFilter_converted = convertJavaListToSeq(columnsFilter)
- SparkDFUtils.dedup2(
+ SparkDFUtils.dedupWithCombiner(
df = df,
groupCol = groupCol,
orderByCol = orderByCol,
@@ -146,8 +146,7 @@ object SparkDFUtils {
* @param orderCols columns to order the records according to
* @return DataFrame representing the data after the operation
*/
- def dedup(df: DataFrame, groupCol: Column, orderCols: Column*): DataFrame = {
- df.dropDuplicates()
+ def dedupWithOrder(df: DataFrame, groupCol: Column, orderCols: Column*): DataFrame = {
dedupTopN(df, 1, groupCol, orderCols: _*)
}
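So dedupWithOrder is now just dedupTopN with n = 1 (the stray dropDuplicates call, whose result was discarded, is gone). dedupTopN itself is outside this diff; assuming it ranks the rows in each group and keeps the top n, a stand-alone equivalent of the n = 1 case looks roughly like this sketch:

    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    // rank the rows inside each group by the given ordering and keep the first one
    def dedupWithOrderSketch(df: DataFrame, groupCol: Column, orderCols: Column*): DataFrame =
      df.withColumn("rn", row_number().over(Window.partitionBy(groupCol).orderBy(orderCols: _*)))
        .filter(col("rn") === 1)
        .drop("rn")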
@@ -174,6 +173,8 @@ object SparkDFUtils {
* in each group.
the same functionality as {@link #dedupWithOrder} but implemented using a UDAF to utilize
* map side aggregation.
+ this function should be used when many rows are expected to be combined per group,
+ since they all share the same group column value.
*
* @param df DataFrame to operate on
* @param groupCol column to group by the records
@@ -185,13 +186,13 @@ object SparkDFUtils {
* or alternatively have only those columns in the result
* @return DataFrame representing the data after the operation
*/
- def dedup2(df: DataFrame,
- groupCol: Column,
- orderByCol: Column,
- desc: Boolean = true,
- moreAggFunctions: Seq[Column] = Nil,
- columnsFilter: Seq[String] = Nil,
- columnsFilterKeep: Boolean = true): DataFrame = {
+ def dedupWithCombiner(df: DataFrame,
+ groupCol: Column,
+ orderByCol: Column,
+ desc: Boolean = true,
+ moreAggFunctions: Seq[Column] = Nil,
+ columnsFilter: Seq[String] = Nil,
+ columnsFilterKeep: Boolean = true): DataFrame = {
val newDF =
if (columnsFilter == Nil) {
df.withColumn("sort_by_column", orderByCol)
@@ -488,12 +489,12 @@ object SparkDFUtils {
// "range_start" is here for consistency
val dfDeduped = if (dedupSmallRange) {
- dedup2(dfJoined,
+ dedupWithCombiner(dfJoined,
col(colSingle),
struct("range_size", "range_start"),
desc = false)
} else {
- dedup2(dfJoined,
+ dedupWithCombiner(dfJoined,
col(colSingle),
struct(expr("-range_size"), col("range_start")),
desc = true)
diff --git a/datafu-spark/src/test/resources/python_tests/df_utils_tests.py b/datafu-spark/src/test/resources/python_tests/df_utils_tests.py
index d47d4a1..bf5d068 100644
--- a/datafu-spark/src/test/resources/python_tests/df_utils_tests.py
+++ b/datafu-spark/src/test/resources/python_tests/df_utils_tests.py
@@ -41,17 +41,17 @@ df_people = sqlContext.createDataFrame([
("c", "Zoey", 36)],
["id", "name", "age"])
-func_dedup_res = df_people.dedup(group_col=df_people.id,
+func_dedup_res = df_people.dedup_with_order(group_col=df_people.id,
order_cols=[df_people.age.desc(), df_people.name.desc()])
-func_dedup_res.registerTempTable("dedup")
+func_dedup_res.registerTempTable("dedup_with_order")
func_dedupTopN_res = df_people.dedup_top_n(n=2, group_col=df_people.id,
order_cols=[df_people.age.desc(), df_people.name.desc()])
func_dedupTopN_res.registerTempTable("dedupTopN")
-func_dedup2_res = df_people.dedup2(group_col=df_people.id, order_by_col=df_people.age, desc=True,
+func_dedup2_res = df_people.dedup_with_combiner(group_col=df_people.id, order_by_col=df_people.age, desc=True,
columns_filter=["name"], columns_filter_keep=False)
-func_dedup2_res.registerTempTable("dedup2")
+func_dedup2_res.registerTempTable("dedup_with_combiner")
func_changeSchema_res = df_people.change_schema(new_scheme=["id1", "name1", "age1"])
func_changeSchema_res.registerTempTable("changeSchema")
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala b/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala
index 2781561..aa6ed52 100644
--- a/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala
@@ -107,11 +107,11 @@ class TestScalaPythonBridge extends FunSuite {
test("SparkDFUtilsBridge") {
runner.runPythonFile("python_tests/df_utils_tests.py")
- assertTable("dedup", "[a,Alice,34], [b,Bob,36], [c,Zoey,36]")
+ assertTable("dedup_with_order", "[a,Alice,34], [b,Bob,36], [c,Zoey,36]")
assertTable(
"dedupTopN",
"[a,Alice,34], [a,Sara,33], [b,Bob,36], [b,Charlie,30], [c,Fanny,36], [c,Zoey,36]")
- assertTable("dedup2", "[a,34], [b,36], [c,36]")
+ assertTable("dedup_with_combiner", "[a,34], [b,36], [c,36]")
assertTable(
"changeSchema",
"[a,Alice,34], [a,Sara,33], [b,Bob,36], [b,Charlie,30], [c,David,29], [c,Esther,32], " +
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
index 045b75b..b148c35 100644
--- a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
@@ -63,7 +63,7 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {
assertDataFrameEquals(expected,
inputDataFrame
- .dedup($"col_grp", $"col_ord".desc)
+ .dedupWithOrder($"col_grp", $"col_ord".desc)
.select($"col_grp", $"col_ord"))
}
@@ -78,7 +78,7 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {
List(dedupExp("asd4", "b", Option(1), "asd4"),
dedupExp("asd1", "a", Option(3), "asd3")))
- val actual = inputDataFrame.dedup2($"col_grp",
+ val actual = inputDataFrame.dedupWithCombiner($"col_grp",
$"col_ord",
moreAggFunctions = Seq(min($"col_str")))
@@ -89,7 +89,7 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {
test("dedup2_by_string_asc") {
- val actual = inputDataFrame.dedup2($"col_grp", $"col_str", desc = false)
+ val actual = inputDataFrame.dedupWithCombiner($"col_grp", $"col_str", desc = false)
val expectedByStringDf: DataFrame = sqlContext.createDataFrame(
List(dedupExp2("b", Option(1), "asd4"),
@@ -100,7 +100,7 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {
test("test_dedup2_by_complex_column") {
- val actual = inputDataFrame.dedup2($"col_grp",
+ val actual = inputDataFrame.dedupWithCombiner($"col_grp",
expr("cast(concat('-',col_ord) as int)"),
desc = false)
@@ -129,7 +129,7 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {
.withColumn("struct_col", expr("struct(col_grp, col_ord)"))
.withColumn("map_col", expr("map(col_grp, col_ord)"))
.withColumn("map_col_blah", expr("map(col_grp, col_ord)"))
- .dedup2($"col_grp", expr("cast(concat('-',col_ord) as int)"))
+ .dedupWithCombiner($"col_grp", expr("cast(concat('-',col_ord) as int)"))
.drop("map_col_blah")
val expected: DataFrame = sqlContext.createDataFrame(