You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by ey...@apache.org on 2020/12/29 12:38:46 UTC

[datafu] branch master updated: DATAFU-154 explode array

This is an automated email from the ASF dual-hosted git repository.

eyal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/datafu.git


The following commit(s) were added to refs/heads/master by this push:
     new f79f300  DATAFU-154 explode array
f79f300 is described below

commit f79f300ee7b1a9dd5872410961fefdf2174134db
Author: Ran Yuchtman <ry...@paypal.com>
AuthorDate: Mon Nov 23 22:27:53 2020 +0200

    DATAFU-154 explode array
    
    Signed-off-by: Eyal Allweil <ey...@apache.org>
---
 .../src/main/scala/datafu/spark/DataFrameOps.scala |  4 +++
 .../src/main/scala/datafu/spark/SparkDFUtils.scala | 32 ++++++++++++++++++++++
 .../test/scala/datafu/spark/TestSparkDFUtils.scala | 23 ++++++++++++++++
 3 files changed, 59 insertions(+)

diff --git a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
index be4600d..9a30efc 100644
--- a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
@@ -98,5 +98,9 @@ object DataFrameOps {
                    numShards: Int = 1000,
                    joinType: String = "inner"): DataFrame =
       SparkDFUtils.joinSkewed(df, notSkewed, joinExprs, numShards, joinType)
+  
+    def explodeArray(arrayCol: Column,
+                     alias: String) =
+      SparkDFUtils.explodeArray(df, arrayCol, alias)  
   }
 }
diff --git a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
index 47a8ee8..7853e16 100644
--- a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
@@ -507,4 +507,36 @@ object SparkDFUtils {
                    "decreased_single",
                    "range_size")
   }
+
+  /** Explodes an array column into separate columns, one column per array element.
+   *
+   * @param df the DataFrame containing the array column
+   * @param arrayCol the array column whose elements are extracted into separate columns
+   * @param alias prefix for the generated column names (e.g. "token" yields token0, token1, ...)
+   * @return the original DataFrame with one additional column per element of the longest array
+   *
+   * input
+   * +-----+----------------------------------------+
+   * |label|sentence_arr                            |
+   * +-----+----------------------------------------+
+   * |0.0  |[Hi, I heard, about, Spark]             |
+   * |0.0  |[I wish, Java, could use, case classes] |
+   * |1.0  |[Logistic, regression, models, are neat]|
+   * +-----+----------------------------------------+
+   *
+   * output
+   * +-----+----------------------------------------+--------+----------+---------+------------+
+   * |label|sentence_arr                            |token0  |token1    |token2   |token3      |
+   * +-----+----------------------------------------+--------+----------+---------+------------+
+   * |0.0  |[Hi, I heard, about, Spark]             |Hi      |I heard   |about    |Spark       |
+   * |0.0  |[I wish, Java, could use, case classes] |I wish  |Java      |could use|case classes|
+   * |1.0  |[Logistic, regression, models, are neat]|Logistic|regression|models   |are neat    |
+   * +-----+----------------------------------------+--------+----------+---------+------------+
+   */
+  def explodeArray(df: DataFrame, arrayCol: Column, alias: String) = {
+    val arrSize = df.agg(max(size(arrayCol))).collect()(0).getInt(0)
+
+    val exprs = (0 until arrSize).map(i => arrayCol.getItem(i).alias(s"$alias$i"))
+    df.select((col("*") +: exprs):_*)
+  }
 }
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
index b148c35..91458ee 100644
--- a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
@@ -375,4 +375,27 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {
 
     assertDataFrameEquals(expected, actual)
   }
+  
+  test("test_explode_array") {
+ 
+    val input = spark.createDataFrame(Seq(
+      (0.0, Seq("Hi", "I heard", "about", "Spark")),
+      (0.0, Seq("I wish", "Java", "could use", "case", "classes")),
+      (1.0, Seq("Logistic", "regression", "models", "are neat")),
+      (0.0, Seq()),
+      (1.0, null)
+    )).toDF("label", "sentence_arr")
+    
+    val actual = input.explodeArray($"sentence_arr", "token")
+    
+    val expected = spark.createDataFrame(Seq(
+      (0.0, Seq("Hi", "I heard", "about", "Spark"),"Hi", "I heard", "about", "Spark",null),
+      (0.0, Seq("I wish", "Java", "could use", "case", "classes"),"I wish", "Java", "could use", "case", "classes"),
+      (1.0, Seq("Logistic", "regression", "models", "are neat"),"Logistic", "regression", "models", "are neat",null),
+      (0.0, Seq(),null,null,null,null,null),
+      (1.0, null,null,null,null,null,null)
+    )).toDF("label", "sentence_arr","token0","token1","token2","token3","token4")
+    
+    assertDataFrameEquals(expected, actual)
+  }
 }