Posted to commits@datafu.apache.org by ey...@apache.org on 2022/10/06 06:33:44 UTC

[datafu] branch master updated: DATAFU-163 exposing explodeArray method in python

This is an automated email from the ASF dual-hosted git repository.

eyal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/datafu.git


The following commit(s) were added to refs/heads/master by this push:
     new d1798ca  DATAFU-163 exposing explodeArray method in python
d1798ca is described below

commit d1798cae64f108d178589409a1ef74feeb55a32f
Author: arbhard2 <ar...@cisco.com>
AuthorDate: Mon Oct 3 21:31:19 2022 +0530

    DATAFU-163 exposing explodeArray method in python
    
    Signed-off-by: Eyal Allweil <ey...@apache.org>
---
 datafu-spark/src/main/resources/pyspark_utils/df_utils.py    | 12 ++++++++++++
 datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala  |  4 ++++
 .../src/test/resources/python_tests/df_utils_tests.py        |  7 +++++++
 .../src/test/scala/datafu/spark/TestScalaPythonBridge.scala  |  2 ++
 4 files changed, 25 insertions(+)

diff --git a/datafu-spark/src/main/resources/pyspark_utils/df_utils.py b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
index a14ded1..31b989d 100644
--- a/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
+++ b/datafu-spark/src/main/resources/pyspark_utils/df_utils.py
@@ -141,6 +141,17 @@ def join_with_range_and_dedup(df_single, col_single, df_range, col_range_start,
     jdf = _get_utils(df_single).joinWithRangeAndDedup(df_single._jdf, col_single, df_range._jdf, col_range_start, col_range_end, decrease_factor, dedup_small_range)
     return DataFrame(jdf, df_single.sql_ctx)
 
+def explode_array(df, array_col, alias):
+    """
+    Given an array column that you need to explode into different columns, use this method.
+    This function counts the number of output columns by executing the Spark job internally on the input array column.
+    Consider caching the input dataframe if this is an expensive operation.
+    :param df: DataFrame to operate on
+    :param array_col: Array Column
+    :param alias: Alias for new columns after explode
+    """
+    jdf = _get_utils(df).explodeArray(df._jdf, array_col._jc, alias)
+    return DataFrame(jdf, df.sql_ctx)
 
 def _cols_to_java_cols(cols):
     return _map_if_needed(lambda x: x._jc, cols)
@@ -169,4 +180,5 @@ def activate():
     pyspark.sql.DataFrame.broadcast_join_skewed = broadcast_join_skewed
     pyspark.sql.DataFrame.join_with_range = join_with_range
     pyspark.sql.DataFrame.join_with_range_and_dedup = join_with_range_and_dedup
+    pyspark.sql.DataFrame.explode_array = explode_array
 
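For context, a minimal sketch of how the new Python API is used, mirroring the test added further down. The import path here is an assumption based on the resource layout; this commit does not specify it:

    from pyspark.sql import SparkSession

    # Assumed import path, based on where df_utils.py lives in the resources.
    from pyspark_utils import df_utils

    spark = SparkSession.builder.getOrCreate()

    # A DataFrame with an array column to be spread into separate columns.
    df = spark.createDataFrame(
        [(0.0, ["Hi", "I heard", "about", "Spark"])],
        ["label", "sentence_arr"])

    # Keeps the original columns and appends one column per array element.
    exploded = df_utils.explode_array(df=df, array_col=df.sentence_arr, alias="token")

Since activate() now also attaches explode_array to pyspark.sql.DataFrame, the same call can be written method-style after df_utils.activate(): df.explode_array(df.sentence_arr, "token").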
diff --git a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
index 4fd068f..81bdb45 100644
--- a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
@@ -129,6 +129,10 @@ class SparkDFUtilsBridge {
     )
   }
 
+  def explodeArray(df: DataFrame, arrayCol: Column, alias: String): DataFrame = {
+    SparkDFUtils.explodeArray(df, arrayCol, alias)
+  }
+
   def dedupRandomN(df: DataFrame, groupCol: Column, maxSize: Int): DataFrame = {
     SparkDFUtils.dedupRandomN(df, groupCol, maxSize)
   }
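(For orientation: the Python wrapper above reaches this method over the py4j bridge. _get_utils(df) presumably resolves to this JVM-side SparkDFUtilsBridge, and df._jdf / array_col._jc are the underlying Java DataFrame and Column objects passed across.)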
diff --git a/datafu-spark/src/test/resources/python_tests/df_utils_tests.py b/datafu-spark/src/test/resources/python_tests/df_utils_tests.py
index 6e6447f..1e58176 100644
--- a/datafu-spark/src/test/resources/python_tests/df_utils_tests.py
+++ b/datafu-spark/src/test/resources/python_tests/df_utils_tests.py
@@ -88,3 +88,10 @@ func_joinWithRangeAndDedup_res = df_utils.join_with_range_and_dedup(df_single=df
                                                                     col_range_start="start", col_range_end="end",
                                                                     decrease_factor=5, dedup_small_range=True)
 func_joinWithRangeAndDedup_res.registerTempTable("joinWithRangeAndDedup")
+
+dfArray = sqlContext.createDataFrame([
+    (0.0, ["Hi", "I heard", "about", "Spark"])],
+    ["label", "sentence_arr"])
+
+func_explodeArray_res = df_utils.explode_array(df=dfArray, array_col=dfArray.sentence_arr, alias="token")
+func_explodeArray_res.registerTempTable("explodeArray")
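The temp table registered here is what the Scala-side assertion below checks against. To inspect the same result directly from Python, one could run (a sketch, reusing the sqlContext from the test above):

    sqlContext.sql("SELECT * FROM explodeArray").show()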
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala b/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala
index 9cd0ad6..b67386a 100644
--- a/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/TestScalaPythonBridge.scala
@@ -121,6 +121,8 @@ class TestScalaPythonBridge extends FunSuite {
                 "[a,Laura,34,a,34,36], [b,Margaret,36,a,34,36]")
     assertTable("joinWithRangeAndDedup",
                 "[a,Laura,34,a,34,36], [b,Margaret,36,a,34,36]")
+    assertTable("explodeArray",
+      "[0.0,WrappedArray(Hi, I heard, about, Spark),Hi,I heard,about,Spark]")
   }
 
 }
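For reference, the expected row in the assertion above implies that explode_array keeps the original label and sentence_arr columns and appends one column per array element. Assuming the new columns are named by appending the element index to the alias (token0 through token3, inferred from the values rather than stated in this commit), exploded.show() from the earlier sketch would print roughly:

    +-----+--------------------+------+-------+------+------+
    |label|        sentence_arr|token0| token1|token2|token3|
    +-----+--------------------+------+-------+------+------+
    |  0.0|[Hi, I heard, abo...|    Hi|I heard| about| Spark|
    +-----+--------------------+------+-------+------+------+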