You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by ey...@apache.org on 2019/07/16 13:31:09 UTC

[datafu] branch spark-tmp updated: Rename dedup methods

This is an automated email from the ASF dual-hosted git repository.

eyal pushed a commit to branch spark-tmp
in repository https://gitbox.apache.org/repos/asf/datafu.git


The following commit(s) were added to refs/heads/spark-tmp by this push:
     new 37d4505  Rename dedup methods
37d4505 is described below

commit 37d4505b70c7939dffd04e42a3db26d0fee5a196
Author: Ohad Raviv <or...@paypal.com>
AuthorDate: Tue Jul 16 15:44:37 2019 +0300

    Rename dedup methods
    
    Signed-off-by: Eyal Allweil <ey...@apache.org>
---
 datafu-spark/build_and_test_spark.sh               | 56 +++++++++++-----------
 .../main/resources/pyspark_utils/bridge_utils.py   |  2 +-
 .../src/main/resources/pyspark_utils/df_utils.py   | 12 ++---
 .../src/main/scala/datafu/spark/DataFrameOps.scala | 18 +++----
 .../src/main/scala/datafu/spark/SparkDFUtils.scala | 31 ++++++------
 .../test/resources/python_tests/df_utils_tests.py  |  8 ++--
 .../scala/datafu/spark/TestScalaPythonBridge.scala |  4 +-
 .../test/scala/datafu/spark/TestSparkDFUtils.scala | 10 ++--
 8 files changed, 71 insertions(+), 70 deletions(-)

diff --git a/datafu-spark/build_and_test_spark.sh b/datafu-spark/build_and_test_spark.sh
index 0b876eb..ff8cdcd 100755
--- a/datafu-spark/build_and_test_spark.sh
+++ b/datafu-spark/build_and_test_spark.sh
@@ -28,32 +28,32 @@ export LATEST_SPARK_VERSIONS_FOR_SCALA_212="2.4.3"
 STARTTIME=$(date +%s)
 
 function log {
-	echo $1
-	if [[ $LOG_FILE != "NONE" ]]; then
-		echo $1 >> $LOG_FILE
-	fi
+  echo $1
+  if [[ $LOG_FILE != "NONE" ]]; then
+    echo $1 >> $LOG_FILE
+  fi
 }
 
 function build {
-	echo "----- Building versions for Scala $scala, Spark $spark ----"
-	if ./gradlew :datafu-spark:clean; then
-		echo "----- Clean for Scala $scala, spark $spark succeeded"
-		if ./gradlew :datafu-spark:assemble -PscalaVersion=$scala -PsparkVersion=$spark; then
-			echo "----- Build for Scala $scala, spark $spark succeeded"
-			if ./gradlew :datafu-spark:test -PscalaVersion=$scala -PsparkVersion=$spark $TEST_PARAMS; then
-				log "Testing for Scala $scala, spark $spark succeeded"
-				if [[ $JARS_DIR != "NONE" ]]; then
-					cp datafu-spark/build/libs/*.jar $JARS_DIR/
-				fi
-			else
-				log "Testing for Scala $scala, spark $spark failed (build succeeded)"
-			fi
-		else
-			log "Build for Scala $scala, spark $spark failed"
-		fi
-	else
-		log "Clean for Scala $scala, Spark $spark failed"
-	fi
+  echo "----- Building versions for Scala $scala, Spark $spark ----"
+  if ./gradlew :datafu-spark:clean; then
+    echo "----- Clean for Scala $scala, spark $spark succeeded"
+    if ./gradlew :datafu-spark:assemble -PscalaVersion=$scala -PsparkVersion=$spark; then
+      echo "----- Build for Scala $scala, spark $spark succeeded"
+      if ./gradlew :datafu-spark:test -PscalaVersion=$scala -PsparkVersion=$spark $TEST_PARAMS; then
+        log "Testing for Scala $scala, spark $spark succeeded"
+        if [[ $JARS_DIR != "NONE" ]]; then
+          cp datafu-spark/build/libs/*.jar $JARS_DIR/
+        fi
+      else
+        log "Testing for Scala $scala, spark $spark failed (build succeeded)"
+      fi
+    else
+      log "Build for Scala $scala, spark $spark failed"
+    fi
+  else
+    log "Clean for Scala $scala, Spark $spark failed"
+  fi
 }
 
 # -------------------------------------
@@ -91,27 +91,27 @@ while getopts "l:j:t:hq" arg; do
 done
 
 if [[ $LOG_FILE != "NONE" ]]; then
-	echo "Building datafu-spark: $TEST_PARAMS" > $LOG_FILE
+  echo "Building datafu-spark: $TEST_PARAMS" > $LOG_FILE
 fi
 
 if [[ $JARS_DIR != "NONE" ]]; then
-	echo "Copying successfully built and tested jars to $JARS_DIR" > $LOG_FILE
+  echo "Copying successfully built and tested jars to $JARS_DIR" > $LOG_FILE
   mkdir $JARS_DIR
 fi
 
 export scala=2.10
 for spark in $SPARK_VERSIONS_FOR_SCALA_210; do
-	build
+  build
 done
 
 export scala=2.11
 for spark in $SPARK_VERSIONS_FOR_SCALA_211; do
-	build
+  build
 done
 
 export scala=2.12
 for spark in $SPARK_VERSIONS_FOR_SCALA_212; do
-	build
+  build
 done
 
 export ENDTIME=$(date +%s)
diff --git a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
index 119e637..40134d0 100644
--- a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
+++ b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
@@ -34,7 +34,7 @@ class Context(object):
         """When running a Python script from Scala - this function is called
         by the script to initialize the connection to the Java Gateway and get the spark context.
         code is basically copied from: 
-         https://github.com/apache/zeppelin/blob/master/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py#L30
+        https://github.com/apache/zeppelin/blob/master/spark/interpreter/src/main/resources/python/zeppelin_pyspark.py#L30
         """
 
         if os.environ.get("SPARK_EXECUTOR_URI"):
diff --git a/datafu-spark/src/main/resources/pyspark_utils/df_utils.py b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
index d91228e..16553ce 100644
--- a/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
+++ b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
@@ -28,7 +28,7 @@ def _get_utils(df):
 # public:
 
 
-def dedup(df, group_col, order_cols = []):
+def dedup_with_order(df, group_col, order_cols = []):
     """
     Used get the 'latest' record (after ordering according to the provided order columns) in each group.
     :param df: DataFrame to operate on
@@ -37,7 +37,7 @@ def dedup(df, group_col, order_cols = []):
     :return: DataFrame representing the data after the operation
     """
     java_cols = _cols_to_java_cols(order_cols)
-    jdf = _get_utils(df).dedup(df._jdf, group_col._jc, java_cols)
+    jdf = _get_utils(df).dedupWithOrder(df._jdf, group_col._jc, java_cols)
     return DataFrame(jdf, df.sql_ctx)
 
 
@@ -55,7 +55,7 @@ def dedup_top_n(df, n, group_col, order_cols = []):
     return DataFrame(jdf, df.sql_ctx)
 
 
-def dedup2(df, group_col, order_by_col, desc = True, columns_filter = [], columns_filter_keep = True):
+def dedup_with_combiner(df, group_col, order_by_col, desc = True, columns_filter = [], columns_filter_keep = True):
     """
     Used get the 'latest' record (after ordering according to the provided order columns) in each group.
     :param df: DataFrame to operate on
@@ -67,7 +67,7 @@ def dedup2(df, group_col, order_by_col, desc = True, columns_filter = [], column
 *                          those columns in the result
     :return: DataFrame representing the data after the operation
     """
-    jdf = _get_utils(df).dedup2(df._jdf, group_col._jc, order_by_col._jc, desc, columns_filter, columns_filter_keep)
+    jdf = _get_utils(df).dedupWithCombiner(df._jdf, group_col._jc, order_by_col._jc, desc, columns_filter, columns_filter_keep)
     return DataFrame(jdf, df.sql_ctx)
 
 
@@ -160,9 +160,9 @@ def activate():
     This technique taken from pymongo_spark
     https://github.com/mongodb/mongo-hadoop/blob/master/spark/src/main/python/pymongo_spark.py
     """
-    pyspark.sql.DataFrame.dedup = dedup
+    pyspark.sql.DataFrame.dedup_with_order = dedup_with_order
     pyspark.sql.DataFrame.dedup_top_n = dedup_top_n
-    pyspark.sql.DataFrame.dedup2 = dedup2
+    pyspark.sql.DataFrame.dedup_with_combiner = dedup_with_combiner
     pyspark.sql.DataFrame.change_schema = change_schema
     pyspark.sql.DataFrame.join_skewed = join_skewed
     pyspark.sql.DataFrame.broadcast_join_skewed = broadcast_join_skewed
diff --git a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
index 8f74aa2..be4600d 100644
--- a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
@@ -34,19 +34,19 @@ object DataFrameOps {
 
   implicit class someDataFrameUtils(df: DataFrame) {
 
-    def dedup(groupCol: Column, orderCols: Column*): DataFrame =
-      SparkDFUtils.dedup(df, groupCol, orderCols: _*)
+    def dedupWithOrder(groupCol: Column, orderCols: Column*): DataFrame =
+      SparkDFUtils.dedupWithOrder(df, groupCol, orderCols: _*)
 
     def dedupTopN(n: Int, groupCol: Column, orderCols: Column*): DataFrame =
       SparkDFUtils.dedupTopN(df, n, groupCol, orderCols: _*)
 
-    def dedup2(groupCol: Column,
-               orderByCol: Column,
-               desc: Boolean = true,
-               moreAggFunctions: Seq[Column] = Nil,
-               columnsFilter: Seq[String] = Nil,
-               columnsFilterKeep: Boolean = true): DataFrame =
-      SparkDFUtils.dedup2(df,
+    def dedupWithCombiner(groupCol: Column,
+                          orderByCol: Column,
+                          desc: Boolean = true,
+                          moreAggFunctions: Seq[Column] = Nil,
+                          columnsFilter: Seq[String] = Nil,
+                          columnsFilterKeep: Boolean = true): DataFrame =
+      SparkDFUtils.dedupWithCombiner(df,
                           groupCol,
                           orderByCol,
                           desc,
diff --git a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
index 2b48c52..0ee1520 100644
--- a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
@@ -31,11 +31,11 @@ import org.apache.spark.storage.StorageLevel
  */
 class SparkDFUtilsBridge {
 
-  def dedup(df: DataFrame,
+  def dedupWithOrder(df: DataFrame,
             groupCol: Column,
             orderCols: JavaList[Column]): DataFrame = {
     val converted = convertJavaListToSeq(orderCols)
-    SparkDFUtils.dedup(df = df, groupCol = groupCol, orderCols = converted: _*)
+    SparkDFUtils.dedupWithOrder(df = df, groupCol = groupCol, orderCols = converted: _*)
   }
 
   def dedupTopN(df: DataFrame,
@@ -49,14 +49,14 @@ class SparkDFUtilsBridge {
                            orderCols = converted: _*)
   }
 
-  def dedup2(df: DataFrame,
+  def dedupWithCombiner(df: DataFrame,
              groupCol: Column,
              orderByCol: Column,
              desc: Boolean,
              columnsFilter: JavaList[String],
              columnsFilterKeep: Boolean): DataFrame = {
     val columnsFilter_converted = convertJavaListToSeq(columnsFilter)
-    SparkDFUtils.dedup2(
+    SparkDFUtils.dedupWithCombiner(
       df = df,
       groupCol = groupCol,
       orderByCol = orderByCol,
@@ -146,8 +146,7 @@ object SparkDFUtils {
     * @param orderCols columns to order the records according to
     * @return DataFrame representing the data after the operation
     */
-  def dedup(df: DataFrame, groupCol: Column, orderCols: Column*): DataFrame = {
-    df.dropDuplicates()
+  def dedupWithOrder(df: DataFrame, groupCol: Column, orderCols: Column*): DataFrame = {
     dedupTopN(df, 1, groupCol, orderCols: _*)
   }
 
@@ -174,6 +173,8 @@ object SparkDFUtils {
     * in each group.
     * the same functionality as {@link #dedup} but implemented using UDAF to utilize
     * map side aggregation.
+    * this function should be used in cases when you expect a large number of rows to get combined,
+    * as they share the same group column.
     *
     * @param df DataFrame to operate on
     * @param groupCol column to group by the records
@@ -185,13 +186,13 @@ object SparkDFUtils {
     *                          or alternatively have only those columns in the result
     * @return DataFrame representing the data after the operation
     */
-  def dedup2(df: DataFrame,
-             groupCol: Column,
-             orderByCol: Column,
-             desc: Boolean = true,
-             moreAggFunctions: Seq[Column] = Nil,
-             columnsFilter: Seq[String] = Nil,
-             columnsFilterKeep: Boolean = true): DataFrame = {
+  def dedupWithCombiner(df: DataFrame,
+                        groupCol: Column,
+                        orderByCol: Column,
+                        desc: Boolean = true,
+                        moreAggFunctions: Seq[Column] = Nil,
+                        columnsFilter: Seq[String] = Nil,
+                        columnsFilterKeep: Boolean = true): DataFrame = {
     val newDF =
       if (columnsFilter == Nil) {
         df.withColumn("sort_by_column", orderByCol)
@@ -488,12 +489,12 @@ object SparkDFUtils {
 
     // "range_start" is here for consistency
     val dfDeduped = if (dedupSmallRange) {
-      dedup2(dfJoined,
+      dedupWithCombiner(dfJoined,
              col(colSingle),
              struct("range_size", "range_start"),
              desc = false)
     } else {
-      dedup2(dfJoined,
+      dedupWithCombiner(dfJoined,
              col(colSingle),
              struct(expr("-range_size"), col("range_start")),
              desc = true)
diff --git a/datafu-spark/src/test/resources/python_tests/df_utils_tests.py b/datafu-spark/src/test/resources/python_tests/df_utils_tests.py
index d47d4a1..bf5d068 100644
--- a/datafu-spark/src/test/resources/python_tests/df_utils_tests.py
+++ b/datafu-spark/src/test/resources/python_tests/df_utils_tests.py
@@ -41,17 +41,17 @@ df_people = sqlContext.createDataFrame([
     ("c", "Zoey", 36)],
     ["id", "name", "age"])
 
-func_dedup_res = df_people.dedup(group_col=df_people.id,
+func_dedup_res = df_people.dedup_with_order(group_col=df_people.id,
                              order_cols=[df_people.age.desc(), df_people.name.desc()])
-func_dedup_res.registerTempTable("dedup")
+func_dedup_res.registerTempTable("dedup_with_order")
 
 func_dedupTopN_res = df_people.dedup_top_n(n=2, group_col=df_people.id,
                                      order_cols=[df_people.age.desc(), df_people.name.desc()])
 func_dedupTopN_res.registerTempTable("dedupTopN")
 
-func_dedup2_res = df_people.dedup2(group_col=df_people.id, order_by_col=df_people.age, desc=True,
+func_dedup2_res = df_people.dedup_with_combiner(group_col=df_people.id, order_by_col=df_people.age, desc=True,
                                columns_filter=["name"], columns_filter_keep=False)
-func_dedup2_res.registerTempTable("dedup2")
+func_dedup2_res.registerTempTable("dedup_with_combiner")
 
 func_changeSchema_res = df_people.change_schema(new_scheme=["id1", "name1", "age1"])
 func_changeSchema_res.registerTempTable("changeSchema")
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala b/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala
index 2781561..aa6ed52 100644
--- a/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala
@@ -107,11 +107,11 @@ class TestScalaPythonBridge extends FunSuite {
 
   test("SparkDFUtilsBridge") {
     runner.runPythonFile("python_tests/df_utils_tests.py")
-    assertTable("dedup", "[a,Alice,34], [b,Bob,36], [c,Zoey,36]")
+    assertTable("dedup_with_order", "[a,Alice,34], [b,Bob,36], [c,Zoey,36]")
     assertTable(
       "dedupTopN",
       "[a,Alice,34], [a,Sara,33], [b,Bob,36], [b,Charlie,30], [c,Fanny,36], [c,Zoey,36]")
-    assertTable("dedup2", "[a,34], [b,36], [c,36]")
+    assertTable("dedup_with_combiner", "[a,34], [b,36], [c,36]")
     assertTable(
       "changeSchema",
       "[a,Alice,34], [a,Sara,33], [b,Bob,36], [b,Charlie,30], [c,David,29], [c,Esther,32], " +
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
index 045b75b..b148c35 100644
--- a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
@@ -63,7 +63,7 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {
 
     assertDataFrameEquals(expected,
                           inputDataFrame
-                            .dedup($"col_grp", $"col_ord".desc)
+                            .dedupWithOrder($"col_grp", $"col_ord".desc)
                             .select($"col_grp", $"col_ord"))
   }
 
@@ -78,7 +78,7 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {
       List(dedupExp("asd4", "b", Option(1), "asd4"),
         dedupExp("asd1", "a", Option(3), "asd3")))
 
-    val actual = inputDataFrame.dedup2($"col_grp",
+    val actual = inputDataFrame.dedupWithCombiner($"col_grp",
                                        $"col_ord",
                                        moreAggFunctions = Seq(min($"col_str")))
 
@@ -89,7 +89,7 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {
 
   test("dedup2_by_string_asc") {
 
-    val actual = inputDataFrame.dedup2($"col_grp", $"col_str", desc = false)
+    val actual = inputDataFrame.dedupWithCombiner($"col_grp", $"col_str", desc = false)
 
     val expectedByStringDf: DataFrame = sqlContext.createDataFrame(
       List(dedupExp2("b", Option(1), "asd4"),
@@ -100,7 +100,7 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {
 
   test("test_dedup2_by_complex_column") {
 
-    val actual = inputDataFrame.dedup2($"col_grp",
+    val actual = inputDataFrame.dedupWithCombiner($"col_grp",
                                        expr("cast(concat('-',col_ord) as int)"),
                                        desc = false)
 
@@ -129,7 +129,7 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {
       .withColumn("struct_col", expr("struct(col_grp, col_ord)"))
       .withColumn("map_col", expr("map(col_grp, col_ord)"))
       .withColumn("map_col_blah", expr("map(col_grp, col_ord)"))
-      .dedup2($"col_grp", expr("cast(concat('-',col_ord) as int)"))
+      .dedupWithCombiner($"col_grp", expr("cast(concat('-',col_ord) as int)"))
       .drop("map_col_blah")
 
     val expected: DataFrame = sqlContext.createDataFrame(