Posted to commits@datafu.apache.org by ey...@apache.org on 2020/12/29 12:38:46 UTC
[datafu] branch master updated: DATAFU-154 explode array
This is an automated email from the ASF dual-hosted git repository.
eyal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/datafu.git
The following commit(s) were added to refs/heads/master by this push:
new f79f300 DATAFU-154 explode array
f79f300 is described below
commit f79f300ee7b1a9dd5872410961fefdf2174134db
Author: Ran Yuchtman <ry...@paypal.com>
AuthorDate: Mon Nov 23 22:27:53 2020 +0200
DATAFU-154 explode array
Signed-off-by: Eyal Allweil <ey...@apache.org>
---
.../src/main/scala/datafu/spark/DataFrameOps.scala | 4 +++
.../src/main/scala/datafu/spark/SparkDFUtils.scala | 32 ++++++++++++++++++++++
.../test/scala/datafu/spark/TestSparkDFUtils.scala | 23 ++++++++++++++++
3 files changed, 59 insertions(+)
diff --git a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
index be4600d..9a30efc 100644
--- a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
@@ -98,5 +98,9 @@ object DataFrameOps {
numShards: Int = 1000,
joinType: String = "inner"): DataFrame =
SparkDFUtils.joinSkewed(df, notSkewed, joinExprs, numShards, joinType)
+
+ def explodeArray(arrayCol: Column,
+ alias: String): DataFrame =
+ SparkDFUtils.explodeArray(df, arrayCol, alias)
}
}
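A minimal usage sketch of the new method (assuming the DataFrameOps implicits are imported, as elsewhere in datafu-spark; the DataFrame df and its column name are illustrative):

    import datafu.spark.DataFrameOps._
    import org.apache.spark.sql.functions.col

    // with the implicits in scope, explodeArray is available on any DataFrame;
    // "token" is the prefix for the generated columns (token0, token1, ...)
    val exploded = df.explodeArray(col("sentence_arr"), "token")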
diff --git a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
index 47a8ee8..7853e16 100644
--- a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
@@ -507,4 +507,36 @@ object SparkDFUtils {
"decreased_single",
"range_size")
}
+
+ /** Given an array column, adds one new column per array index, so that each
+ * element of the array lands in its own column.
+ *
+ * @param df the DataFrame to operate on
+ * @param arrayCol the array column whose elements should become separate columns
+ * @param alias prefix for the generated column names (alias0, alias1, ...)
+ * @return the input DataFrame with one extra column per array index
+ *
+ * input
+ * +-----+----------------------------------------+
+ * |label|sentence_arr |
+ * +-----+----------------------------------------+
+ * |0.0 |[Hi, I heard, about, Spark] |
+ * |0.0 |[I wish, Java, could use, case classes] |
+ * |1.0 |[Logistic, regression, models, are neat]|
+ * +-----+----------------------------------------+
+ *
+ * output
+ * +-----+----------------------------------------+--------+----------+---------+------------+
+ * |label|sentence_arr |token0 |token1 |token2 |token3 |
+ * +-----+----------------------------------------+--------+----------+---------+------------+
+ * |0.0 |[Hi, I heard, about, Spark] |Hi |I heard |about |Spark |
+ * |0.0 |[I wish, Java, could use, case classes] |I wish |Java |could use|case classes|
+ * |1.0 |[Logistic, regression, models, are neat]|Logistic|regression|models |are neat |
+ * +-----+----------------------------------------+--------+----------+---------+------------+
+ */
+ def explodeArray(df: DataFrame, arrayCol: Column, alias: String): DataFrame = {
+ // find the length of the longest array in the column; this triggers a Spark job
+ val arrSize = df.agg(max(size(arrayCol))).collect()(0).getInt(0)
+
+ // one projection per index; getItem(i) returns null past the end of shorter arrays
+ val exprs = (0 until arrSize).map(i => arrayCol.getItem(i).alias(s"$alias$i"))
+ df.select((col("*") +: exprs): _*)
+ }
}
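The implementation works in two steps: an aggregation first finds the length of the longest array in the column, and a single select then appends one getItem(i) projection per index. A self-contained sketch of the same idea (the SparkSession setup and column names are illustrative, not part of the patch):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, max, size}

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq(
      (0.0, Seq("Hi", "I heard", "about", "Spark")),
      (1.0, Seq("Logistic", "regression"))
    ).toDF("label", "sentence_arr")

    // step 1: the widest array determines how many columns to add
    val arrSize = df.agg(max(size(col("sentence_arr")))).collect()(0).getInt(0)

    // step 2: one column per index; getItem(i) is null for shorter arrays
    val exprs = (0 until arrSize).map(i => col("sentence_arr").getItem(i).alias(s"token$i"))
    df.select((col("*") +: exprs): _*).show(false)

Computing max(size(...)) up front is what lets the method work without knowing the array width in advance, at the cost of one extra pass over the data.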
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
index b148c35..91458ee 100644
--- a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
@@ -375,4 +375,27 @@ class DataFrameOpsTests extends FunSuite with DataFrameSuiteBase {
assertDataFrameEquals(expected, actual)
}
+
+ test("test_explode_array") {
+
+ val input = spark.createDataFrame(Seq(
+ (0.0, Seq("Hi", "I heard", "about", "Spark")),
+ (0.0, Seq("I wish", "Java", "could use", "case", "classes")),
+ (1.0, Seq("Logistic", "regression", "models", "are neat")),
+ (0.0, Seq()),
+ (1.0, null)
+ )).toDF("label", "sentence_arr")
+
+ val actual = input.explodeArray($"sentence_arr", "token")
+
+ val expected = spark.createDataFrame(Seq(
+ (0.0, Seq("Hi", "I heard", "about", "Spark"),"Hi", "I heard", "about", "Spark",null),
+ (0.0, Seq("I wish", "Java", "could use", "case", "classes"),"I wish", "Java", "could use", "case", "classes"),
+ (1.0, Seq("Logistic", "regression", "models", "are neat"),"Logistic", "regression", "models", "are neat",null),
+ (0.0, Seq(),null,null,null,null,null),
+ (1.0, null,null,null,null,null,null)
+ )).toDF("label", "sentence_arr","token0","token1","token2","token3","token4")
+
+ assertDataFrameEquals(expected, actual)
+ }
}
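The test also pins down the padding behavior: the longest input array has five elements, so columns token0 through token4 are generated, and shorter, empty, or null arrays are filled with nulls. A quick interactive check along the same lines (assuming a spark-shell with the datafu-spark jar on the classpath):

    import datafu.spark.DataFrameOps._
    import spark.implicits._

    val input = Seq(
      (0.0, Seq("Hi", "I heard", "about", "Spark")),
      (0.0, Seq("I wish", "Java", "could use", "case", "classes"))
    ).toDF("label", "sentence_arr")

    // generates token0 .. token4; the four-element row gets null in token4
    input.explodeArray($"sentence_arr", "token").show(false)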