You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/13 15:59:14 UTC
[spark] branch branch-3.0 updated: [SPARK-30762] Add dtype=float32
support to vector_to_array UDF
This is an automated email from the ASF dual-hosted git repository.
weichenxu123 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 074712e [SPARK-30762] Add dtype=float32 support to vector_to_array UDF
074712e is described below
commit 074712e329b347f769f8c009949c7845e95b3212
Author: Liang Zhang <li...@databricks.com>
AuthorDate: Thu Feb 13 23:55:13 2020 +0800
[SPARK-30762] Add dtype=float32 support to vector_to_array UDF
### What changes were proposed in this pull request?
In this PR, we add a `dtype` parameter to the Python function `vector_to_array(col)` that allows converting to a column of arrays of Float (32-bit) in Scala, which is mapped to a numpy array of dtype=float32.
### Why are the changes needed?
In downstream ML training, using float32 instead of float64 (the default) allows a larger batch size, i.e., allows more data to fit in memory.
### Does this PR introduce any user-facing change?
Yes.
Old: `vector_to_array()` only takes one param
```
df.select(vector_to_array("colA"), ...)
```
New: `vector_to_array()` can take an additional optional param: `dtype` = "float32" (or "float64")
```
df.select(vector_to_array("colA", "float32"), ...)
```
### How was this patch tested?
Unit test in Scala.
Doctest in Python.
Closes #27522 from liangz1/udf-float32.
Authored-by: Liang Zhang <li...@databricks.com>
Signed-off-by: WeichenXu <we...@databricks.com>
(cherry picked from commit 82d0aa37ae521231d8067e473c6ea79a118a115a)
Signed-off-by: WeichenXu <we...@databricks.com>
---
.../main/scala/org/apache/spark/ml/functions.scala | 34 +++++++++++++++++++---
.../scala/org/apache/spark/ml/FunctionsSuite.scala | 33 ++++++++++++++++++---
python/pyspark/ml/functions.py | 27 +++++++++++++----
3 files changed, 81 insertions(+), 13 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/functions.scala b/mllib/src/main/scala/org/apache/spark/ml/functions.scala
index 1faf562..0f03231 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/functions.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/functions.scala
@@ -18,7 +18,7 @@
package org.apache.spark.ml
import org.apache.spark.annotation.Since
-import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.linalg.{SparseVector, Vector}
import org.apache.spark.mllib.linalg.{Vector => OldVector}
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.udf
@@ -27,7 +27,6 @@ import org.apache.spark.sql.functions.udf
@Since("3.0.0")
object functions {
// scalastyle:on
-
private val vectorToArrayUdf = udf { vec: Any =>
vec match {
case v: Vector => v.toArray
@@ -39,10 +38,37 @@ object functions {
}
}.asNonNullable()
+ private val vectorToArrayFloatUdf = udf { vec: Any =>
+ vec match {
+ case v: SparseVector =>
+ val data = new Array[Float](v.size)
+ v.foreachActive { (index, value) => data(index) = value.toFloat }
+ data
+ case v: Vector => v.toArray.map(_.toFloat)
+ case v: OldVector => v.toArray.map(_.toFloat)
+ case v => throw new IllegalArgumentException(
+ "function vector_to_array requires a non-null input argument and input type must be " +
+ "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " +
+ s"but got ${ if (v == null) "null" else v.getClass.getName }.")
+ }
+ }.asNonNullable()
+
/**
* Converts a column of MLlib sparse/dense vectors into a column of dense arrays.
- *
+ * @param v: the column of MLlib sparse/dense vectors
+ * @param dtype: the desired underlying data type in the returned array
+ * @return an array<float> if dtype is float32, or array<double> if dtype is float64
* @since 3.0.0
*/
- def vector_to_array(v: Column): Column = vectorToArrayUdf(v)
+ def vector_to_array(v: Column, dtype: String = "float64"): Column = {
+ if (dtype == "float64") {
+ vectorToArrayUdf(v)
+ } else if (dtype == "float32") {
+ vectorToArrayFloatUdf(v)
+ } else {
+ throw new IllegalArgumentException(
+ s"Unsupported dtype: $dtype. Valid values: float64, float32."
+ )
+ }
+ }
}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
index 2f5062c..3dd9a7d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
@@ -34,9 +34,8 @@ class FunctionsSuite extends MLTest {
(Vectors.sparse(3, Seq((0, 2.0), (2, 3.0))), OldVectors.sparse(3, Seq((0, 20.0), (2, 30.0))))
).toDF("vec", "oldVec")
- val result = df.select(vector_to_array('vec), vector_to_array('oldVec))
- .as[(Seq[Double], Seq[Double])]
- .collect().toSeq
+ val result = df.select(vector_to_array('vec), vector_to_array('oldVec))
+ .as[(Seq[Double], Seq[Double])].collect().toSeq
val expected = Seq(
(Seq(1.0, 2.0, 3.0), Seq(10.0, 20.0, 30.0)),
@@ -50,7 +49,6 @@ class FunctionsSuite extends MLTest {
(null, null, 0)
).toDF("vec", "oldVec", "label")
-
for ((colName, valType) <- Seq(
("vec", "null"), ("oldVec", "null"), ("label", "java.lang.Integer"))) {
val thrown1 = intercept[SparkException] {
@@ -61,5 +59,32 @@ class FunctionsSuite extends MLTest {
"`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " +
s"but got ${valType}"))
}
+
+ val df3 = Seq(
+ (Vectors.dense(1.0, 2.0, 3.0), OldVectors.dense(10.0, 20.0, 30.0)),
+ (Vectors.sparse(3, Seq((0, 2.0), (2, 3.0))), OldVectors.sparse(3, Seq((0, 20.0), (2, 30.0))))
+ ).toDF("vec", "oldVec")
+ val dfArrayFloat = df3.select(
+ vector_to_array('vec, dtype = "float32"), vector_to_array('oldVec, dtype = "float32"))
+
+ // Check values are correct
+ val result3 = dfArrayFloat.as[(Seq[Float], Seq[Float])].collect().toSeq
+
+ val expected3 = Seq(
+ (Seq(1.0, 2.0, 3.0), Seq(10.0, 20.0, 30.0)),
+ (Seq(2.0, 0.0, 3.0), Seq(20.0, 0.0, 30.0))
+ )
+ assert(result3 === expected3)
+
+ // Check data types are correct
+ assert(dfArrayFloat.schema.simpleString ===
+ "struct<UDF(vec):array<float>,UDF(oldVec):array<float>>")
+
+ val thrown2 = intercept[IllegalArgumentException] {
+ df3.select(
+ vector_to_array('vec, dtype = "float16"), vector_to_array('oldVec, dtype = "float16"))
+ }
+ assert(thrown2.getMessage.contains(
+ s"Unsupported dtype: float16. Valid values: float64, float32."))
}
}
diff --git a/python/pyspark/ml/functions.py b/python/pyspark/ml/functions.py
index 2b4d8dd..ec164f3 100644
--- a/python/pyspark/ml/functions.py
+++ b/python/pyspark/ml/functions.py
@@ -19,10 +19,15 @@ from pyspark import since, SparkContext
from pyspark.sql.column import Column, _to_java_column
-@since(3.0)
-def vector_to_array(col):
+@since("3.0.0")
+def vector_to_array(col, dtype="float64"):
"""
Converts a column of MLlib sparse/dense vectors into a column of dense arrays.
+ :param col: A string of the column name or a Column
+ :param dtype: The data type of the output array. Valid values: "float64" or "float32".
+ :return: The converted column of dense arrays.
+
+ .. versionadded:: 3.0.0
>>> from pyspark.ml.linalg import Vectors
>>> from pyspark.ml.functions import vector_to_array
@@ -32,14 +37,26 @@ def vector_to_array(col):
... (Vectors.sparse(3, [(0, 2.0), (2, 3.0)]),
... OldVectors.sparse(3, [(0, 20.0), (2, 30.0)]))],
... ["vec", "oldVec"])
- >>> df.select(vector_to_array("vec").alias("vec"),
- ... vector_to_array("oldVec").alias("oldVec")).collect()
+ >>> df1 = df.select(vector_to_array("vec").alias("vec"),
+ ... vector_to_array("oldVec").alias("oldVec"))
+ >>> df1.collect()
+ [Row(vec=[1.0, 2.0, 3.0], oldVec=[10.0, 20.0, 30.0]),
+ Row(vec=[2.0, 0.0, 3.0], oldVec=[20.0, 0.0, 30.0])]
+ >>> df2 = df.select(vector_to_array("vec", "float32").alias("vec"),
+ ... vector_to_array("oldVec", "float32").alias("oldVec"))
+ >>> df2.collect()
[Row(vec=[1.0, 2.0, 3.0], oldVec=[10.0, 20.0, 30.0]),
Row(vec=[2.0, 0.0, 3.0], oldVec=[20.0, 0.0, 30.0])]
+ >>> df1.schema.fields
+ [StructField(vec,ArrayType(DoubleType,false),false),
+ StructField(oldVec,ArrayType(DoubleType,false),false)]
+ >>> df2.schema.fields
+ [StructField(vec,ArrayType(FloatType,false),false),
+ StructField(oldVec,ArrayType(FloatType,false),false)]
"""
sc = SparkContext._active_spark_context
return Column(
- sc._jvm.org.apache.spark.ml.functions.vector_to_array(_to_java_column(col)))
+ sc._jvm.org.apache.spark.ml.functions.vector_to_array(_to_java_column(col), dtype))
def _test():
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org